카프카의 컨슈머 그룹(Consumer Group)이란 동일한 Topic을 구독하는 하나 이상의 컨슈머(Consumer)를 의미한다.
즉, 컨슈머는 어떤 특정 토픽을 구독하는 한 인스턴스라고 본다면,
컨슈머 그룹은 동일한 관심사를 가지는 컨슈머의 그룹이다!
또한 모든 컨슈머들은 단 하나의 컨슈머 그룹에 소속되어야 하며, 컨슈머 그룹은 1개 이상의 컨슈머를 가질 수 있다.
즉 컨슈머 그룹과 컨슈머의 관계는 1:N이 가능하다.
이 컨슈머 그룹의 특징은,
1) 그룹 단위 메세지 처리
- 컨슈머 그룹은 Topic으로부터 메시지를 소비하고 처리하는 단위이다.
동일하나 컨슈머 그룹에 속한 컨슈머 Instance는 그룹 내에서 메시지를 공유하고, 작업을 분산하여 처리한다.
2) 파티션 할당
- Topic은 하나 이상의 파티션으로 나뉘는데, (파티셔닝 한 경우) 각 파티션은 컨슈머 그룹 내의 한 컨슈머에게만 할당된다.
동일한 파티션은 여러 컨슈머 그룹에 할당될 수 없으며, 한 컨슈머 그룹 내에서만 공유된다.
3) 병렬 처리와 확장성
- 컨슈머 그룹은 여러 컨슈머 인스턴스로 구성되어 있어 Topic의 메시지를 병렬로 처리할 수 있다.
그래서 보통은 파티션의 개수에 맞게 컨슈머의 수도 맞춰주어 병렬성을 높여준다.
4) 오프셋(offset) 관리
- 컨슈머 그룹은 Topic의 각 파티션에서 소비한 메시지의 위치를 추적하고 관리한다.
이를 통해 각 컨슈머 그룹은 처리된 메시지의 상태를 유지하고, 중복 소비를 방지한다.
단, 하나의 컨슈머 그룹 외에 다른 컨슈머 그룹이 있는 경우는 같은 파티션을 구독하더라도
서로 독립적으로 동작하며 이것을 각자의 offset 으로 관리하게 된다.
만약, 컨슈머 그룹 내에 컨슈머의 변화가 있을 시 파티션과 컨슈머의 매칭관계를 변경하는 리밸런싱(Rebalancing)이 발생한다.
리밸런싱 강제로 일으키기
먼저, 기존에 있던 토픽 삭제하기
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --delete --topic multipart-topic
다시 파티션 3개짜리 토픽 만들기
docker exec -it kafka kafka-topics --bootstrap-server localhost:9092 --create --topic partitions-topic --partitions 3
컨슈머 3개 생성하기
docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --group g1 --topic partitions-topic --property print.key=true --property print.value=true --property print.partition=true
이걸 여러 콘솔에 띄워두고
docker logs -f kafka 로 보고있으면, 컨슈머가 추가되었을 때
이렇게 뜬다.
'Dynamic member with unknown member id joins group g1 in Empty state.'
'Preparing to rebalance group g1'
'Stablilized group g1'
이렇게 새로운 컨슈머가 와서 리밸런싱을 준비하고, 마치는 로그를 확인할 수 있다.
다시 들어갔던 컨슈머를 Ctrl + C로 빠져나오면,
'Assignment received from leader ...'
'LeaveGroup'
'now empty'
와 같이 LeaveGroup 싸인이 들어오면서 결국 빈 상태 (Empty Status)가 된다.
이것을 제공해주는 "kafka-consumer-groups" 로 확인해보자.
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --list
-> g1 이라는 그룹 리스트가 나온다 (현재 1개)
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group g1
그룹을 g1으로만 3개 열었을 때 각각 컨슈머가 어느 파티션에 할당되어 있는지와 offset이 어디까지 읽었는지 확인할 수 있다.
컨슈머 아이디는 브로커 자체에서 붙여주며, 파티션별로 컨슈머가 하나씩 붙어서 어디까지 읽었는지 보여준다.
LAG는 프로듀서가 넣은 데이터의 오프셋과 컨슈머가 가져간 데이터의 오프셋 차이 혹은
토픽 내의 최신 오프셋과 컨슈머 오프셋의 차이를 Kafka Consumer Lag 이라고 한다.
즉 쉽게 이해하자면 해당 파티션의 데이터는 100까지 들어와 있는데 컨슈머가 80까지 읽었다면 LAG 는 20이 된다.
이것을 늘리는 것을 테스트해보고 싶다면 키를 가지도록 메세지를 송신하여 특정 파티션에게만 가도록 한 후 다시 컨슈머 그룹을 describe해보면 된다.
마지막으로 컨슈머 그룹은, 그 컨슈머 그룹에 활동하는 컨슈머가 없게 되면 자동으로 삭제가 되는데,
수동으로 삭제할 수 있는 방법도 있다.
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --list
docker exec -it kafka kafka-consumer-groups --bootstrap-server localhost:9092 --delete --group 그룹명
'Kafka' 카테고리의 다른 글
6. Kafka - Java Kafka Clients의 Send 메소드 (1) | 2024.04.04 |
---|---|
5. Kafka - Kafka-Configs (0) | 2024.04.04 |
3. Kafka - 키가 없는 경우의 파티셔닝 전략 (0) | 2024.04.04 |
2. Kafka - CLI 환경에서 다양한 방식의 메시지 송/수신 테스트 하기 (0) | 2024.04.04 |
1. Zookeeper란? Zookeeper의 실행 시 동작 (1) | 2024.04.03 |