본문으로 바로가기

Kafka 학습하기_컨슈머

category MQ 2024. 6. 5. 08:00
반응형


[인프런 - 아파치 카프카 애플리케이션 프로그래밍]

 

컨슈머: 카프카 클러스터에 저장된 레코드를 받아와서 처리하는 애플리케이션

Fetcher: 리더 파티션으로부터 레코드들을 미리 가져와서 대기.
poll(): Fetcher에 있는 레코드들을 리턴하는 레코드

ConsumerRecords: 처리하고자 하는 레코드들의 모음. 오프셋이 포함되어있음.

컨슈머 그룹

동일한 역할을 하는 컨슈머들의 묶음.

각 컨슈머 그룹으로부터 격리된 환경에서 안전하게 운영할 수 있도록 도와주는 카프카의 독특한 방식이다.

컨슈머 그룹으로 묶인 컨슈머가 토픽을 구독해서 데이터를 가져갈때, 1개의 파티션은 최대 1개의 컨슈머에 할당이 가능하다.

그리고 1개 컨슈머는 여러개의 파티션에 할당될 수 있다. 이러한 특징으로 컨슈머 그룹의 컨슈머 개수는 가져가고자 하는 토픽의 파티션 개수보다 같거나 작아야한다.

컨슈머 그룹의 컨슈머가 파티션 개수보다 많을 경우


파티션을 할당받지 못한 컨슈머는 스레드만 차지하고 실질적인 데이터 처리를 하지 못하므로 애플리케이션 실행에 있어 불필요한 스레드로 남는다.

리밸런싱


컨슈머 그룹으로 이루어진 컨슈머 들 중 일부 컨슈머에 장애가 발생하면, 장애가 발생한 컨슈머에 할당된 파티션은 장애가 발생하지 않은 컨슈머에 소유권이 넘어간다.
이러한 과정을 리밸런싱이라고 부른다.

커밋
컨슈머는 카프카 브로커부터 데이터를 어디까지 가져갔는지 커밋을 통해 기록한다.
특정 토픽의 파티션을 어떤 컨슈머 그룹이 몇 번째 가져갔는지 카프카 브로커 내부에서 사용되는 내부 토픽에  _consumer_offsets에 기록된다.

Assignor
컨슈머와 파티션 할당 정책은 컨슈머의 Assignor에 의해 결정된다. 카프카에서는 RangeAssginor, RoundRobinAssignor, StickyAssignor를 제공한다.
카프카 2.5.0는 RangeAssginor가 기본값으로 설정된다.
RangeAssginor: 각 토픽에서 파티션을 숫자로 정렬, 컨슈머를 사전 순서로 정렬하여 할당.
RoundRobinAssignor: 모든 파티션을 컨슈머에서 번갈아가면서 할당.
StickyAssignor: 최대한 파티션을 균등하게 배분하면서 할당.

컨슈머 주요 옵션(필수)
bootstrap.servers 
- 프로듀서가 데이터를 전송할 대상 카프카 클러스터에 속한 브로커의 호스트 이름:포트를 1개 이상 작성
- 2개 이상 브로커 정보를 입력해 일부 브로커에 이슈가 발생하더라도 접속하는 데 이슈가 없도록 설정 가능
key.serializer
- 레코드의 메시지 키를 역직렬화하는 클래스 지정
value.serializer
- 레코드의 메시지 값을 역직렬화하는 클래스 지정

컨슈머 주요 옵션(선택)
group.id
- 컨슈머 그룹 아이디를 지정한다.
- subscribe 메서드로 토픽을 구독하여 사용할 때는 이 옵션이 필수값이다. 기본값은 null
auto.offset.reset
- 컨슈머 그룹이 특정 파티션을 읽을 때 저장된 컨슈머 오프셋이 없는 경우 어느 오프셋부터 읽을지 선택하는 옵션이다.
- 이미 컨슈머 오프셋이 있다면 이 옵션값은 무시된다. 기본값은 latest
    - latest: 설정하면 가장 높은 오프셋부터 읽기 시작한다. (최근에 넣은 레코드)
    - earliest: 설정하면 가장 낮은 오프셋부터 읽기 시작한다. (가장 오래전에 넣은)
    - none: 설정하면 컨슈머 그룹이 커밋한 기록이 있는지 찾아본다. 만약 커밋 기록이 없으면 오류를 반환하고 커밋 기록이 있다면 기존 커밋 기록 이후 오프셋부터 읽기 시작한다. 기본값은 latest
enable.auto.commit
- 자동 커밋으로 할지 수동 커밋으로 할지 선택한다. 기본값은 true
auto.commit.interval.ms
- 자동 커밋일 경우 오프셋 커밋 간 격을 지정한다. 기본값은 5초
max.poll.records
- poll 메서드를 통해 반환되는 레코드 개수를 지정한다. 기본값은 500
session.timeout.ms
- 컨슈머가 브로커와 연결이 끊기는 최대 시간. 기본값은 10초
heartbeat.interval.ms
- 하트비트 전송하는 시간 간격. 기본값은 3초
max.poll.interval.ms
- poll 메서드를 호출하는 간격의 최대 시간. 기본값은 5분
isolation.level
- 트랜잭션 프로듀서가 레코드를 트랜잭션 단위로 보낼 경우 사용한다

반응형

'MQ' 카테고리의 다른 글

Kafka Pub 동적 Batch 관리  (0) 2024.08.18
Kafka Pub 호출 방안  (0) 2024.08.18
Kafka 학습하기_프로듀서  (0) 2024.06.04
Kafka 학습하기_토픽, 파티션, 레코드  (0) 2024.06.02
Kafka 학습하기_브로커와 클러스터  (0) 2024.06.01