Kafka Consumer
카프카 컨슈머는 브로커에 있는 Topic을 구독하여 읽는 역할을 수행한다.
모든 Consumer는 고유한 group.id를 가지는 Consumer Group에 소속되어야 한다. (자바는 실행이 안된다.)
개별 Consumer들은 하나의 파티션마다 1:1 매칭되도록 보통 사용한다.
1) subscribe()
읽고자 하는 토픽을 구독한다.
2) poll()
토픽에서 메시지를 Polling한다.
KafkaConsumer.poll(Duration.ofMillis(10000)) 인 경우,
브로커 혹은 Consumer의 내부 Queue에 데이터가 있으면 바로 데이터를 반환하고
없으면 최대 10초간 데이터 Fetch를 브로커에 계속 수행하고 결과를 반환한다.
3) commit
메세지를 읽고나면 commit을 통해 __consumer_offset에 다음 읽을 offset 위치를 작성한다.
이 __consumer_offset은 Consumer가 아닌 Broker에 존재한다.
이를 통해 장애 시 어디까지 읽었는지 확인하고 가져올 수 있게 된다.
Consumer의 기본 객체
Consumer는 Fetcher, ConsumerClientNetwork 등 주요 내부 객체와 별도 쓰레드인 Heart Beat Thread를 생성한다.
1) Fetcher
Consumer 내부의 Linked Queue에 데이터가 없으면, ConsumerNetworkClient로부터 브로커에 구독중인 토픽에새 데이터가 있는지 가져오도록 한다.
fetch.min.bytes = 1(Default)
Fetcher가 Record를 읽어들일 때의 최소 Bytes. 브로커는 이 이상 새로운 메시지가 쌓일 때 까지 전송하지 않는다.
fetch.max.wait.ms = 500(ms)
브로커에 fetch.min.bytes 이상의 메시지가 쌓일 때까지 최대 대기 시간.
fetch.max.bytes = 52428800
Fetcher가 한 번에 가져올 수 있는 최대 데이터 bytes. 기본값 (50MB)
max.partition.fetch.bytes = 1024168
Fetcher가 파티션별 한 번에 최대로 가져올 수 있는 Bytes. (위 설정값에 따라 제약이 있음.)
max.poll.records = 500
Fetcher가 한 번에 가져올 수 있는 레코드 수. 기본 (500)
2) ConsumerNetworkClient브로커의 구독중인 토픽 내부의 데이터를 가져와서 Consumer 내부의 Linked Queue에 넣어준다.
이렇게 Fetcher와 ConsumerNetworkClient가 Poll을 수행한다.
3) HeartBeat Thread
HeartBeat Thread는 Broker의 Group Coordinator에게 자신이 살아있음을 알린다.
만약 HeartBeat이 브로커에 일정시간동안 도달하지 못하면,
Coordinator는 해당 Consumer를 그룹에서 Kick-Out 하며 Consumer들이 Rebalancing되도록 명령한다.
Consumer의 Rebalancing(리밸런싱)
1) Consumer Group 내에 새로운 Consumer가 추가될 때
2) 기존 Consumer가 종료될 때 (컨슈머로부터 Heart Beat가 오지 않아 Kick-out 됨)
3) Topic에 새로운 Patrition이 추가될 때
Broker의 Group Coordinator가 Consumer들에게 파티션을 재 할당하는 Rebalancing을 수행하도록 Leader Consumer에게 명령한다.
Rebalancing 상세 작업
1) Consumer Group 내의 Consumer가 브로커에 최초 접속 요청 시 Group Coordinator 생성
2) 동일한 group.id로 여러 개의 Consumer가 Broker의 Group Coordinator로 접속하고, 가장 빨리 Group에 Join 요청한
Consumer가 Group의 Leader Consumer가 됨.
3) Leader로 지정된 Consumer는 파티션 할당 전략에 따라 그룹 내 타 Consumer에게 파티션 할당
4) Leader Consumer는 최종 할당된 파티션 정보를 Broker의 Group Coordinator에게 전달
5) 정보 전달 성공을 공유한 후 개별 Consumer들은 할당된 파티션에서 Polling 수행.
리밸런싱이 일어날 때에는 컨슈머의 작업이 멈추게 된다.
Rebalancing Protocol
1) Eager Mode : Kafka Consumer의 Default 모드
Rebalance 수행 시 기존 Consumer에게 할당되어있던 모든 파티션 할당을 취소하고 잠시 메시지를 읽지 못함.
이후 Consumer에 파티션을 새로 할당받고 다시 메시지를 읽는 모드.
모든 컨슈머가 이전과 이후 같은 파티션을 할당 받았을지라도 재할당 하기 때문에 모든 컨슈머가 잠시 메시지를 읽지 않는다.
또한, Lag 가 증가할 수 있는 문제가 있다.
2) Incremental Cooperative
Rebalance 수행 시 기존 Consumer들의 모든 파티션 할당을 취소하지 않고, 대상이 되는 Consumer 들에 대해서
파티션에 따라 점진적으로 Consumer를 할당하며 Rebalance를 수행한다.
따라서 전체가 메시지를 읽는 것을 멈추는 일이 없고, 개별 Consumer가 협력적으로 영향을 받는 파티션만 Rebalance로 재분배한다.
많은 Consumer를 가지는 Consumer Group이면서, Rebalancing 시간이 오래 걸릴 시 활용도가 높다.
즉, 한 Consumer가 Kick-out 되면 그 파티션만 다른 컨슈머에 재할당하는 방식이다.
다만, 이 방법은
Consumer Group Status
GroupMetaData에 존재하는 데이터로, 3가지가 있다.
1) Empty : Consumer Group만 있는 상태
2) Rebalance : Consumer의 Join요청에 따른 리밸런싱 상태
3) Stable : Rebalancing까지 완료되며 안정적으로 Consumer 운영상태
Command
그룹 리스트 확인
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --list
그룹 삭제
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group 그룹명
Consumer Static Group Membership
많은 Consumer가 속한 Group의 리밸런싱은 모든 Consumer가 리밸런싱하므로 많은 시간이 소모되고,
대량 데이터 처리 시 Lag가 길어진다.
그래서, 유지보수 차원의 Consumer Restart 또한 Rebalance를 초래하므로 불필요한 Rebalancing을 발생시키지 않는 방법이다.
1) Consumer Group 내의 Consumer들에게 고정된 ID를 부여한다.
2) Consumer별 Consumer Group 최초 조인 시 할당된 파티션을 유지하고, Consumer가 Shutdown되더라도
session.timeout.ms 내에 재기동되면 리밸런싱 없이 기존 파티션이 재할당된다.
단, 재기동될 때까지 할당되었던 파티션으로부터 데이터를 읽을 수는 없다.
세팅하는 방법은 자바기준으로 ID를 지정해준다.
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
props.setProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "1");
그러면 그룹에 Consumer가 추가될 때, 1 of which are static 이라는 문구가 출력된다.
또한, 재기동 하더라도 session.timeout.ms 내에만 재기동된다면
" Stable member which joins during Stable stage and doens't affect selectProtocol will not trigger rebalance. "
가 출력되면서 리조인 시에도 리밸런싱되지 않는다.
HeartBeat Thread
컨슈머는 Kafka Broker의 Group Coordinator에게 HeartBeat를 보내어 살아있음을 알린다.
이러한 Heartbeat Thread는 컨슈머가 첫번째 poll()을 할 때 생성되며,
따라서 첫번째 poll()에는 데이터를 가져오지 않는다.
heartbeat.interval.ms = 3000 (default)
Heartbeat Thread가 Heartbeat를 보내는 간격이며, Session.timeout.ms의 3분의 1 보다 낮게 권장된다.
session.timeout.ms = 45000 (default)
브로커가 Consumer로부터 Heartbeat를 기다리는 최대시간. 이 시간 내에 heartbeat를 받지 못하면,
해당 consumer를 group에서 제외하는 rebalancing 명령을 지시한다.
max.poll.interval.ms = 300000 (default)
이전 poll()을 호출한 후 다음 호출되는 poll()까지 브로커가 기다리는 시간.
해당 시간동안 poll() 호출이 일어나지 않으면 해당 consumer는 문제가 있다고 판단하여 rebalancing 명령을 보낸다.
'Kafka' 카테고리의 다른 글
9. Kafka - Producer 메커니즘 1 (0) | 2024.04.04 |
---|---|
8. Kafka - Idempotence, 멱등성 (0) | 2024.04.04 |
7. Kafka Producer 메시지 전송과 재전송 옵션 (1) | 2024.04.04 |
6. Kafka - Java Kafka Clients의 Send 메소드 (1) | 2024.04.04 |
5. Kafka - Kafka-Configs (0) | 2024.04.04 |