시그마 삽질==six 시그마

kafka 본문

프로그래밍/Kafka

kafka

Ethan Matthew Hunt 2020. 10. 11. 10:46

 

카프카에서 중요한건
관련 서비스에 가장 적합한 전략을 찾고(어디까지 감내 가능한지 trade-off) 그것을 조화롭게 적용하는것이다.
Throughtput(처리하는 작업의 양) ,Latency(작업을 처리하는데 소요되는 시간) ,Durability(메시지 유실 최소화)  ,Availability(신속한 장애복구),중복 여부, 순서 중요 여부도 고려사항이다.  

 

 

1.카프카가 정의

 

아파치 카프카(Apache Kafka)는 분산 이벤트 스트리밍 플랫폼이며 데이터 파이프 라인을 만들 때 주로 사용되는 오픈소스 솔루션

pub-sub모델의 메세지 큐이고, 분산환경에 특화되어 설계되어 있다는 특징을 가짐

카프카는 소스 애플리케이션과 타켓 애플리케이션의 커플링을 느슨하게 하기위해 나옴(역할과 책임이 분리되어 서비스 의존도 낮아짐)

 

Event 란?
Event는 비즈니스에서 일어나는 모든 일(데이터)을 의미
Event는 BigData의 특징을 가짐
Event Stream은 연속적인 많은 이벤트들의 흐름을 의미

- 웹사이트에서 무언가를 클릭하는 것 
- 청구서 발행
- 송금
- 배송 물건의 위치 정보
- 택시의 GPS 좌표
- 센서의 온도/압력 데이터


Event Driven Architecture: 분산 시스템에서 비동기 통신 방식으로 이벤트를 발행/구독하는 아키텍쳐
EDM = Event Driven Architecture를 적용한 MicroService
1. 비동기 통신 사용 - 각 MicroService간 느슨한 결합도(Loosely Coupled) 유지 가능 
2. EDM에서 발생한 이벤트는 이벤트 스토어에 저장(이벤트 로그)
3. Transaction Management: Retry, Rollback

 

 

2. 카프카 특징

 

1)높은 처리량(High throughput)

묶음 단위로 배치처리(네트워크 통신횟수 줄임), 파티션 추가하고 컨슈머갯수 늘림으로 데이터 병렬처리 가능

 

2)확장성 (Scalability)

카프카 클러스터 브로커 갯수를  늘려서 수평확장이 가능scale-out, 반대로 scale-in도 됨

 

3)내구성(durability)

프로듀서 acks 옵션통해 메시지 내구성 강화가능

  • replication.factor=3
  • acks = all (acks = -1 동일)
  • min.insync.replicas 는 2,3 (최소 2 이상)

메시지가 삭제되지 않고 지정된 시간 ,크기만큼 로컬 디스크에 보관(이전 MQ와의 차이)하기에 브로커 종료되도 다른 브로커의 로컬 디스크에 저장된 내용을 바탕으로 복구 가능

 

 

3. 카프카 사용처

 

Event(메시지/데이터)가 사용되는 모든 곳에서 사용
-Messaging System
- IOT 디바이스로부터 데이터 수집
- 애플리케이션에서 발생하는 로그 수집
- Realtime Event Stream Processing (Fraud Detection, 이상 감지 등) o DB 동기화 (MSA 기반의 분리된 DB간 동기화)
- 실시간 ETL
- Spark, Flink, Storm, Hadoop 과 같은 빅데이터 기술과 같이 사용

 

교통 : 운전자 -탑승자 매치, 도착예상시간(ETA) 업데이트, 실시간 차량진단
금융: 사기거래,중복거래 감지
오락: 실시간 추천, 사기감지
온라인 마켓: 실시간 재고, 대용량 주문의 안전한 처리

 

 

4. 카프카 용어정의

 

1)Zookeeper : Zookeeper는 Broker를 관리 (Broker 들의 목록/설정을 관리)하는 소프트웨어

 

Zookeeper는 변경사항에 대해 Kafka에 알림:Topic 생성/제거, Broker 추가/제거 등

Zookeeper는 홀수의 서버로 작동하게 설계되어 있음 (최소3,권장5)

Zookeeper에는 Leader(writes)가 있고 나머지 서버는 Follower(reads)

Zookeeper는 분산형 Configuration 정보 유지, 분산 동기화 서비스를 제공하고 대용량 분산 시스템을 위한 네이밍 레지스트리를 제공하는 소프트웨어
분산 작업을 제어하기 위한 Tree 형태의 데이터 저장소
->Zookeeper를 사용하여 멀티 Kafka Broker들 간의 정보(변경 사항 포함) 공유, 동기화 등을 수행

Quorum 알고리즘 기반(의결시 과반수 정족수 필요):분산 시스템의 일관성을 유지시키기 위해서 

 

Your Zookeeper cluster needs to have an odd number of servers, and must maintain a majority of
servers up to be able to vote. Therefore, a 2N+1 zookeeper cluster can survive to N zookeeper being
down, so here the right answer is N=2, 2*N+1=5

 

 

 

2)Broker :카프카 애플리케이션이 설치된 서버 또는 노드

 

Kafka Broker는 Partition에 대한 Read 및 Write를 관리하는 소프트웨어

각각의 Broker들은 ID로 식별됨 (단, ID는 숫자)

Kafka를 구성하는 각 서버 1대 = 1 broker, 보통 3대 이상의 브로커 서버를 1개의 클러스터로 묶어서 운영함. +1대 더 추가해서 총4대를 클러스터로하는걸 권장.

(3 개의 Broker로 구성하고 하나의 Broker가 장애 상태시, RF 3인 Topic 생성 불가능하기 때문+데이터 유실 방지를 위해서, min.insync.replicas는 2 를 많이 사용하는데 2대 장애시 데이터 유실 가능성 높아지기 때문)

Kafka Cluster : 여러 개의 Broker들로 구성됨
Client는 특정 Broker에 연결하면 전체 클러스터에 연결됨

브로커는 Topic의 일부 Partition들을 포함:Topic 데이터의 일부분(Partition)을 갖을 뿐 데이터 전체를 갖고 있지 않음(replication 있으면 동일하게 복제를 해서 가지고 있음)

A broker can have different partitions numbers for the same topic on its disk.

A broker contains only a subset of the topics and the partitions

A broker knows all the metadata for all topics and partitions

Topic을 구성하는 Partition들은 여러 Broker 상에 분산됨. relication없으면 브로커 3개에 파티션 각각1개씩 할당.
Topic 생성시 Kafka가 자동으로 Topic을 구성하는 전체 Partition들을 모든 Broker에게 할당해주고 분배해 줌

모든 Kafka Broker는 Bootstrap(부트스트랩) 서버라고 부름
하나의 Broker에만 연결하면 Cluster 전체에 연결됨.하지만, 특정 Broker 장애를 대비하여, 전체 Broker List(IP, port)를 파라미터로 입력 권장
각각의 Broker는 모든 Broker, Topic, Partition에 대해 알고 있음(Metadata)

the requirements for a Kafka broker to connect to a Zookeeper ensemble?
1)Unique values for each broker's broker.id parameter
2)All the brokers must share the same zookeeper.connect parameter

브로커는 처리량에 따라서 Tread 관련 파라미터 튜닝 필수!, G1GC 사용해야함.

• num.io.threads (기본값 : 8) : Disk 개수보다 크게 설정
• num.network.threads (기본값 : 3) : TLS를 사용할 경우 두 배 이상으로 설정
• num.recovery.threads.per.data.dir (기본값 : 1) : Broker 시작시 빠른 기동을 위해서, core수까지만 설정
• num.replica.fetchers (기본값 : 1) : Source Broker에서 메시지를 복제하는데 사용되는 Thread 수. 빠르게 복제하기 위해 값을 증가(Latency를 만족하는). Broker의 CPU 사용률과 네트워크 사용률이 올라감
• num.cleaner.threads (기본값 : 1) : Disk 개수 혹은 core 개수까지만 설정

브로커 클러스터 변경시 피크 시간대에는 피해야한다.

 

Producer는 Leader에만 Write하고 Consumer는 Leader로부터만 Read함(2.4부터 컨슈머는 팔로워꺼도 read해갈 수 있음)
Follower는 Broker 장애시 안정성을 제공하기 위해서만 존재
Follower는 Leader의 Commit Log에서 데이터를 가져오기 요청(Fetch Request)으로 복제

 

 

Partition Leader에 대한 자동 분산

Hot Spot 방지 (하나의 broker에만 partition의 Leader가 몰리는 걸 방지)
auto.leader.rebalance.enable : 기본값 true 

Enables auto leader balancing. A background thread checks the distribution of partition leaders at regular intervals, configurable by `leader.imbalance.check.interval.seconds`. If the leader imbalance exceeds `leader.imbalance.per.broker.percentage`, leader rebalance to the preferred leader for partitions is triggered.

 

leader.imbalance.check.interval.seconds : 기본값 300 sec (300초 텀으로 리더 불균형 있는지 체크)

leader.imbalance.per.broker.percentage : 기본값 10 (다른 브로커보다 10%이상 많이 가져가면 불균형!)

 

Rack Awareness

동일한 Rack 혹은 Available Zone 상의 Broker들에 동일한 rack name 지정.(available zone 에 가끔 rack 다 나가는경우 생길 수 있다). 이 브로커들이 서로 다른 랙에 있다는걸 알려줘야함.

Topic 생성시 또는 Auto Data Balancer/Self Balancing Cluster 동작 때만 실행

Replicas for a partition are spread across different racks.

 

broker.rack

https://kafka.apache.org/documentation/#basic_ops_racks

The rack awareness feature spreads replicas of the same partition across different racks. This extends the guarantees Kafka provides for broker-failure to cover rack-failure, limiting the risk of data loss should all the brokers on a rack fail at once. The feature can also be applied to other broker groupings such as availability zones in EC2.

 

 In-Sync Replicas(ISR)

In-Sync Replicas(ISR)는 High Water Mark라고 하는 지점까지 동일한 Replicas (Leader와 Follower 모두)의 목록

ISR은 리더 브로커가 관리한다.

High Water Mark: ISR내에서 모든 팔로워의 복제가 완료되면, 리더는 내부적으로 커밋되었다는 표시를 하게됨.

리더가 팔로워 복제 완료 확인후 커밋한 마지막 커밋 오프셋 위치를 하이워터마크라고 부른다.

이렇게 커밋된 메시지만 컨슈머가 읽어갈 수 있음.

ISR은 Leader 장애시 Leader를 선출하는데 사용. ISR 중에서 새 Leader를 선출 <---못따라잡는애로 하면 데이터 유실!

 

----(x)

예정에는 이 옵션으로 ISR 여부 판단

replica.lag.max.messages=4  리더와의 lag차이가 4미만인걸 충족시키는걸 ISR이라함.

This parameter decides the allowed difference between replica’s offset and leader’s offset. if the difference becomes more than (replica.lag.max.messages-1 ) then that replica is considered to be lagging behind and is removed from the in-sync replica list.

그러나 msg/sec이 급격히 늘어날시 대부분 ISR에서 벗어날 수 있음.

----(x)

 

replica.lag.time.max.ms으로  ISR 판단해야 함
• Follower가 Leader로 Fetch 요청을 보내는 Interval을 체크
• 예) replica.lag.time.max.ms = 10000 (10초)이라면 Follower가 Leader로 Fetch 요청을 10000 ms 내에만 요청하면 정상으로 판단(초과시 리더는 그 팔로워를 ISR 그룹에서 제외)
• Confluent 에서는 replica.lag.time.max.ms 옵션만 제공(복잡성 제거)

 

replica.lag.time.max.ms
If a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isr

Type: long
Default: 30000 (30 seconds)
Valid Values:
Importance: high
Update Mode: read-only

 

 

Follower 장애시 (ISR은 Leader가 관리) L->Z->C->B

- Follower가 너무 느리면(replica.lag.time.max.ms에 응답x면) Leader는 ISR에서 Follower를 제거-> ZooKeeper에 ISR을 알림
- ZooKeeper는 Partition Metadata에 대한 변경 사항에 대해서 -->Controller에게 알림.

-Controller는 --->전체 브로커에게 알려줌

 

Leader 브로커가 장애시 Z->C->Z,B

ZooKeeper가 session.timeout을 통해 장애를 감지 ->Controller새로운 리더 선출 및 새로운 ISR리스트를 ZooKeeper에 기록

Controller모든 브로커에거 새로운 ISR을 push -> producer,consumer는 메타데이터를 요청하여 새로운 리더 정보를 받음

 

Once the client connects to any broker, it is connected to the entire cluster and in case of leadership
changes, the clients automatically do a Metadata Request to an available broker to find out who is
the new leader for the topic. Hence the producer will automatically keep on producing to the correct
Kafka Broker

The producer will automatically produce to the broker that has been elected leader

 

 

Partition에 Leader가 없으면,
Leader가 선출될 때까지 해당 Partition을 사용할 수 없게 됨 

Producer의 send() 는 retries 파라미터가 설정되어 있으면 재시도함 

만약 retries=0 이면, NetworkException이 발생함

 

 Controller
Kafka Cluster 내의 Broker중 하나가 Controller가 됨(elected by Zookeeper ensemble)
• Controller는 ZooKeeper를 통해 Broker Liveness를 모니터링
• Controller는 Leader와 Replica 정보를 Cluster내의 다른 Broker들에게 전달
• Controller는 ZooKeeper에 Replicas 정보의 복사본을 유지한 다음 더 빠른 액세스를 위해 클러스터의 모든 Broker들에게 동일한 정보를 캐시함
• Controller가 Leader 장애시 Partition Leader Election을 수행(is responsible for partition leader election)
• Controller가 장애가 나면 다른 Active Broker들 중에서 재선출됨

 

 

 Committed의 의미
ISR 목록의 모든 Replicas가 메시지를 받으면 “Committed”
• ISR 목록의 모든 Replicas가 메시지를 성공적으로 가져오면 “Committed”
• Consumer는 Committed 메시지만 읽을 수 있음
• Leader는 메시지를 Commit할 시기를 결정
• Committed 메시지는 모든 Follower에서 동일한 Offset을 갖도록 보장

• 즉, 어떤 Replica가 Leader인지에 관계없이 (장애 발생이라도) 모든 Consumer는 해당 Offset에서 같은 데이터를 볼 수 있음

• Broker가 다시 시작할 때 Committed 메시지 목록을 유지하도록 하기 위해,
Broker의 모든 Partition에 대한 마지막 Committed Offset은 replication- offset-checkpoint라는 파일에 기록됨

 

High Water Mark
• 가장 최근에 Committed 메시지의 Offset 추적
• replication-offset-checkpoint 파일에 체크포인트를 기록 

 

 

Leader Epoch
• 새 Leader가 선출된 시점을 Offset으로 표시
• Broker 복구 중에 메시지를 체크포인트로 자른 다음 현재 Leader를 따르기 위해 사용됨
• Controller가 새 Leader를 선택하면 Leader Epoch를 업데이트하고 해당 정보를 ISR
목록의 모든 구성원에게 보냄
• leader-epoch-checkpoint 파일에 체크포인트를 기록

 

 

auto.create.topics.enable: true(default,broker setting)

If  produce to a topic that does not exist, Kafka will automatically create the topic with the broker settings num.partitions and default.replication.factor.

 

토픽 생성 토픽 자동생성

A kafka broker automatically creates a topic under the following circumstances

- When a producer starts writing messages to the topic

- When a consumer starts reading messages from the topic
- When any client requests metadata for the topic

 

 

2-1)Topic :  Kafka 안에서 메시지가 저장되는 장소, 논리적인 표현. 각 토픽은 카프카 클러스터에서 유니크함.

토픽생성시  replication-factor 설정함

replication-factor: 토픽의 파티션을 복제할 복제 개수. 최소1(본인), 최대 브로커 개수

고려사항: 데이터 처리량, 메시지 키 사용 여부, 브로커/컨슈머 영향도

 

토픽명: rule을 정해 패턴화하여 의미를 부여.토픽명은 한번 정하면 바꾸기가 매우 어려움

ex) event.xxx.v1 , event.write.comment.v1

 

토픽의 파티션 개수 계산

1초당 메시지 발행수/ consumer thread 1개가 1초당 처리하는 메시지수  1000/100 10개의 파티션필요

 

https://www.popit.kr/kafka-운영자가-말하는-topic-replication/

https://www.popit.kr/kafka-운영자가-말하는-replication-factor-변경/

 

테스트나 개발: replication-factor 수 1 (리더 1)

운영환경(로그성 메세지 약간 유실 허용) replication 2 (리더 1 팔로워 1)

운영환경(유실 허용 하지 않음) 리플리케이션 팩터 3 (리더 1 팔로워2)

 

retention.ms: 로그 세그먼트 보관 시간이 해당 숫자보다 크면 세그먼트를 삭제한다는 의미. kafka-config.sh 로 수정가능 . 카프카는 5분 간격으로 로그 세그먼트 파일 체크하면서 삭제작업 수행함. retention.ms=0 한다고 바로 삭제되는게 아니라 5분후 삭제됨. 미설정시

server.properties에 적용된 기본값 7일이 적용됨

 

How will you set the retention for the topic named my-topic to 1 hour?

Set the topic config retention.ms to 3600000

retention.ms can be configured at topic level while creating topic or by altering topic. It shouldn't be
set at the broker level (log.retention.ms) as this would impact all the topics in the cluster, not just the
one we are interested in

 

retention.bytes: 지정된 크기 기준으로 로그 세그먼트 삭제가능

 

데이터 처리 속도 높이는 방법

1.컨슈머의 처리량 늘리는것(컨슈머 실행되는 서버 사용올린다. GC 튜닝한다)

2. 컨슈머 추가해서 병렬처리량 늘린다.

프로듀서 전송 데이터량 < 컨슈머 데이터 처리량x 파티션 개수

 

메시지 키

프로듀서가 토픽으로 데이터 보낼때 메시지 키를 해시 변환하여 메시지 키를 파티션에 매칭시킨다. 파티션 개수 달라지면 매치 파티션과 메시지 키의 매칭이 깨짐.

메세지키와 파티션을 잘 매칭해서 분산처리에 사용할 수 있다.

토픽 생성시 파티션 개수 고려사항
1)데이터 처리량 2) 메시지 키 사용 여부 3) 브로커,컨슈머 영향도

 

순서 중요시 초기에 파티션 넉넉히

순서 중요 x시 파티션 넉넉히 x

 

토픽정리 정책

토픽삭제정책, 토픽 압축 정책

replica.lag.time.max.ms 값만큼 주기를 가지고 팔로워 파티션의 데이터 복제 확인(초과시 ISR 그룹에서 제외)

 

Log Compaction 설정
Compaction 성능 튜닝 관련 옵션(토픽 관련 옵션임)

• log.cleaner.min.cleanable.ratio (기본값 : 0.5)
    Head 영역 데이터가 Tail 영역보다 크면(기본값 50%), Cleaner 시작
• log.cleaner.io.max.bytes.per.second (기본값 : 무제한)
   Log Cleaner의 Read/Write 의 처리량을 제한하여 시스템 리소스 보호 가능

• 동일한Key를갖는메시지가매우많은경우,더빠른정리를위해서아래의파라미터를증가시켜야함
   • log.cleaner.threads (기본값 : 1)
   • log.cleaner.dedupe.buffer.size (기본값 : 134,217,728)

 

Tombstone Message
log compaction은 최신 key만 남겨두지만
Log Compaction시에 아예 특정 Key 데이터 삭제하는 방법
Compaction 사용시에 Key로 K를 가지는 메시지를 지우려면, 동일한 Key(K)에 null value를 가지는 메시지를 Topic으로 보내면 됨

 

 

Changing a configuration parameter on a Kafka broker will be overridden by corresponding topic configuration settings (토픽의 파라미터가 브로커의 파라미터보다 우선한다)

 

 

2-2)Partition:병렬 처리 또는 고성능을 얻기위해 하나의 토픽을 여러 개로 나눈 것

Partition은 Segment File들로 구성됨

레코드를 담고 있음. 컨슈머 요청시 레코드 전달( topic이 복사(replicated)되어 나뉘어지는 단위)

Event(Message) 의 순서는 하나의 Partition내에서만 보장

파티셔너: 토픽의 어느 파티션으로 보내야할지 결정 (기본적으로 메시지의 키를 해시처리해 파티션 구함)

파티션 수 변경시 다른 파티션으로 전송될 수 있음.

파티션 늘리면 줄일 수 없어서 초기에 파티션 수 작게 2,4 정도로 생성후 메시지 처리량이나 컨슈머의 LAG 등을 모니터링 하면서 늘려가는게 좋다.

 

라운드로빈 전략(배치처리를 위한 대기로인한 비효율) 

스티키 파티셔닝 전략(하나의 파티션에 레코드 수 먼저 채워서 빠르게 배치전송하는 전략. 30%이상 지연시간 감소-순서 비중요시 추천)

 

(Topic,Partition,Offset) uniquely identifies a message in Kafka

 

 

 

2-3)세그먼트: 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 실제 물리 파일

Segment File이 지정된 크기보다 크거나 지정된 기간보다 오래되면 새 파일이 열리고 메시지는 새 파일에 추가됨

One file and two indexes per segment

Rolling Strategy : log.segment.bytes(default 1 GB), log.roll.hours(default 168 hours)

 

cd ..../data/kafka-logs/topic명-파티션번호

 

 

xxd  0000000000000000오프셋번호.log  

<--Log Segment File의 파일명은 해당 Segment File에 저장된 첫번째 메시지의 Offset

 

$ ls /data/kafka/kafka-log-a/test_topic-0
00000000000000123453.index <---실제 메시지들이 저장되는 파일

00000000000000123453.timeindex <--로그 세그먼트에 저장된 위치와 오프셋 정보를 기록한 파일

00000000000000123453.log <-- 메시지의 타임스탬프를 기록하는 파일

00000000000007735204.index 

00000000000007735204.timeindex 

00000000000007735204.log 

leader-epoch-checkpoint

 

00000000000000123453.* 파일은 00000000000000123453 offset부터 

00000000000007735203 offset까지의 메시지를 저장/관리 

[00000000000007735203 = 00000000000007735204 -1]

 

kafka-dump-log.sh --print-data-log --files 파일위치/파일명     <---메시지 볼 수 있음

Partition 디렉토리에 생성되는 Files Types 은 최소 4가지
• Log Segment File ‒ 메시지와 metadata를 저장
-> .log
• Index File ‒ 각 메시지의 Offset을 Log Segment 파일의 Byte 위치에 매핑
->.index
• Time-based Index File ‒ 각 메시지의 timestamp를 기반으로 메시지를 검색하는 데 사용
 ->.timeindex
• Leader Epoch Checkpoint File ‒ Leader Epoch과 관련 Offset 정보를 저장

-> leader-epoch-checkpoint

 

특별한 Producer 파라미터 사용하면 Partition 디렉토리에 생기는 Files Types
• Idempotent Producer를 사용하면
->.snapshot
• Transactional Producer를 사용하면

->  .txnindex

 

아래의 파라미터 중 하나라도 해당되면 새로운 Segment File로 Rolling
• log.segment.bytes (default: 1 GB)
• log.roll.ms (default: 168 시간)
• log.index.size.max.bytes (default: 10 MB)
__consumer_offset (Offset Topic)의 Segment File Rolling 파라미터는 별도
• offsets.topic.segment.bytes (default: 100 MB)

 

각 Broker에는 2 개의 Checkpoint File이 존재함 • log.dirs 디렉토리에 위치함
• replication-offset-checkpoint
마지막으로 Commit된 메시지의 ID인 High Water Mark
시작시 Follower가 이를 사용하여 Commit되지 않은 메시지를 Truncate
• recovery-point-offset-checkpoint
데이터가 디스크로 Flush된 지점
복구 중 Broker는 이 시점 이후의 메시지가 손실되었는지 여부를 확인

 

 

 

2-4)레코드(메시지==이벤트==데이터):프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각

Headers(metadata): topic, partition, timestamps,etc 

 key, value : Body(business relevant Data)

 

(타임스템프,메세지키,메세지값,오프셋-프로듀서가 생성한 레코드가 브로커로 전송되면 오프셋과 타임스탬프가 지정되어 저장됨)

Offset : 각 레코드당  할당된 고유 번호. Commit Log 에서 Event의 위치

 

 

 

 

 

3)Producer:메시지를 생산(Produce)해서 Kafka의 Topic으로 메시지를 보내는 애플리케이션

레코드를 브로커로 전송하는 애플리케이션(Broker에 data를 write하는 역할)

프로듀서가 Record(데이터)를 (json,string,Avro,Protobuf) 직렬화를 통해 바이트어레이로 전달하고 카프카(브로커는)  바이트어레이 (Byte Array)를 저장.  컨슈머는  바이트어레이(Byte Array) 역직렬화 해서 사용.

 

producerRecord--send()-->Serializer->Partitioner->Compress(optional)->RecordAccumulator ->produceRequest ->카프카 브로커

 

 

-파티셔너

프로듀서는 토픽으로 메시지를 보낼 때 해당 토픽의 어느 파티션으로 메시지를 보내야할지를 결정해야하는데 이때 사용하는것이 파티셔너임.  프로듀서가 파티션을 결정하는 알고리즘은 기본적으로 메시지(레코드)의 키를 해시 처리해 파티션을 구하는 방식을 사용.

Partition = Hash(Key) % Number of Partitions

 

key가 null이 아니라면 특정 데이터들은 해당 토픽에서 원하는 파티션으로 보낼 수 있음.

 

key null 이면

Kafka 2.4 이전의 DefaultPartitioner는 Round Robin 정책으로 동작

Kafka 2.4 이후의 DefaultPartitioner는 Sticky 정책으로 동작. 하나의 Batch가 닫힐 때까지 하나의 partition에게 record를 보내고 랜덤으로 Partition 선택

 

파티션수 변경시 메시지의 키와 매핑된 해시 테이블도 변경되어서 프로듀서가 동일한 메시지의 키를 이용해 메시지를 전송하더라도 피티션의 수를 늘린 후에는 다른 파티션으로 전송될 수 있음.

 

Partitioner는 디폴트 파티셔너 말고 개발해서 교체가능하다.

 

 

스티키 파티셔닝 전략:하나의 파티션에 레코드 수를 먼저 채워서 카프카로 빠르게 배치 전송하는 전략.메시지 순서 중요하지 않다면 적용하라

 

bootstrap.servers : 브로커의 호스트/포트 정보의 리스트 형태의 값

 

 

 

acks옵션( producer setting )

acks 옵션은 간단하게 말해 프로듀서가 메시지를 보내고 그 메시지를 카프카가 잘 받았는지 확인을 할 것인지 또는 확인을 하지 않을 것인지를 결정하는 옵션

 

 

acks=0:  프로듀서가 리더파티션으로 데이터 전송했을때 저장. 성공 확인 안함(유실되도 속도전 필요시 )

"written successfully" the moment the message was sent

 

acks=1: 프로듀서가 리더 파티션으로 데이터 전송했을때 저장 성공 확인함. but 팔로워 파티션은 고려 x(팔로워 파티션 복제 직전 리더 파티션 있는 브로커에 장애 발생하면 동기화하지 못한 일부 데이터 유실가능)

"written successfully" acknowledged by only the leader.

At most once(최대 한 번) 전송 보장

 

acks=All(default),-1: 프로듀서가 ISR에 포함된 파티션( 리더/모든 팔로워)에 적재 성공 확인.

 "written successfully" all in-sync replicas (ISR). 리더 파티션과 팔로워 파티션이  모두 싱크가 완료된 상태를 뜻함.

At least once(최소 한 번) 전송을 보장

 

min.insync.replicas(topic or broker setting) : min.insync.replicas 옵션은 프로듀서가 acks=all로 설정하여 메시지를 보낼 때, write를 성공하기 위한 최소 복제본의 수를 의미

only effective when acks=all

1 리더만

2 리더 +팔로어1

3 리더+팔로어2

...

Replication Factor 3일 경우 min.insync.replicas 2어야 의미있음. 반드시 브로커 개수 미만으로 설정해서 운영해야함!!!Replication Factor 3일 경우 min.insync.replicas 2어야 의미있음. 반드시 브로커 개수 미만으로 설정해서 운영해야함!!!

min.insync.replicas <  brokers number.

운영환경에서 가장 많이 쓰이는 옵션은 프로듀서의 acks=all보다는 acks=1를 가장 많이 사용

 최소 복제 수를 충족하지 못하면 leader는 프로듀서에게 확인 응답을 보낼 수 없게 되었고, 결국 write는 실패

org.apache.kafka.common.errors.NotEnoughReplicasException

https://www.popit.kr/kafka-운영자가-말하는-producer-acks/

 

 

For a topic replication factor of 3, topic data durability can withstand 2 brokers loss. As a general rule, for a replication factor of N, you can permanently lose up to N-1 brokers and still recover your data

 

  • acks=0 & acks=1 : as long as one partition is up and considered an ISR, the topic will be available for writes.
  • acks=all:
    • min.insync.replicas=1 (default): the topic must have at least 1 partition up as an ISR (that includes the reader) and so we can tolerate two brokers being down.
    • min.insync.replicas=2: the topic must have at least 2 ISR up, and therefore we can tolerate at most one broker being down (in the case of replication factor of 3), and we have the guarantee that for every write, the data will be at least written twice.
    • min.insync.replicas=3: this wouldn't make much sense for a corresponding replication factor of 3 and we couldn't tolerate any broker going down.
    • in summary, when acks=all with a replication.factor=N and min.insync.replicas=M we can tolerate N-M brokers going down for topic availability purposes.



batch.size(기본 16KB):  보재기 전 batch의 최대 크기 .배치로 전송할 레코드 최대용량을 지정한다..프로듀서는 동일한 파티션으로 보내는 여러 데이터를 함께 배치로 보내려고 시도한다. 적절한 배치 크기 설정은 성능에 도움이 된다. 

linger.ms(default 0 즉시보냄): 배치 전송을 위해 버퍼 메모리에서 대기하는 메시지들의 최대 대기시간 설정하는 옵션. 단위는 밀리초며 기본값은 0. increase the chances of batching (배치 가능성을 높여주는거임!!! 그러나 배치 full 로 차있는 경우 가정시 의미없음the batches are completely full each time)
0으로 설정시 배치 전송위해 기다리지 않고 메시지들을 즉시 전송함

 

Batch 처리의 일반적인 설정은 linger.ms=100 및 batch.size=1000000

 

buffer.memory: 프로듀서가 카프카 서버로 데이터를 보내기 위해 잠시 대기(배치 전송이나 딜레이 등) 할 수 있는 전체 메모리 바이트.기본32MB

처리량을 높이려면 batch.size와 linger.ms의 값을 크게 설정해야하고 (이때 buffer.memory 크기는 반드시 batch.size 보다 커야 함. 토픽A 3개 파티션 batch.size :16KB 면 프로듀서 buffer.memory:최소크기는 16KB*3 임. 실패후 재시도까지 고려하면 48KB보다 커야함. buffer.memory size>=batch.size * 파티션수

 

compression.type :프로듀서가 메시지 전송시 선택 할 수 있는 압축 타입. none,gzip,snappy,lz4,zstd 높은 압축률을 선호한다면 gzip,zstd를 선택하는 것이 좋음. 전송데이터가 많아서 네트워크 대역폭(Network Bandwidth) 부하를 주는 상황일때 필요.but 압축시 cpu 사용량 증가 trade-off

 

압축의 장점

1.네트워크 대역폭 사용량을 줄임
2.Kafka 브로커에서 디스크 공간 절약

https://developer.ibm.com/articles/benefits-compression-kafka-messaging/



지연없는 전송이 목표라면 batch.size와 linger.ms의 값을 작게 설정해야함. 낮은 지연시간 선호시 압축은 lz4, snappy 선택하기를 추천

처리량 throughput: 처리하는 작업의 양
지연시간 latency: 작업을 처리하는 데 소요되는 시간
처리량 높아지면 지연시간 길어지고 처리량 낮아지면 지연시간 짧아짐

 

 

프로듀서의 메시지 순서를 보장하려면 Producer에서 enable.idempotence를 true로 설정

max.in.flight.requests.per.connection(default 5)인경우 배치 0~배치 4까지 브로커한테 던지는데 0이 Batch0가 실패했지만, Batch1은 성공하면, Batch1이 Batch0보다 먼저 Commit Log에 추가되어 순서가 달라짐. 그때 enable.idempotence: true를 사용하면, 하나의 Batch가 실패하면, 같은 Partition으로 들어오는 후속 Batch들도 OutOfOrderSequenceException과 함께 실패

 

 

 

enable.idempotence: 설정을 true로 하는 경우 중복 없는 전송이 가능하며(덤으로 프로듀서의 순서 보장도 됨), 이와 동시에 max.in.flight.requests.per.connection 은 5이하, retries는 1이상 , acks는 all로 설정해야함.  

 

기본적으로는 적어도 한 번 전송과 같으나 메시지 전송시 PID와 시퀀스 번호(프로듀서가 중복없는 전송을 시작하면 프로듀서는 고유한 pid를 할당받게 되고, 시퀀스 번호는 동일한 생산자가 보낸 각 레코드에 대해 논리적으로 1씩 증가)를 헤더에 붙여서 보내고 브로커는 그것을 메모리에 저장함.프로듀서가 동일한 데이터를 재전송해도 브로커가 들어오는 메시지의 시퀀스가 현재 브로커에 등록된 최근의 시퀀스+1이 아니라면  저장안하고 ACK만 보냄.
PID는 프로듀서와 카프카 사이에서 내부적으로 이용되므로 사용자에게 노출되지 않는다. 시퀀스 번호는 0부터 순차적으로 증가함.
오버헤드가 있지만 높은 편은 아님. 중복 없는 전송 적용후 20% 성능 감소

 

When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer retries due to broker failures, etc., may write duplicates of the retried message in the stream. Note that enabling idempotence requires max.in.flight.requests.per.connection to be less than or equal to 5 (with message ordering preserved for any allowable value), retries to be greater than 0, and acks must be 'all'.

Idempotence is enabled by default if no conflicting configurations are set. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled. If idempotence is explicitly enabled and conflicting configurations are set, a ConfigException is thrown.

 

To achieve idempotence Kafka uses a unique id which is called product id or PID and sequence number while producing messages. The producer keeps incrementing the sequence number on each message published which map with unique PID. The broker always compare the current sequence number with the previous one and it rejects if the new one is not +1 greater than the previous one which avoids duplication and same time if more than greater show lost in

 

 

https://stackoverflow.com/questions/58894281/difference-between-idempotence-and-exactly-once-in-kafka-stream

 

max.in.flight.requests.per.connection(default 5): 하나의 커넥션에서 프로듀서가 브로커에게 ack 없이 한꺼번에 전송할 수 있는 최대한 요청 수. 메시지의 순서가 중요하다면 1로 설정할것을 권장하지만 성능은 다소 떨어짐

 

The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this config is set to be greater than 1 and enable.idempotence is set to false, there is a risk of message re-ordering after a failed send due to retries (i.e., if retries are enabled). Additionally, enabling idempotence requires this config value to be less than or equal to 5. If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.

(배치가 아닌경우)1이면 한번에 한개의 메시지만 보내기에 한 개 전송 후 브로커로부터 ack 받은 후 그 다음 한 개 전송하고 ack 받고 이렇게 진행되서 메시지 순서가 보장이 되는데 만약 2이상으로 하면 2개를 한꺼번에 던지는데 메시지2는 성공하고 메시지1은 실패시 프로듀서는 메시지1만 재전송하게 됨. 결국 순서가 뒤바뀔 수 있음.

 

만약 배치인데 enable.idempotence를 false로 해놓았다면  max.in.flight.requests.per.connection 를 1로 해도 의미가 없음. 위와 같은 논리로 배치 속에 여러 메시지중 뒤에 것이 성공 앞이 실패 하고 재전송시 뒤바뀔 수 있음.

 

Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

 

transactional.id: 프로듀서가 레코드를 전송할때 레코드를 트랜잭션 단위로 묶을지 여부를 설정한다.

정확히 한 번 전송을 위해 사용하는 옵션으로 동일한 transactionId에 한해 정확히 한 번을 보장한다. 옵션을 사용하기 전 enable.idempotence는 true로 설정해야한다

 

spring producer config 옵션중에

TRANSACTION_ID_CONFIG 옵션은 실생하는 프로듀서 프로세스마다 고유한 아이디로 설정해야한다. 2개의 프로듀서가 있다면 프로듀서마다 다른 아이디로 설정해야한다 -고승범님 아파치카프카 개발에서 운영까지

 

transactional.id
The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. Note that enable.idempotence must be enabled if a TransactionalId is configured. The default is null, which means transactions cannot be used. Note that, by default, transactions require a cluster of at least three brokers which is the recommended setting for production; for development you can change this, by adjusting broker setting transaction.state.log.replication.factor.

 

 

Producer Retry
재전송을 위한 Parameters

retries: 프로듀서가 브로커로부터 에러를 받고 난 뒤 재전송을 시도하는 횟수를 지정한다

NOT_ENOUGH_REPLICAS, NOT_LEADER_FOR_PARTITION (retry 가능한 에러)

MessageSizeTooLarge( retry 불가 에러)

 

retry.backoff.ms(100): 재시도 사이에 추가되는 대기 시간

request.timeout.ms(30,000(30초)) :Producer가 응답을 기다리는 최대 시간

delivery.timeout.ms(120,000(2분)): send() 후 성공 또는 실패를 보고하는 시간의 상한

현업에선 retries를 조정하는 대신에 delivery.timeout.ms 조정으로 재시도 동작을 제어

acks=0 에서 retry는 무의미

 

 

Failure scenarios

Ack Failed(etry>1 but could not receive acknowledge due to failure)

Producer process failed in batch messages( it failed with few published success)

Fire & Forget Failed(retry=0(fire and forget) or ack=0)

Consumer failed in batch message(consumers failed before committing to Kafka,)

 

메시지 전송방식

아파치 카프카 Exactly-once 처리의 진실과 거짓

https://voidmainvoid.tistory.com/504

 

1) 적어도 한 번 전송 (at least once) acks=all ,  default임

메세지 손실 없지만 중복 가능성 있음

Producer가 Kafka Broker로부터 ack를 수신하고 acks=all이면 메시지가 Kafka Topic에 최소 한 번 작성되었음을 의미함
그러나 ack가 시간 초과되거나 오류를 수신하면 메시지가 Kafka Topic에 기록되지 않았다고 가정하고 메시지전송을 다시 시도 할 수 있음
Broker가 ack를 보내기 직전에 실패했지만 메시지가 Kafka Topic에 성공적으로 기록된 후에 이 재시도를 수행하면 메시지가 두 번 기록되어 최종 Consumer에게 두 번 이상 전달되어 중복 작업과같은잘못된결과로이어질수있음

 

프로듀서가 B 보냈는데 브로커가 ACK 안주면 재전송.

프로듀서는 브로커가 메시지 저장은 했는데 ACK만 못보낸건지(중복전송상황됨) 아니면 저장 못해서 ACK 못낸건지 모름.(처음 저장)

카프카는 기본적으로 적어도 한 번 전송 방식을 기반으로 동작함. 

https://kafka.apache.org/documentation/#producerconfigs_acks

 

2) 최대 한 번 전송 (at most once) acks=0 or 1

메시지 손실 가능성 있지만 메시지 중복 가능성 없음

 

확인 시간이 초과되거나 오류가 반환될 때 Producer가 재시도하지 않으면, 메시지가 Kafka Topic에 기록되지 않아 Consumer 에게 전달되지 않을 수 있음

중복 가능성을 피하기 위해 때때로 메시지가 전달되지 않을 수 있음을 허용

 

한번 보내면 받았다 가정하고 재 전송 안함(중복가능성 회피위해 메시지의 손실을 감수)

대량의 로그 수집이나 IoT 같은 환경에서 사용됨

 

 

3-0) 중복 없는 전송

카프카의 0.11버전에서는 프로듀서가 메시지를 중복 없이 브로커로 전송할 수 있는 기능 추가됨

이 방식은 정확히 한 번 전송은 아니지만 중복은 없게 보낼 수 있게 해주는 것.

정확히 한 번 전송은 트랜잭션과 같은 전체적인 처리 프로세스를 의미하며 중복 없는 전송은 정확히 한 번 전송의 일부 기능중 하나라고 볼 수 있음.

기본적으로는 적어도 한 번 전송과 같으나 메시지 전송시 PID와 시퀀스 번호(시퀀스 번호는 동일한 생산자가 보낸 각 레코드에 대해 논리적으로 1씩 증가)를 헤더에 붙여서 보내고 브로커는 그것을 메모리에 저장함.프로듀서가 동일한 데이터를 재전송해도 브로커가 들어오는 메시지의 시퀀스가 현재 브로커에 등록된 최근의 시퀀스+1이 아니라면  저장안하고 ACK만 보냄.
PID는 프로듀서와 카프카 사이에서 내부적으로 이용되므로 사용자에게 노출되지 않는다. 시퀀스 번호는 0부터 순차적으로 증가함.
오버헤드가 있지만 높은 편은 아님. 중복 없는 전송 적용후 20% 성능 감소

enable.idempotence 설정을 true로 하는 경우 중복 없는 전송이 가능하며, 이와 동시에 max.in.flight.requests.per.connection 은 5이하, retries는 1이상, acks는 all로 설정해야함.  PID와 시퀀스 번호를 헤더에 넣어서 보내주고 그걸 브러커는 메모리에 기록하기에 동일 메시지가 오면 브로커는 PID와 시퀀스번호를 보고 동일 메시지임을 확인 후 안받음.

 

 

 

 

 

3) 정확히 한 번 전송 (exactly once)

 

Producer가 메시지 전송을 다시 시도하더라도 메시지가 최종 Consumer에게 정확히 한 번 전달됨
 메시징 시스템 자체와 메시지를 생성하고 소비하는 애플리케이션 간의 협력이 반드시 필요함
예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당
Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 됨

 

데이터가 "정확히 한 번" 처리되도록 보장해야 하는 실시간 미션 크리티컬 스트리밍 Application
• 클라이언트(Idempotent Producer)에서 생성되는 중복 메시지 방지
• Transaction 기능을 사용하여, 하나의 트랜잭션내의 모든 메시지가 모두 Write 되었는지 또는 전혀 Write 되지 않았는지 확인(Atomic Message)
Use Cases
• 금융거래처리‒송금,카드결제등
• 과금정산을위한광고조회수추적
• Billing 서비스간 메시지 전송

 

Java 클라이언트에서만 Fully Supported (AK 0.11.0 부터)
• Producer, Consumer
• Kafka Connect
• Kafka Streams API
• Confluent REST Proxy
• Confluent ksqlDB
Transaction Coordinator 사용
• 특별한 Transaction Log를 관리하는 Broker Thread
• 일련의 ID 번호(Producer ID, Sequence Number, Transaction ID)를 할당하고 클라이언트가 이 정보를 메시지 Header에 포함하여 메시지를 고유하게 식별
• Sequence Number는 Broker가 중복된 메시지를 skip할 수 있게 함

 

 

Idempotent Producer
• Producer의 파라미터중 enable.idempotence 를 true 로 설정
• Producer가 Retry(재시도)를 하더라도, 메시지 중복을 방지
• 성능에 영향이 별로 없음
Transaction
• 각 Producer에 고유한 transactional.id 를 설정
• Producer를 Transaction API를 사용하여 개발
• Consumer에서 isolation.level 을 read_committed 로 설정
Broker의 파라미터는 Transaction을 위한 Default 값이 적용되어 있음 (필요시에만 수정 필요)

 

ex)

-메시지는 Sequence Number와 고유한 Producer ID를 가지고 있음

-Broker는 메모리에 map { Producer ID : Sequence Number }를 저장함 이 map은 *.snapshot 파일로 저장됨

-Broker가 Producer에게 ack를 보내지 못한 경우를 가정

-Producer는 ack를 받지 못했으므로, 동일한 메시지에 대한 재시도(retries)를 수행 enable.idempotence=true 설정을 하지 않았다면, Broker의 메시지 중복 수신이 불가피

-Broker가 체크하여 메시지가 중복된 것을 확인
-메시지를 저장하지 않고, Producer에게 DUP response를 리턴

 

 

 

Exactly Once Semantics 관련 파라미터

 

 

Producer의 파라미터중 enable.idempotence 를 true 로 설정, acks=all, retries > 1, max.inflight.requests.per.connection=1 

각 Producer에 고유한 transactional.id 를 설정

 

Consumer에서 isolation.level 을 read_committed 로 설정, enable.auto.commit:false

 

spring producer config 옵션중에

TRANSACTION_ID_CONFIG 옵션은 실생하는 프로듀서 프로세스마다 고유한 아이디로 설정해야한다. 2개의 프로듀서가 있다면 프로듀서마다 다른 아이디로 설정해야한다 -고승범님 아파치카프카 개발에서 운영까지

 

 

Transaction을 구현하기 위해, 몇 가지 새로운 개념들이 도입

• Transaction Coordinator : Consumer Group Coordinator와 비슷하게, 각 Producer에게는 Transaction Coordinator가 할당되며, PID 할당 및 Transaction 관리의 모든 로직을 수행
• Transaction Log : 새로운 Internal Kafka Topic으로써, Consumer Offset Topic과 유사하게, 모든 Transaction의 영구적이고 복제된 Record를 저장하는 Transaction Coordinator의 상태 저장소
• TransactionalId : Producer를 고유하게 식별하기 위해 사용되며, 동일한 TransactionalId를 가진 Producer의 다른 인스턴스들은 이전 인스턴스에 의해 만들어진 모든 Transaction을 재개(또는 중단)할 수 있음

 

 

Broker Configs

Parameter
설명
Default 
transactional.id.expiration.ms
 Transaction Coordinator Producer TransactionalId로부터 Transaction 상태 업데이트를 수신하지 않고 사전에 만료되기 전에 대기하는 최대 시간(ms)
604800000 (7 days)
transaction.max.timeout.ms
  • Transaction에 허용되는 최대 timeout 시간
  • Client가 요청한 Transaction 시간이 이 시간을 초과하면 오류를 반환
  • Broker InitPidRequest에서 InvalidTransactionTimeout
  • Producer Transaction에 포함된 Topic에서 읽는
  • Consumer를 지연시킬 수 있는 너무 큰 시간 초과를 방지
900000 (15 min)
transaction.state.log.replication.factor
 Transaction State Topic Replication Factor
3
transaction.state.log.num.partitions
 Transaction State Topic Partition 개수
50
transaction.state.log.min.isr
 Transaction State Topic min ISR 개수
2
transaction.state.log.segment.bytes
 Transaction State Topic Segment 크기
104857600 bytes

                                                      출처 :김현수  컨플루언트 Senior Solutions Engineer 강의

 

Producer Configs

Parameter
설명
Default 
enable.idempotence
  • 비활성화된 경우 Transaction 기능을 사용할 수 없음
  • 활성화(true)하고 acks=all, retries > 1,
  • max.inflight.requests.per.connection=1 을 같이 사용해야 함
false
transaction.timeout.ms
  • Transaction Coordinator가 진행 중인 Transaction을 사전에 중단하기 전에 Producer Transaction 상태 업데이트를 기다리는 최대 시간(ms)
  • 이 구성 값은 InitPidRequest와 함께 Transaction Coordinator에게 전송
  • 이 값이 Broker max.transaction.timeout.ms 설정보다 크면
  • 'InvalidTransactionTimeout' 오류와 함께 요청이 실패
60000 (60 sec)
transactional.id
  • Transaction 전달에 사용할 TransactionalId
  • 이를 통해 클라이언트는 새로운 Transaction을 시작하기 전에 동일한 여러 Producer session에 걸쳐 있는 안정성 의미 체계를 사용할 수 있음
  • TransactionalId를 사용하는 Transaction이 완료되었음을 보장할 수 있으므로
  • TransactionalId가 비어있으면(default), Producer Idempotent Delivery
  • 만으로 제한
  • TransactionalId가 구성된 경우, 반드시 enable.idempotence를 활성화해야 함
없음

                                                      출처 :김현수  컨플루언트 Senior Solutions Engineer 강의

 

Consumer Configs

Parameter
설명
Default 
isolation.level
  • read_uncommitted: Offset 순서로 Commit된 메시지와 Commit되지 않은 메시지를 모두 사용
  • read_committed: Non-Transaction 메시지 또는 Commit Transaction 메시지만 Offset 순서로 사용
read_uncommitted
enable.auto.commit
 false : Consumer Offset에 대한 Auto Commit  Off
true

                                                      출처 :김현수  컨플루언트 Senior Solutions Engineer 강의

 

 Consumer가 중복해서 데이터 처리하는 것에 대해 보장하지 않으므로, Consumer의 중복처리는 따로 로직을 작성해야 함(Idempotent Consumer)

 예를 들어, 메시지를 성공적으로 사용한 후 Kafka Consumer를 이전 Offset으로 되감으면 해당 Offset에서 최신 Offset까지 모든 메시지를 다시 수신하게 됨

 

https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging

 

 

카프카 정확히 한 번 전송( exactly-once)

은행!!!! 출금2번, 출금x 입금 2번.

트랜잭션과 같은 전체적인 프로세스 처리를 의미

트랜잭션 코디네이터: 프로듀서에 의해 전송된 메시지를 관리. 커밋 또는 중단 등을 표시

컨슈머 오프셋 관리를 위해 오프셋 정보를 카프카의 내부 토픽에 저장하는데, 트랜잭션도 동일하게 트랜잭션 로그를 카프카의 내부 토픽인 _transaction_State에 저장. 트랜잭션 커밋 실패 관리하는 컨트롤 메시지가 추가로 사용됨.(payload에 애플리케이션 데이터를 포함하지 않음)

 

컨슈머는 isolation.level을  read_committed 로 변경해야한다

 

 

 

 

 

 

 

 

 

 

 

 

 

 

4)Consumer:Topic의 메시지를 가져와서 소비(Consume)하는 애플리케이션

레코드를 polling하는 애플리케이션(Broker에서 data를 read하는 역할)

 

 

consumer Offset commit : Consumer가 자동이나 수동으로 데이터를 읽은 위치를 commit하여 다시 읽음을 방지

Consumer Offset: Consumer Group이 읽은 위치를 표시

컨슈머가 읽었으면 컨슈머는 [자신의 그룹Id:토픽명:파티션번호 : 다음읽을 오프셋 번호]를

브로커에게 알려주고 브로커는 __consumer_offsets라는 InternalTopic에 ConsumerOffset을 저장하여 관리

 

spring kafka에서 Consumer는 Thread-safe하지 않으므로 Listener를 호출하는 쓰레드에서만 호출

 

-컨슈머 그룹 : Topic의 메시지를 사용하기 위해 협력하는 Consumer들의 집합

ConsumerGroup의 Consumer들은 작업량을 어느정도 균등하게 분할함

다른 ConsumerGroup의 Consumer들은 분리되어 독립적으로 작동
동일한 Topic에서 consume하는 여러 ConsumerGroup이 있을 수 있음(but 각기 무관하게 컨슘해감)

4개의 파티션이있는 Topic를 consume하는 4개의 Consumer가 하나의 Consumer Group에 있다면, 각 Consumer는 정확히 하나의 Partition에서 Record를 consume함

Partition은 항상ConsumerGroup내의 하나의 Consumer에 의해서만 사용됨 

반대로 Consumer는 주어진Topic에서 0개이상의 많은Partition을 사용할 수 있음

하나의 Consumer는 하나의 Consumer Group에 포함되며, Consumer Group내의 Consumer들은 협력하여 Topic의 메시지를 분산 병렬 처리함

컨슈머 그룹의 컨슈머의 개수<=가져갈 토픽의 파티션의 개수 (아니면 일부 컨슈머가 유휴상태)

 

컨슈머 그룹 리밸런싱 발생할 수 있는 경우
Rebalance occurs when a new consumer is added, removed or consumer dies or paritions increased.

 

https://dev-jj.tistory.com/entry/Kafka-같은메시지를-반복적으로-소비했던-리밸런싱-이슈-해결-MAX-POLL-RECORDS-CONFIG-maxpollrecord 

 

https://techblog.gccompany.co.kr/카프카-컨슈머-그룹-리밸런싱-kafka-consumer-group-rebalancing-5d3e3b916c9e

max.poll.records

poll()에 대한 단일 호출에서 반환되는 최대 레코드 수입니다. 컨슈머는 “max.poll.records” 설정의 개수만큼 메세지를 처리한 뒤 Poll 요청을 보내게 됩니다. 하지만, 메세지들의 처리 시간이 늦어져서 “max.poll.interval.ms” 설정 시간을 넘기게 된다면 컨슈머에 문제가 있다고 판단하여 리밸런싱이 일어납니다

 

heartbeat.interval.ms: 기본값은 3000(3초).  그룹 코디네이터와 하트비트 인터벌 시간임. 해당시간은session.timeout.ms보다 낮게 설정해야하며 session.timeout.ms의 1/3수준이 적당함.

 

session.timeout.ms: 기본값 10000(10초). 컨슈머는 주기적으로 브로커에게 하트비트를 보내야하고,만약 이 시간 전까지 하트비트를 보내지 않았다면 해당 컨슈머는 종료된 것으로 간주하고 컨슈머 그룹에서 제외하고 리밸런싱 시작함.

 

max.poll.interval.ms: 기본값 300000(5분).  컨슈머는 주기적으로 poll 호출해서 토픽에서 레코드 가져오는데 poll호출후 5분동한 poll 호출이 없다면 컨슈머가 문제가 있는것으로 판단해 리밸런싱함. 몬가 긴 작업처리로인해 poll을 못하는 상황이면 이 값을 늘려서 리밸런싱을 막던지 다른 방법을 조치해야한다.

 

컨슈머 다운 빠르게 감지: 리밸런싱 자주

컨슈머 다운 느리게 감지:  그 시간만큼 메시지 읽지 못하는 현상

 

group.instance.id컨슈머의 고유한 식별자. 설정한다면 static 멤버로 간주되며, 불필요한 리벨런싱 하지 않는다.

스태틱 멤버쉽: 컨슈머 그룹 내에서 컨슈머가 재시작 등으로 그룹에서 나갔다가 다시 합류하더라도 컨슈머ID를 적용해놓아서 리밸런싱이 일어나지 않음.



 

Rebalancing Trigger
• Consumer가 Consumer Group에서 탈퇴
• 신규 Consumer가 Consumer Group에 합류
• Consumer가 Topic 구독을 변경
• Consumer Group은 Topic 메타데이터의 변경 사항을 인지 (예: Partition 증가)

 

Rebalancing Process
1. Group Coordinator는 heartbeats의 플래그를 사용하여 Consumer에게 Rebalance 신호를 보냄
2. Consumer가 일시 중지하고 Offset을 Commit
3. Consumer는 Consumer Group의 새로운 ”Generation"에 다시 합류
4. Partition 재할당
5. Consumer는새Partition에서다시
Consume을 시작

 

Consumer Rebalancing시 Consumer들은 메시지를 Consume하지 못함 따라서, 불필요한 Rebalancing은 반드시 피해야 함

 

하나의 Consumer Group에서의 Partition 할당
Partition을 Consumer에게 Assign(할당) 할 때,

• 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer 만 사용
• 동일한 Key를 가진 메시지는 동일한 Consumer가 사용 (Partition 수를 변경하지 않는 한)
• Consumer의 설정 파라미터 중에서 partition.assignment.strategy 로 할당 방식 조정
• Consumer Group은 Group Coordinator라는 프로세스에 의해 관리됨


 Consumer Group Coordination
한개의 토픽에 대해 Group Coordinator(하나의 Broker) 와 Group Leader(하나의 Consumer)가 상호작용

1. 컨슈머가 브로커에 자신 등록& 브로커 __consumer_offsets 생성 & 이 파티션리더가   Group Coordinator로 선택
각 컨슈머가 group.id로 브로커에 자신을 등록 
브로커는 Consumer Group을 만들고 Consumer의 모든 Offset은 __consumer_offsets Topic의 하나의 Partition에 저장  
이 Partition의 Leader Broker는 Consumer Group의 Group Coordinator로 선택


2  컨슈머의 JoinGroup 요청 순서에 따라  Consumer 나열
Consumers의 JoinGroup 요청에 대해 Group Coordinator는 group.initial.rebalance.delay.ms(기본값 3초)를 대기
Consumer들이 Consume할 최대 Partition 수까지 JoinGroup 요청을 수신하는 순서대로 Consumer를 나열


3 Group Coordinator는 최초 JoinGroup 요청한 컨슈머를 Group Leader로 결정 및 Group Leader는 파티션 전략에 따라 컨슈머들에게 Partition 할당

4 Consumer 가 Partition 맵핑정보를 Group Coordinator에게 전송(이렇게 할당할께요- 결재)

5.  Group Coordinator는  각 Consumer에게 할당된 Partition 정보를 확정. 보냄

 

왜 Group Coordinator(a Broker)가 직접 Partition을 할당하지 않는가?
• Kafka의 한 가지 원칙은 가능한 한 많은 계산을 클라이언트에 수행하도록 하여, Broker의 부담을 줄이는 것

 

 

-파티셔닝 전략

• 프로듀서가 레코드를 어느 파티션에 넣어야할지 결정하는 파티셔너처럼 이와 유사하게 컨슈머도 대상 토픽의 어느 파티션으로부터 레코드를 읽어야할지 결정함. 컨슈머 그룹의 리더 컨슈머가 정해진 파티션 할당 전략에 따라 각 컨슈머와 대상 토픽의 파티션을 매칭시킴.
• Partition = Hash(Key) % Number of Partitions



https://kimmayer.tistory.com/entry/Kafka-메시지-순서-보장-확인해보기

•  하나의 토픽에 Partition이2개이상인 경우 토픽내 모든 메시지에 대한 전체 순서 보장 불가능
 하나의 토픽에 Partition을1개로 구성하면 모든 메시지에서 전체 순서 보장 가능‒처리량저하
 하나의 토픽에 Partition을1개로 구성해서 모든 메시지에서 전체순서 보장을 해야하는 경우가 얼마나 많을까?

대부분의 경우,Key로 구분할수 있는 메시지들의 순서 보장이 필요한 경우가 많음

 하나의 토픽에 동일한 Key를 가진 메시지는 동일한 Partition에만 전달되어 Key레벨의 순서 보장 가능 ‒ 멀티 Partition 사용 가능 = 처리량 증가
• 운영중에 Partition 개수를 변경하면 어떻게 될까? 순서보장불가

• 리밸런싱 발생시 ConsumerGroup내의 다른Consumer가 실패한Consumer를 대신하여 Partition에서 데이터를 가져와서 처리함

 

==================================

순서보장

 

 

0. 무조건 순서보장이 가장가장 중요하다면 파티션 1개로 하면된다

 

1. producer단

1)  partition Key를 둔다

<----그런데 이렇게 해도 partition 추가되면 무너진다.

2) enable.idempotence: 설정을 true로 하는 경우 중복 없는 전송이 가능하며(덤으로 프로듀서의 순서 보장도 됨), 이와 동시에 max.in.flight.requests.per.connection 은 5이하, retries는 1이상 , acks는 all로 설정해야함. 

 

2. consumer단

1) partition key를 둔다

<----그런데 이렇게 해도 partition 추가되면 무너진다.

 

3.  zero-payload

 

4. 타임스탬프도 같이 보내기~

==================================

 

 

Partition Assignment
하나의 Consumer Group에서의 Partition 할당

Partition을 Consumer에게 Assign(할당) 할 때,
• 하나의 Partition은 지정된 Consumer Group내의 하나의 Consumer 만 사용
• 동일한 Key를 가진 메시지는 동일한 Consumer가 사용 (Partition 수를 변경하지 않고 컨슈머 수가 변하지 않는 한)
• Consumer의 설정 파라미터 중에서 partition.assignment.strategy 로 할당 방식 조정
• Consumer Group은 Group Coordinator라는 프로세스에 의해 관리됨

 

 

Consumer의 설정 파라미터 중에서 partition.assignment.strategy 로 할당 방식 조정

 

The default assignor is [RangeAssignor, CooperativeStickyAssignor], which will use the RangeAssignor by default, but allows upgrading to the CooperativeStickyAssignor with just a single rolling bounce that removes the RangeAssignor from the list.

Implementing the org.apache.kafka.clients.consumer.ConsumerPartitionAssignor interface allows you to plug in a custom assignment strategy.

 

 

• org.apache.kafka.clients.consumer.RangeAssignor

Topic별로 작동하는 Default Assignor

레인지 파티션 할당 전략:  토픽별로 할당 전략을 사용함. 동일한 키를 이용하는 2개 이상의 토픽을 컨슘할 때 유리. 파티션 수/컨슈머 수 해서 불일치한거는 앞쪽의 컨슈머에게 먼저 추가 할당함

예) Delivery ID를 key로 가지고 있는 delivery_status 메시지와 delivery_location 메시지

 

• org.apache.kafka.clients.consumer.RoundRobinAssignor 

Round Robin 방식으로 Consumer에게 Partition을 할당

라운드 로빈 파티션 할당전략: 사용 가능한 파티션과 컨슈머들을 라운드 로빈으로 할당함. 균등한 분배 가능

컨슈머에 토픽 하나를 두고 파티션을 라운드로빈 할당한다. (토픽1 돌린 후 토픽2 ...)

 Reassign(재할당) 후 Consumer가 동일한 Partition을 유지한다고 보장하지 않음
 예) Consumer 0 이 지금 Topic0의 Partition0 에 할당되어 있지만, 재할당이 발생하면 Topic0의 Partition0 이 다른 Consumer에게 할당될 수 있음

 

• org.apache.kafka.clients.consumer.StickyAssignor
최대한 많은 기존 Partition 할당을 유지하면서 (나머지 부분만 재할당하며) 최대 균형을 이루는 할당을 보장

쉽게는 컨슈머들에게 할당된 파티션수들의 차이를 줄임

Range 방식 보다 Rebalancing 오버헤드를 줄임

스티키 파티션 할당 전략: 컨슈머가 컨슘하고 있는 파티션을 계속 유지할 수 있음


1) 가능한한 균형적으로 할당을 보장
Consumer들에게 할당된 Topic Partition의 수는 최대 1만큼 다름
특정 Consumer(예, Consumer A)가 다른 Consumer들(예, Consumer B)에 비해 2개 이상 더 적은
Topic Partition이 할당된 경우, A에 할당된 Topic의 나머지 Partition들은 B에 할당될 수 없음
2) 재할당이 발생했을 때, 기존 할당을 최대한 많이 보존하여 유지
Topic Partition이 하나의 Consumer에서 다른 Consumer로 이동할 때의 오버헤드를 줄임

 

org.apache.kafka.clients.consumer.CooperativeStickyAssignor 

동일한 StickyAssignor 논리를 따르지만 협력적인 Rebalance을 허용

협력적 스티키 파티션 할당 전략: 스티키 방식과 유사하지만, 전체 일시 정지가 아닌 연속적인 재조정 방식임

BasicCooperativeRebalancing프로토콜은ApacheKafka2.4에서도입
IncrementalCooperativeRebalancing프로토콜은ApacheKafka2.5에서추가
 빈번하게Rebalancing되는상황이거나스케일인/아웃으로인한다운타임이우려가된다면, 최신 버전의 Kafka(2.5 이상)기반으로 사용하는 것을 권장

https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy

 

• org.apache.kafka.clients.consumer.ConsumerPartitionAssignor 

인터페이스를 구현하면사용자지정할당전략을사용할수있음

 

 

 Cardinality

 특정 데이터 집합에서 유니크(Unique)한 값의 개수

• Key Cardinality는 Consumer Group의 개별 Consumer가 수행하는 작업의 양에 영향
• Key 선택이 잘못되면 작업 부하가 고르지않을수있음
• Key는 Integer, String 등과 같은 단순한 유형일 필요가 없음
• Key는 Value와 마찬가지로 Avro, JSON등 여러 필드가 있는 복잡한 객체일수있음
• 따라서, Partition 전체에 Record를 고르게 배포하는 Key를 만드는 것이 중요

 

 Consumer 관련 Position들
Last Committed Offset, Current Position, High Water Mark, Log End Offset
 Last Committed Offset(Current Offset) : Consumer가 최종 Commit한 Offset
• Current Position : Consumer가 읽어간 위치(처리 중, Commit 전)
• High Water Mark(Committed) : ISR(Leader-Follower)간에 복제된Offset.

(컨슈머는 ISR목록들이 Committed한것만 읽을 수 있음)
• Log End Offset : Producer가 메시지를 보내서 저장된, 로그의 맨 끝 Offset

 

컨슈머 랙 :Log End Offset-Last Committed Offset(Current Offset) 

 

컨슈머 Lag 컨슈머 랙

1.프로듀서가 마지막으로 넣은 offset - 컨슈머가 마지막으로 읽은 offset

Producer가 Write하는 LOG-END-OFFSET과 Consumer Group의 Consumer가 Read하고 처리한 후에 Commit한 CURRENT-OFFSET과의 차이

2. lag은 여러 개가 존재할 수있다

3. burrow -컨슈머 lag 모니터링

1) 멀티 카프카 클러스터 지원

2) Sliding window를 통한 consumer의 status 확인

3) http api 제공

 

 

-컨슈머 관련 옵션들

bootstrap.servers : 브로커의 호스트/포트 정보의 리스트 형태의 값

fetch.min.bytes: 한번에 가져올 수 있는 최소 데이터 크기

fetch.max.bytes: 한 번의 가져오기 요청으로 가져올 수 있는 최대 크기

max.partition.fetch.bytes: 파티션당 가져올 수 있는 최대 크기

max.poll.records: 한번의 poll() 요청으로 가져오는 최대 메시지 수

isolation.level: 트랜잭션 컨슈머에서 사용되는 옵션으로 read_uncommitted는 기본값으로 모든 메시지를 읽고, read_committed는 트랜잭션이 완료된 메시지만 읽는다.

gourp.id 컨슈머 그룹 아이디

enable.auto.commit:

true(default):일정간격 poll자동호출 및 commit

fasle: 오토 커밋을 사용하지 않는다(false할시 속도는 빠르나 데이터 중복,유실가능성 있음)

 

enable.auto.commit

If true the consumer's offset will be periodically committed in the background.

Type:Default:Valid Values:Importance:

 

Type: boolean
Default: true
Valid Values:
Importance: medium

 

 

Kafka consumer의 commitSync()-가장 느림,순서보장, commitAsync()-동기보다빠름.순서보장x 중복가능 

각 컨슈머 그룹은 서로에 영향을 미치지 않음

ex) 컨슈머그룹 A는 엘라스틱서치, 컨슈머 그룹B에는 하둡을 저장하게 두개 컨슈머그룹을 생성

 

카프카는 리더와 팔로워가 메시지를 잘 받았는지 확인하는 ACK 통신을 제거함. 속도 높임.

리더가 push 방식이 아닌 팔로워가 pull하는 방식임.

 

로그 컴팩션을 사용하고자 하면 카프카로 메시지를 전송할 때 키도 필숫값으로 전송해야함

로그 컴팩션의 장점은 빠른 장애 복구

 

auto.offset.reset : none,latest,eraliest

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

*earliest: automatically reset the offset to the earliest offset
*latest(default): automatically reset the offset to the latest offset
*none: throw exception to the consumer if no previous offset is found for the consumer's group
*anything else: throw exception to the consumer

 

if offsets are already committed for this consumer group and topic partition,  the property
auto.offset.reset is ignored

 

리밸런스 시작전 메서드에 커밋구현하여 중복방지해야함

컨슈머의 안전한 종료 graceful shutdown(동작 안하는 컨슈머로 인한 랙 방지)

wakeup()을 통한 graceful shutdown 필수!(WakeupException으로 받을 수 있음)
consumer.wakeup();

...

 

catch (WakeupException e) {
System.out.println("poll() method trigger WakeupException");
}finally { consumer.commitSync(); consumer.close();
}
SIGTERM을 통한 shutdown signal로 kill하여 처리한 데이터 커밋 필요 

SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 ->중복/유실 발생

 

 

 Consumer Heartbeats

Consumer 장애를 인지하기 위함

1)heartbeat.interval.ms: 기본값은 3000(3초).  그룹 코디네이터와 하트비트 인터벌 시간임. 해당시간은session.timeout.ms보다 낮게 설정해야하며 session.timeout.ms의 1/3수준이 적당함.Consumer는 poll()과 별도로 백그라운드 Thread에서 Heartbeats를 보냄. 하트비트는 컨슈머의 세션이 활성상태인지 확인하고 새로운 컨슈머가 그룹에 가입하거나 탈퇴할 때 재조정을 용이하게 하는 데 사용됨.

The expected time between heartbeats to the consumer coordinator when using Kafka's group management facilities. Heartbeats are used to ensure that the consumer's session stays active and to facilitate rebalancing when new consumers join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.

 

2)session.timeout.ms: 기본값 10000(10초). 컨슈머는 주기적으로 브로커에게 하트비트를 보내야하고,만약 이 시간 전까지 하트비트를 보내지 않았다면 해당 컨슈머는 종료된 것으로 간주하고 컨슈머 그룹에서 제외하고 리밸런싱 시작함.

The timeout used to detect client failures when using Kafka's group management facility. The client sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this client from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

 

3)max.poll.interval.ms: 기본값 300000(5분).  컨슈머는 주기적으로 poll 호출해서 토픽에서 레코드 가져오는데 poll호출후 5분동한 poll 호출이 없다면 컨슈머가 문제가 있는것으로 판단해 리밸런싱함. 몬가 긴 작업처리로인해 poll을 못하는 상황이면 이 값을 늘려서 리밸런싱을 막던지 다른 방법을 조치해야한다.

 

컨슈머 다운 빠르게 감지: 리밸런싱 자주

컨슈머 다운 느리게 감지:  그 시간만큼 메시지 읽지 못하는 현상

 

group.instance.id컨슈머의 고유한 식별자. 설정한다면 static 멤버로 간주되며, 불필요한 리벨런싱 하지 않는다.

스태틱 멤버쉽: 컨슈머 그룹 내에서 컨슈머가 재시작 등으로 그룹에서 나갔다가 다시 합류하더라도 컨슈머ID를 적용해놓아서 리밸런싱이 일어나지 않음.

 

  • group.instance.id is an identifier for a consumer, hence should be unique at consumer level
  • group.id is an identifier for a group, hence unique at group level

 

스태틱 멤버쉽 기능 적용시(group.instance.id) session.timeout.ms를 기본값보다 큰값으로 조정해야함.

 

 과도한 Rebalancing을 피하는 방법 성능 최적화에 필수

1. Consumer Group 멤버 고정

•  Group의 각 Consumer에게 고유한 group.instance.id 를 할당합니다.
• Consumer는 LeaveGroupRequest를 사용하지 않아야 함
• Rejoin(재가입)은 알려진 group.instance.id 에 대한 Rebalance를 trigger하지 않음.

 

 

2. session.timeout.ms 튜닝

•heartbeat.interval.ms를 session.timeout.ms의 1/3로 설정
• group.min.session.timeout.ms (Default: 6 seconds) 와 group.max.session.timeout.ms (Default: 5 minutes) 의 사이값
• 장점 : Consumer가 Rejoin(재가입)할 수있는 더 많은 시간을 제공
• 단점 : Consumer 장애를 감지하는 데 시간이 더 오래 걸림

 

3. max.poll.interval.ms 튜닝

• Consumer에게 poll()한 데이터를 처리할수있는 충분한시간제공
• 너무크게하면안됨

 

 

dead_letter

메시지 처리가 실패(메시지 deserialize할 수 없을때..등등)
정해진 횟수만큼 재시도했음에도 메시지 처리에 실패

https://daddyprogrammer.org/post/15087/kafka-consumer-retry-deadletter/
https://developer.confluent.io/learn-kafka/kafka-connect/error-handling-and-dead-letter-queues/

 

브로커에서 Not Leader For Partition Exception in the response하면

Send metadata request to the same broker for the topic and select the broker hosting the leader replica

 

at-most once consuming scenario 전략: Commit the offsets in Kafka, before processing the data(데이터 처리전 받았으면 일단 커밋!!)

 

To read data from a topic, the following configuration is needed for the consumers

->any broker to connect to, and the topic name

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 Page Cache 와 Flush
• 메시지는 Partition에 기록됨
• Partition은 Log Segment file로 구성 (기본값 : 1GB마다 새로운 Segment 생성)
• 성능을 위해 Log Segment는 OS Page Cache에 기록됨
• 로그 파일에 저장된 메시지의 데이터 형식은 Broker가 Producer로부터 수신한 것, 그리고 Consumer에게 보내는 것과 정확히 동일하므로, Zero-Copy)가 가능

•SSL 사용시 zero-copy 특징을 잃는다. 

 

producer --send-->Broker process --write-->OS Page Cache--flush-->Disk

*Broker process  에서는  Zero-copy 전송으로 인해 네트워크 버퍼에 있는 데이터를  브로커의 힙에 쌓거나 cpu 개입 없이 그대로 OS Page Cache에 전달


• Page Cache는 다음과 같은 경우 디스크로 Flush됨 

◦ Broker가 완전히 종료
◦ OS background “Flusher Thread” 실행

 

1) Zero-copy 전송은 데이터가, User Space에 복사되지 않고, CPU 개입 없이 Page Cache와 Network Buffer 사이에서 직접 전송되는 것을 의미. 이것을 통해 Broker Heap 메모리를 절약하고 또한 엄청난 처리량을 제공

 

 

 Flush 되기 전에 Broker 장애가 발생하면? 이를 대비하기 위해서 Replication 하는 것
• OS가 데이터를 디스크로 Flush하기 전에 Broker의 시스템에 장애가 발생하면 해당 데이터가 손실됨
• Partition이 Replication(복제)되어 있다면, Broker가 다시 온라인 상태가 되면 필요시 Leader Replica(복제본)에서 데이터가 복구됨
• Replication이 없다면, 데이터는 영구적으로 손실될 수 있음

 

 Kafka 자체 Flush 정책

•마지막 Flush 이후의 메시지 수(log.flush.interval.messages) 또는 시간(log.flush.interval.ms)으로 Flush(fsync)를 트리거하도록 설정할 수 있음
•Kafka는 운영 체제의 background Flush 기능(예: pdflush)을 더 효율적으로 허용하는 것을 선호하기 때문에 이러한 설정은 기본적으로 무한(기본적으로 fsync 비활성화)으로 설정
•이러한 설정을 기본값으로 유지하는 것을 권장
•*.log 파일을 보면 디스크로 Flush된 데이터와 아직 Flush되지 않은 Page Cache (OS
Buffer)에 있는 데이터가 모두 표시됨
•Flush된 항목과 Flush되지 않은 항목을 표시하는 Linux 도구(예: vmtouch)도 있음

 

 

 

 

미러메이커2: 서로 다른 두 개의 카프카 클러스터 간에 토픽을 복제하는 애플리케이션

커넥터 기반의 미러 메이커 2.0

엔터프라이즈 환경에서 카프카 클러스터를 한개만 사용하는 경우보단 여러개의 다중 클러스터를 활용하는 경우가 더 흔하다.

다중클러스터를 활용하는 경우들

가) 장애 복구 차원에서 다중 데이터 센터 운영하는 경우 (카프카간 리플리케이션)

나)온프레미스 데이터를 클라우드로 마이그레이션하는경우

다)데이터 분석 용도로 카프카와 카프카 간의 리플리케이션함.

프로듀서는 다운스트림 카프카와 가까운 위치에 배치하는것이 중요하다.

 

 

 

5.카프카 운영과 모니터링

 

주키퍼는 쿼럼(과반수) 구성을 기반으로 동작하므로 반드시 홀수로 구성해야한다. 

3대일시 최대 1대까지 장애 허용. 5대일시 최대 2대까지 장애를 허용함. 운영에서 5대가 적합하다.

 

JMX는 자바로 만든 애플리케이션의 모니터링을 위한 도구를 제공하는 자바 API로서 MBean 이라는 객체로 표현된다.

 

프로메테우스

프로메테우스는 메트릭 기반 모니터링 시스템. 강력한 데이터 모델과 쿼리 언어를 이용해 관리자가 원하는 형태의 메트릭을 손쉽게 표현할 수 있는 도구임.

프로메테우스를 활용하면 관리자는 애플리케이션의 성능과 상태, 그리고 인프라의 성능도 손쉽게 확인가능.

모니터링 방식이 push가아닌 pull 방식이라 대상 서버에 자신의 메트릭 정보를 보여줄 수 있는 익스포터를 설치해야함.

 

프로메테우스로 모니터링할 대상 서버와 포트 정보를 프로메테우스 환경 설정 파일에 등록하면, 프로메테우스는 주기적으로 대상 서버의 메트릭값을 가져와 자신의 데이터베이스에 저장함.

 

익스포터: 다양한 애플리케이션에서 수집되는 메트릭들을 프로메테우스가 인식할 수 있는 현태로 나타내는 에이전트.

 

그라파나: 전체 시스템에 관한 대시보드를 보여주거나 사용자가 손쉽게 대시보드를 만들 수 있도록 도와주는 도구

 

노드 익스포터: 서버에서 제공하는 CPU, 메모리,디스크, 네트워크 등의 리소스 사용량을 수집하는 역할. 브로커 서버의 하드웨어 리소스 모니터링

 

카프카 익스포터: 컨슈머 LAG 모니터링

 

 

6.카프카 버전 업그레이드와 확장

 

신규 브로커 추가된 이후 생성되는 토픽들은 신규 브로커를 비롯해 다른 브로커에 골고루 파티션을 분산배치하지만 기존 브로커들의 기존 파티션들이 분산배치 되지는 않는다.

 

브로커 간의 부하 분산 및 밸런스 맟추려면 관리자는 기존 파티션들이 모든 브로커에 고르게 분산되도록 수동으로 분산 작업을 진행해야 한다.

결국 리플리케이션이라 사용량 적은 시간에 하고, 토픽 1개씩 하라.

 

 

 

7.카프카를 saas형태로 사용하는 방법

 

1)  aws의 MSK(managed streaming for Kafka) : 완전 관리형 카프카 서비스로 간단한 콘솔 셋팅을 통해 aws에서 카프카 설치 실행을 자동화하여 도와준다.다양한 모니터링 도구 제공

장점: AWS 쉽게 연동 가능. Clould watch에서도 클라이언트,브로커 지표 모니터링 가능.  프로메테우스 플랫폼과 연동할 수 있는 JMX 익스포터, 노드 익스포터로제공

Ec2 인스턴스<--카프카 브로커(JMX포트)-->JMX 익스포터 <--pull --프로메테우스

Ec2 인스턴스->노드 익스포터 <--pull-- 프로메테우스

 

단점: 비용, 커스터마이징 불가

 

https://makerdark98.dev/wiki/msk.html

https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html

https://docs.aws.amazon.com/msk/latest/developerguide/create-client-machine.html

https://docs.aws.amazon.com/msk/latest/developerguide/create-topic.html

https://docs.aws.amazon.com/msk/latest/developerguide/produce-consume.html

 

2) confluent의 cloud Kafka

 

 

 

 

 

메시지 브로커 VS 이벤트 브로커

 

1.메시지 브로커
레디스 큐, 레빗 엠큐

메시지 보내고 처리 후 즉시 또는 짧은 시간내에 삭제되는 구조


2.이벤트 브로커
카프카, aws 키네시스

메시지 삭제 안함
1)단일진실공급원
2)장애발생시 장애 발생 시점부터 재처리 가능
3)많은 양의 실시간 스트림데이터 효과적 처리 가능


메시지 브로커는 이벤트 브로커로 역할 할 수 없지만 이벤트 브로커는 메세지 브로커로의 역할을 할 수 있다.

 

https://velog.io/@cho876/%EC%B9%B4%ED%94%84%EC%B9%B4kafka-vs-RabbitMQ

https://coding-nyan.tistory.com/129

https://tech.kakao.com/2021/12/23/kafka-rabbitmq/

https://ngio.co.kr/5017

 

 

 

Rewind Consumer Offsets

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b
http://blog.sysco.no/integration/kafka-rewind-consumers-offset/

 

 

 Availability 와 Durability 가용성과 내구성 중 선택?
Topic 파라미터 - unclean.leader.election.enable
ISR 리스트에 없는 Replica를 Leader로 선출할 것인지에 대한 옵션 (default : false)
ISR 리스트에 Replica가 하나도 없으면 Leader 선출을 안 함 ‒ 서비스 중단 

ISR 리스트에 없는 Replica를 Leader로 선출함 ‒ 데이터 유실

 

Topic 파라미터 ‒ min.insync.replicas
최소 요구되는 ISR의 개수에 대한 옵션 (default : 1)
ISR 이 min.insync.replicas 보다 적은 경우, Producer는 NotEnoughReplicas 예외를 수신
Producer에서 acks=all과 함께 사용할 때 더 강력한 보장 + min.insync.replicas=2 

n개의 Replica가 있고, min.insync.replicas=2 인 경우 n-2개의 장애를 허용할 수 있음

 

 

Durability 데이터 유실이 없게 하려면?
Topic : replication.factor 는 2 보다 커야 함(최소 3이상)
Producer : acks 는 all 이어야 함
Topic : min.insync.replicas 는 1 보다 커야 함(최소 2 이상)


Availability 데이터 유실이 다소 있더라도 가용성을 높게 하려면?
Topic : unclean.leader.election.enable 를 true 로 설정

 

 

참고

김현수님(컨플루언트 senior Solutions Engineer)의 강의

고승범님의 '실전 카프카 개발부터 운영까지'

최원영님의 '아파치 카프카 애플리케이션 프로그래밍 with 자바'

 

https://www.conduktor.io/kafka

https://medium.com/@andy.bryant/processing-guarantees-in-kafka-12dd2e30be0e

https://stackoverflow.com/questions/58894281/difference-between-idempotence-and-exactly-once-in-kafka-stream

https://www.cloudkarafka.com/blog/apache-kafka-idempotent-producer-avoiding-message-duplication.html

 

https://docs.confluent.io/platform/current/installation/configuration/consumer-configs.html

https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html

https://blog.voidmainvoid.net/354

 

 

https://kafka.apache.org/documentation/#brokerconfigs

https://kafka.apache.org/documentation/#topicconfigs

https://kafka.apache.org/documentation/#producerconfigs

https://kafka.apache.org/documentation/#consumerconfigs

https://kafka.apache.org/documentation/#connectconfigs

https://kafka.apache.org/documentation/#sourceconnectconfigs

https://kafka.apache.org/documentation/#sinkconnectconfigs

https://kafka.apache.org/documentation/#streamsconfigs

https://kafka.apache.org/documentation/#adminclientconfigs

 

 

 

 

Comments