Backend/spring cloud (MSA)

[Kafka] κ³ κΈ‰ μ„€μ •

dddzr 2025. 2. 23. 19:17

πŸ“Œ Kafka 운영 ν™˜κ²½μ—μ„œ ν•„μš”ν•œ κ³ κΈ‰ μ„€μ •

κΈ°λ³Έ κ°œλ…λ§ŒμœΌλ‘œλ„ Kafkaλ₯Ό μ‚¬μš©ν•  μˆ˜λŠ” μžˆμ§€λ§Œ, μ‹€μ œ 운영 ν™˜κ²½μ—μ„œλŠ” 더 λ§Žμ€ κ±Έ μ•Œμ•„μ•Ό ν•œλ‹€!

πŸ”— κΈ°λ³Έ κ°œλ…(ν•„μˆ˜!)

  • ν† ν”½, νŒŒν‹°μ…˜, 브둜컀, 컨슈머 κ·Έλ£Ή, μ˜€ν”„μ…‹ 이해
  • Producer & Consumer λ™μž‘ 방식
  • 기본적인 λͺ…λ Ήμ–΄ (ν† ν”½ 생성, λ©”μ‹œμ§€ 생산/μ†ŒλΉ„, μ˜€ν”„μ…‹ 관리)

➑️ 이 μ •λ„λ§Œ μ•Œμ•„λ„ 둜컬 ν™˜κ²½μ—μ„œ ν…ŒμŠ€νŠΈν•˜κ³  κ°„λ‹¨ν•œ μ„œλΉ„μŠ€μ— μ μš©ν•  수 μžˆλ‹€!! πŸš€

 

πŸ“Œ 1. νŒŒν‹°μ…˜ ν• λ‹Ή μ „λž΅ (Partition Assignment Strategy)

 

βœ… 컨슈머 κ·Έλ£Ή μ΅œμ ν™”

KafkaλŠ” μ—¬λŸ¬ 개의 μ»¨μŠˆλ¨Έκ°€ λ™μΌν•œ 토픽을 μ†ŒλΉ„ν•  λ•Œ, νŒŒν‹°μ…˜μ„ μ»¨μŠˆλ¨Έλ“€μ—κ²Œ μžλ™μœΌλ‘œ ν• λ‹Ήν•œλ‹€. μ΄λ•Œ νŒŒν‹°μ…˜ ν• λ‹Ή μ „λž΅μ΄ μ€‘μš”ν•œ 역할을 ν•œλ‹€.

 βž‘️ μš΄μ˜ ν™˜κ²½μ—μ„œλŠ” 컨슈머 그룹의 λΆ€ν•˜ 뢄산을 μ΅œμ ν™”ν•˜κΈ° μœ„ν•΄ 상황에 λ§žλŠ” ν• λ‹Ή μ „λž΅μ„ 선택해야 ν•œλ‹€.

 

βœ… μ£Όμš” ν• λ‹Ή μ „λž΅

  • RangeAssignor (κΈ°λ³Έκ°’): νŒŒν‹°μ…˜μ„ 연속적인 λ²”μœ„λ‘œ λ‚˜λˆ„μ–΄ ν• λ‹Ή. 컨슈머 μˆ˜μ™€ νŒŒν‹°μ…˜ μˆ˜κ°€ κ· λ“±ν•˜μ§€ μ•ŠμœΌλ©΄ 일뢀 μ»¨μŠˆλ¨Έκ°€ 더 λ§Žμ€ νŒŒν‹°μ…˜μ„ κ°€μ Έκ°ˆ 수 있음.
  • RoundRobinAssignor: λͺ¨λ“  μ»¨μŠˆλ¨Έμ—κ²Œ **μˆœν™˜ 방식(λΌμš΄λ“œλ‘œλΉˆ)**으둜 골고루 ν• λ‹Ή. 컨슈머 κ°„ κ· λ“±ν•œ λΆ„λ°°κ°€ ν•„μš”ν•œ 경우 적합.
  • StickyAssignor: νŒŒν‹°μ…˜ μž¬ν• λ‹Ή μ‹œ κΈ°μ‘΄ 할당을 μ΅œλŒ€ν•œ μœ μ§€ν•˜μ—¬ λ¦¬λ°ΈλŸ°μ‹± μ˜€λ²„ν—€λ“œ μ΅œμ†Œν™”.
  • Custom Assignor: νŠΉμ • μš”κ΅¬μ‚¬ν•­μ— 맞게 직접 κ΅¬ν˜„ κ°€λŠ₯.

βœ… ν• λ‹Ή μ „λž΅ μ„€μ • 방법

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

 

⭐ νŒŒν‹°μ…˜μ˜ 개수

  ➑️ νŒŒν‹°μ…˜ & 브둜컀 개수 관계

  • νŒŒν‹°μ…˜ κ°œμˆ˜λŠ” 브둜컀 κ°œμˆ˜μ™€ λ¬΄κ΄€ν•˜κ²Œ μ„€μ • κ°€λŠ₯!
    → ν•˜μ§€λ§Œ 졜적 μ„±λŠ₯을 μœ„ν•΄ 브둜컀 κ°œμˆ˜λ³΄λ‹€ ν¬κ±°λ‚˜ κ°™κ²Œ μ„€μ •ν•˜λŠ” 게 일반적.
  • 이유: 각 λΈŒλ‘œμ»€κ°€ μ΅œμ†Œν•œ ν•˜λ‚˜μ˜ νŒŒν‹°μ…˜μ„ κ°€μ Έμ•Ό λΆ€ν•˜κ°€ 뢄산됨!
    νŒŒν‹°μ…˜ κ°œμˆ˜κ°€ 브둜컀 κ°œμˆ˜λ³΄λ‹€ 적으면 일뢀 λΈŒλ‘œμ»€κ°€ λ†€κ²Œ 됨.

 

  ➑️ μΆ”μ²œ: νŒŒν‹°μ…˜ 개수 ≥ 브둜컀 개수 (보톡 브둜컀 개수 × 2 μΆ”μ²œ)

 

πŸ‘‰ νŒŒν‹°μ…˜ κ°œμˆ˜κ°€ 브둜컀 κ°œμˆ˜λ³΄λ‹€ 적으면

더보기

πŸ” λ‚¨λŠ” λΈŒλ‘œμ»€λŠ” μ–Έμ œ 역할을 ν• κΉŒ?

1️⃣ μž₯μ•  λ°œμƒ μ‹œ λŒ€κΈ° μ—­ν• 

  • 리더 λΈŒλ‘œμ»€κ°€ 죽으면 남은 λΈŒλ‘œμ»€κ°€ λ¦¬λ”λ‘œ 승격될 μˆ˜λ„ 있음
  • λ‚¨λŠ” 브둜컀둜 μƒˆλ‘œμš΄ νŒŒν‹°μ…˜μ΄ 배치될 μˆ˜λ„ 있음 (브둜컀 μΆ”κ°€ μ‹œ μžλ™ μ‘°μ •)

 

2️⃣ λ‹€λ₯Έ ν† ν”½μ˜ νŒŒν‹°μ…˜μ„ μ €μž₯ν•  μˆ˜λ„ 있음

  • 운영 ν™˜κ²½μ—μ„œλŠ” 보톡 μ—¬λŸ¬ 개의 토픽이 있음
  • νŠΉμ • ν† ν”½μ—λŠ” 역할이 없더라도 λ‹€λ₯Έ ν† ν”½μ˜ 리더 λ˜λŠ” νŒ”λ‘œμ›Œλ‘œ 쓰일 수 있음

 

πŸ“Œ 2. λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜ & μž₯μ•  λŒ€λΉ„ (Replication & Fault Tolerance)

βœ…  KafkaλŠ” 데이터 μœ μ‹€μ„ λ°©μ§€ν•˜κΈ° μœ„ν•΄ λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜(볡제) κΈ°λŠ₯을 μ œκ³΅ν•œλ‹€.

 βž‘️ 운영 ν™˜κ²½μ—μ„œλŠ” λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜μ„ 톡해 데이터 μœ μ‹€μ„ λ°©μ§€ν•˜κ³ , μž₯μ•  λ°œμƒ μ‹œ λΉ λ₯Έ 볡ꡬ가 κ°€λŠ₯ν•˜λ„λ‘ μ„€μ •ν•΄μ•Ό ν•œλ‹€

⭐ λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜μ˜ 개수

  ➑️ λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜ & 브둜컀 개수 관계

  • μ΅œλŒ€ 브둜컀 개수만큼 μ„€μ • κ°€λŠ₯. (이 λ•Œ λΈŒλ‘œμ»€κ°€ μž₯μ•  λ‚˜λ©΄, μƒˆλ‘­κ²Œ ν• λ‹Ήν•  곡간이 μ—†μ–΄ 볡ꡬ가 어렀움)
  • 보톡 브둜컀 개수 -1둜 μ„€μ • (1κ°œλŠ” 리더, λ‚˜λ¨Έμ§€λŠ” νŒ”λ‘œμ›Œ).
  • λͺ¨λ“  λ³΅μ œλ³Έμ€ μ„œλ‘œ λ‹€λ₯Έ λΈŒλ‘œμ»€μ— μ €μž₯돼야 ν•˜λ―€λ‘œ,
    replication-factorλŠ” 브둜컀 개수λ₯Ό μ΄ˆκ³Όν•  수 μ—†μŒ.

  ➑️  μΆ”μ²œ: replication-factor ≤ 브둜컀 개수 (보톡 브둜컀 개수 - 1 μΆ”μ²œ)

 

πŸ“– λ¦¬ν”Œλ¦¬μΌ€μ΄μ…˜μ˜ ν• λ‹Ή 예

ν† ν”½ A (브둜컀 4개, νŒŒν‹°μ…˜ 3개, 볡제 3개)

KafkaλŠ” μžλ™μœΌλ‘œ 브둜컀λ₯Ό μ΅œλŒ€ν•œ κ· λ“±ν•˜κ²Œ λΆ„λ°°

νŒŒν‹°μ…˜ 리더 브둜컀 νŒ”λ‘œμ›Œ 브둜컀 1 νŒ”λ‘œμ›Œ 브둜컀 2
P0 B1 B2 B3
P1 B2 B3 B4
P2 B3 B4 B1

βœ… ꡬ성 μš”μ†Œ

  • 리더 νŒŒν‹°μ…˜: ν΄λŸ¬μŠ€ν„°μ—μ„œ νŠΉμ • λΈŒλ‘œμ»€κ°€ ν•΄λ‹Ή νŒŒν‹°μ…˜μ˜ 리더 역할을 λ‹΄λ‹Ή.
  • νŒ”λ‘œμ›Œ νŒŒν‹°μ…˜: 리더λ₯Ό λ³΅μ œν•˜μ—¬ λ°±μ—… 역할을 μˆ˜ν–‰.
  • ISR (In-Sync Replicas): 리더와 λ™κΈ°ν™”λœ νŒ”λ‘œμ›Œ λͺ©λ‘. μž₯μ•  λ°œμƒ μ‹œ μƒˆλ‘œμš΄ 리더λ₯Ό μ„ μΆœν•˜λŠ” 데 μ‚¬μš©λ¨.

 

βœ… μž‘λ™ 방식

  • 각 νŒŒν‹°μ…˜μ€ ν•˜λ‚˜μ˜ 리더(Leader)와 ν•˜λ‚˜ μ΄μƒμ˜ νŒ”λ‘œμ›Œ(Follower)둜 ꡬ성됨.
  • ν”„λ‘œλ“€μ„œ(Producer)λŠ” 항상 리더 νŒŒν‹°μ…˜μœΌλ‘œ 데이터λ₯Ό 보냄.
  • νŒ”λ‘œμ›Œ νŒŒν‹°μ…˜λ“€μ€ λ¦¬λ”μ˜ 데이터λ₯Ό 동기화(볡제)ν•˜λ©΄μ„œ λŒ€κΈ°.

 

βœ… μž₯μ•  λ°œμƒ μ‹œ 볡ꡬ κ³Όμ • 

1️⃣ 리더 브둜컀 μž₯μ•  λ°œμƒ 🚨 → ISR λ‚΄μ—μ„œ μƒˆλ‘œμš΄ 리더 μžλ™ μ„ μΆœ.
2️⃣ νŒ”λ‘œμ›Œκ°€ 일정 μ‹œκ°„ 리더와 λ™κΈ°ν™”λ˜μ§€ μ•ŠμœΌλ©΄ ISRμ—μ„œ 제거됨.
3️⃣ 볡ꡬ된 λΈŒλ‘œμ»€λŠ” λ‹€μ‹œ ISR에 좔가됨.

 

βœ… μ„€μ • 방법

min.insync.replicas=2  # λ˜λŠ” ν† ν”½ 생성 μ‹œ replication-factor=2 # μ΅œμ†Œν•œ 2개의 볡제본이 μžˆμ–΄μ•Ό 컀밋 κ°€λŠ₯

unclean.leader.election.enable=false  # μ†μƒλœ 데이터가 λ¦¬λ”λ‘œ μ„ μΆœλ˜λŠ” 것을 방지

 

 

 

πŸ“Œ 3. λ ˆν…μ…˜ μ •μ±… & μ„±λŠ₯ νŠœλ‹ (Retention Policy & Performance Tuning)

βœ… KafkaλŠ” 였래된 데이터λ₯Ό μžλ™μœΌλ‘œ μ‚­μ œν•˜κ±°λ‚˜ μœ μ§€ν•˜λŠ” λ ˆν…μ…˜ 정책을 μ§€μ›ν•œλ‹€.

 βž‘️ 운영 ν™˜κ²½μ—μ„œλŠ” λ ˆν…μ…˜ 섀정을 톡해 μ €μž₯ 곡간을 κ΄€λ¦¬ν•˜κ³ , μ„±λŠ₯ νŠœλ‹μ„ 톡해 처리 속도λ₯Ό μ΅œμ ν™”ν•΄μ•Ό ν•œλ‹€.

 

βœ… λ ˆν…μ…˜ μ •μ±… μ„€μ • 방법

log.retention.ms=604800000  # 7일간 데이터 보관

log.retention.bytes=1073741824  # 1GB 초과 μ‹œ μ‚­μ œ

 

βœ… μ„±λŠ₯ νŠœλ‹ μš”μ†Œ

  • 배치 크기 μ‘°μ •: batch.size κ°’ 쑰절둜 ν”„λ‘œλ“€μ„œμ˜ μ„±λŠ₯ ν–₯상.
  • μ••μΆ• μ‚¬μš©: compression.type=lz4 μ„€μ •μœΌλ‘œ 데이터 μ „μ†‘λŸ‰ 절감.
  • λ©”μ‹œμ§€ 크기 μ œν•œ: message.max.bytes μ„€μ •μœΌλ‘œ κ°œλ³„ λ©”μ‹œμ§€ 크기 μ‘°μ •.

 

πŸ“Œ 4. Kafka Connect & Schema Registry (μ™ΈλΆ€ μ‹œμŠ€ν…œ 연동)

βœ… Kafka Connectλ₯Ό μ‚¬μš©ν•˜λ©΄ λ‹€λ₯Έ μ‹œμŠ€ν…œκ³Ό μ†μ‰½κ²Œ 데이터λ₯Ό 연동할 수 μžˆλ‹€.

 βž‘️ 운영 ν™˜κ²½μ—μ„œλŠ” Kafka Connectλ₯Ό ν™œμš©ν•˜μ—¬ λ‹€μ–‘ν•œ μ‹œμŠ€ν…œκ³Ό μ—°λ™ν•˜κ³ , Schema Registry둜 데이터 일관성을 μœ μ§€ν•΄μ•Ό ν•œλ‹€.

 

βœ… Kafka Connect의 μ—­ν• 

  • DB, ν΄λΌμš°λ“œ, NoSQL, 파일 μ‹œμŠ€ν…œ λ“± λ‹€μ–‘ν•œ μ†ŒμŠ€μ—μ„œ Kafka둜 데이터λ₯Ό κ°€μ Έμ˜€κ±°λ‚˜ 전솑 κ°€λŠ₯.
  • Sink Connector: Kafka → μ™ΈλΆ€ μ‹œμŠ€ν…œ (예: Elasticsearch, HDFS, RDBMS)
  • Source Connector: μ™ΈλΆ€ μ‹œμŠ€ν…œ → Kafka (예: MySQL, PostgreSQL, MongoDB)

 

βœ… Schema Registry ν™œμš©

  • Kafka λ©”μ‹œμ§€μ˜ JSON, Avro, Protobuf 같은 데이터 ν˜•μ‹μ„ 관리.
  • μŠ€ν‚€λ§ˆ λ³€κ²½ μ‹œ 데이터 무결성을 μœ μ§€ν•˜κ³ , 데이터 손싀을 방지.
  • REST APIλ₯Ό 톡해 μŠ€ν‚€λ§ˆλ₯Ό μ‘°νšŒν•˜κ³  관리 κ°€λŠ₯.

 

πŸ“Œ 5. Kafka Streams & KSQL (μ‹€μ‹œκ°„ 데이터 처리)

βœ… Kafka Streams: Kafka 데이터λ₯Ό μ‹€μ‹œκ°„μœΌλ‘œ μ²˜λ¦¬ν•˜λŠ” κ°•λ ₯ν•œ 슀트리밍 라이브러리.
βœ… KSQL: SQL 문법을 μ΄μš©ν•΄ μ‹€μ‹œκ°„ 데이터 처리λ₯Ό μ‰½κ²Œ κ΅¬ν˜„ν•  수 μžˆλ‹€.

 βž‘️ μš΄μ˜ ν™˜κ²½μ—μ„œλŠ” Kafka Streams λ˜λŠ” KSQL을 ν™œμš©ν•˜μ—¬ μ‹€μ‹œκ°„ 데이터 처리λ₯Ό μ΅œμ ν™”ν•  수 μžˆλ‹€.

 

βœ… Kafka Streams μ£Όμš” κ°œλ…

  • Stateful & Stateless μ—°μ‚° 지원
  • Windowing 처리 κ°€λŠ₯ (μ‹œκ°„ 기반 데이터 처리)
  • Exactly-once 처리 κ°€λŠ₯ (쀑볡 λ©”μ‹œμ§€ 방지)

 

βœ… Kafka Streams 예제

KStream<String, String> stream = builder.stream("input-topic");

stream.filter((key, value) -> value.contains("important"))

      .to("output-topic");

 

βœ… KSQL μ£Όμš” κ°œλ…

  • Kafka 토픽을 SQL처럼 μ‘°νšŒν•˜κ³  필터링 κ°€λŠ₯.
  • μ‹€μ‹œκ°„ 데이터 μŠ€νŠΈλ¦Όμ„ 쑰인, 집계, λ³€ν™˜ κ°€λŠ₯.

 

βœ… KSQL 예제

SELECT * FROM transactions WHERE amount > 1000;

 

πŸ“Œ 6. Kafka 운영 ν™˜κ²½μ—μ„œμ˜ μ˜€ν”„μ…‹ 컀밋 μ „λž΅

βœ…  Kafkaμ—μ„œ μ˜€ν”„μ…‹(Offset) 은 μ»¨μŠˆλ¨Έκ°€ νŠΉμ • ν† ν”½μ˜ νŒŒν‹°μ…˜μ—μ„œ 데이터λ₯Ό μ–΄λ””κΉŒμ§€ μ½μ—ˆλŠ”μ§€λ₯Ό λ‚˜νƒ€λ‚΄λŠ” λ²ˆν˜Έλ‹€.
 βž‘️ 운영 ν™˜κ²½μ—μ„œλŠ” 데이터 μœ μ‹€ 방지, 쀑볡 방지, μ •ν™•ν•œ ν•œ 번만 처리 (Exactly-Once) 보μž₯을 μœ„ν•΄ μ˜€ν”„μ…‹ 컀밋 μ „λž΅μ΄ μ€‘μš”ν•¨!

 

βœ… 1. μ˜€ν”„μ…‹ 컀밋 방식: μžλ™ vs μˆ˜λ™

μ˜€ν”„μ…‹μ„ μ–Έμ œ, μ–΄λ–»κ²Œ μ €μž₯할지 κ²°μ •ν•˜λŠ” μ„€μ •.

방식 μ„€λͺ… μž₯점 단점
μžλ™ 컀밋 (Auto Commit) Kafkaκ°€ auto.commit.interval.ms(κΈ°λ³Έ 5초)λ§ˆλ‹€ μžλ™μœΌλ‘œ μ˜€ν”„μ…‹ μ €μž₯ κ΅¬ν˜„μ΄ 간단 λ©”μ‹œμ§€κ°€ 처리되기 전에 컀밋될 μˆ˜λ„ 있음 (데이터 μœ μ‹€ κ°€λŠ₯)
μˆ˜λ™ 컀밋 (Manual Commit) μ»¨μŠˆλ¨Έκ°€ 직접 μ˜€ν”„μ…‹μ„ 컀밋 (commitSync(), commitAsync()) 데이터 μœ μ‹€ & 쀑볡 μ΅œμ†Œν™” κ°€λŠ₯ κ°œλ°œμžκ°€ 직접 관리해야 함

βœ” 운영 ν™˜κ²½μ—μ„œλŠ” μžλ™ 컀밋보닀 μˆ˜λ™ 컀밋을 μ“°λŠ” 게 μ•ˆμ „!
βœ” commitSync()λŠ” μ •ν™•ν•˜μ§€λ§Œ 느리고, commitAsync()λŠ” λΉ λ₯΄μ§€λ§Œ 일뢀 μ‹€νŒ¨ κ°€λŠ₯ → λ‘˜μ„ μ‘°ν•©ν•΄μ„œ μ‚¬μš© κ°€λŠ₯!

 

πŸ“– μžλ™ 컀밋 λΉ„ν™œμ„±ν™”

enable.auto.commit=false

 

πŸ“– 일반 Kafka 컨슈머 예제 (commitSync() μ‚¬μš©)

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // λ©”μ‹œμ§€ 처리
    }

    consumer.commitSync(); // 처리 μ™„λ£Œ ν›„ μ˜€ν”„μ…‹ 컀밋
}



⭐ @KafkaListener μ‚¬μš© μ‹œ μˆ˜λ™ 컀밋

πŸ“– Spring Kafka 예제 (ack.acknowledge() μ‚¬μš©)

@KafkaListener(topics = "my-topic", containerFactory = "myFactory")
public void listen(String message, Acknowledgment ack) {
    System.out.println("λ©”μ‹œμ§€ μˆ˜μ‹ : " + message);
    ack.acknowledge();  // λ©”μ‹œμ§€ 정상 처리 ν›„ μ˜€ν”„μ…‹ 컀밋
}

 

πŸ“– KafkaListenerContainerFactoryμ—μ„œ ackMode λ³€κ²½

μœ„ μ½”λ“œκ°€ μ œλŒ€λ‘œ λ™μž‘ν•˜λ €λ©΄, λ¦¬μŠ€λ„ˆ μ»¨ν…Œμ΄λ„ˆμ˜ ackModeλ₯Ό MANUAL둜 μ„€μ •ν•΄μ•Ό ν•œλ‹€.

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, DeliveryStartedEvent> consumerFactory) {
    
        ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // μˆ˜λ™ 컀밋 μ„€μ •

        return factory;
    }
}

 

 

βœ… 2. μ˜€ν”„μ…‹ μ΄ˆκΈ°ν™” (Reset) - λ©”μ‹œμ§€ μœ μ‹€ 방지

μ»¨μŠˆλ¨Έκ°€ 처음 μ‹œμž‘ν•˜κ±°λ‚˜, μ˜€ν”„μ…‹ 정보가 μ‚¬λΌμ‘Œμ„ λ•Œ Kafkaκ°€ μ–΄λ””μ„œλΆ€ν„° 읽을지 κ²°μ •ν•˜λŠ” μ„€μ •.

운영 ν™˜κ²½μ—μ„œλŠ” earliestλ₯Ό μ„€μ •ν•΄μ„œ 데이터 μœ μ‹€μ„ λ°©μ§€ν•˜λŠ” 게 μ’‹μŒ!

μ„€μ • κ°’ λ™μž‘ 방식
earliest κ°€μž₯ 였래된 λ©”μ‹œμ§€λΆ€ν„° 읽음 (데이터 μœ μ‹€ 방지)
latest (κΈ°λ³Έκ°’) κ°€μž₯ 졜근 λ©”μ‹œμ§€λΆ€ν„° 읽음 (이전 λ©”μ‹œμ§€λŠ” λ¬΄μ‹œλ¨)

πŸ“– μ˜€ν”„μ…‹ μ΄ˆκΈ°ν™” μ„€μ •

auto.offset.reset=earliest

 

 

βœ… 3. Exactly-Once 보μž₯ (쀑볡 방지 & νŠΈλžœμž­μ…˜ 처리)

Kafka κΈ°λ³Έ μ„€μ •μ—μ„œλŠ” at-least-once 방식이라 쀑볡 λ©”μ‹œμ§€ μ†ŒλΉ„κ°€ λ°œμƒν•  수 μžˆλ‹€.
이λ₯Ό λ°©μ§€ν•˜λ €λ©΄ Exactly-Once 처리λ₯Ό μœ„ν•œ 섀정을 μΆ”κ°€ν•΄μ•Ό 함!

  • idempotent producer (쀑볡 방지 ν”„λ‘œλ“€μ„œ) + transactional consumer (νŠΈλžœμž­μ…”λ„ 컨슈머) λ₯Ό μ‘°ν•©.
  • read-process-write νŒ¨ν„΄μ—μ„œ μ†ŒλΉ„ν•œ λ©”μ‹œμ§€μ™€ μƒˆλ‘œμš΄ λ©”μ‹œμ§€λ₯Ό ν•˜λ‚˜μ˜ νŠΈλžœμž­μ…˜μœΌλ‘œ λ¬Άμ–΄ 컀밋함.

πŸ“– ν”„λ‘œλ“€μ„œ μΈ‘ μ„€μ • (쀑볡 방지)

enable.idempotence=true
acks=all

 

πŸ“– μ»¨μŠˆλ¨Έ μΈ‘ μ„€μ • (νŠΈλžœμž­μ…˜ ν™œμ„±ν™”)

isolation.level=read_committed

 

πŸ“– νŠΈλžœμž­μ…˜al μ†ŒλΉ„μž + ν”„λ‘œλ“€μ„œ μ‚¬μš©

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        producer.send(new ProducerRecord<>("new-topic", record.value()));
    }
   
    producer.commitTransaction(); // ν”„λ‘œλ“€μ„œ νŠΈλžœμž­μ…˜ 컀밋
    consumer.commitSync(); // 컨슈머 μ˜€ν”„μ…‹ 컀밋
}