Backend/spring cloud (MSA)

[Kafka] ๊ณ ๊ธ‰ ์„ค์ •

dddzr 2025. 2. 23. 19:17

๐Ÿ“Œ Kafka ์šด์˜ ํ™˜๊ฒฝ์—์„œ ํ•„์š”ํ•œ ๊ณ ๊ธ‰ ์„ค์ •

๊ธฐ๋ณธ ๊ฐœ๋…๋งŒ์œผ๋กœ๋„ Kafka๋ฅผ ์‚ฌ์šฉํ•  ์ˆ˜๋Š” ์žˆ์ง€๋งŒ, ์‹ค์ œ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋” ๋งŽ์€ ๊ฑธ ์•Œ์•„์•ผ ํ•œ๋‹ค!

๐Ÿ”— ๊ธฐ๋ณธ ๊ฐœ๋…(ํ•„์ˆ˜!)

  • ํ† ํ”ฝ, ํŒŒํ‹ฐ์…˜, ๋ธŒ๋กœ์ปค, ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน, ์˜คํ”„์…‹ ์ดํ•ด
  • Producer & Consumer ๋™์ž‘ ๋ฐฉ์‹
  • ๊ธฐ๋ณธ์ ์ธ ๋ช…๋ น์–ด (ํ† ํ”ฝ ์ƒ์„ฑ, ๋ฉ”์‹œ์ง€ ์ƒ์‚ฐ/์†Œ๋น„, ์˜คํ”„์…‹ ๊ด€๋ฆฌ)

โžก๏ธ ์ด ์ •๋„๋งŒ ์•Œ์•„๋„ ๋กœ์ปฌ ํ™˜๊ฒฝ์—์„œ ํ…Œ์ŠคํŠธํ•˜๊ณ  ๊ฐ„๋‹จํ•œ ์„œ๋น„์Šค์— ์ ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค!! ๐Ÿš€

 

๐Ÿ“Œ 1. ํŒŒํ‹ฐ์…˜ ํ• ๋‹น ์ „๋žต (Partition Assignment Strategy)

 

โœ… ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน ์ตœ์ ํ™”

Kafka๋Š” ์—ฌ๋Ÿฌ ๊ฐœ์˜ ์ปจ์Šˆ๋จธ๊ฐ€ ๋™์ผํ•œ ํ† ํ”ฝ์„ ์†Œ๋น„ํ•  ๋•Œ, ํŒŒํ‹ฐ์…˜์„ ์ปจ์Šˆ๋จธ๋“ค์—๊ฒŒ ์ž๋™์œผ๋กœ ํ• ๋‹นํ•œ๋‹ค. ์ด๋•Œ ํŒŒํ‹ฐ์…˜ ํ• ๋‹น ์ „๋žต์ด ์ค‘์š”ํ•œ ์—ญํ• ์„ ํ•œ๋‹ค.

 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ์ปจ์Šˆ๋จธ ๊ทธ๋ฃน์˜ ๋ถ€ํ•˜ ๋ถ„์‚ฐ์„ ์ตœ์ ํ™”ํ•˜๊ธฐ ์œ„ํ•ด ์ƒํ™ฉ์— ๋งž๋Š” ํ• ๋‹น ์ „๋žต์„ ์„ ํƒํ•ด์•ผ ํ•œ๋‹ค.

 

โœ… ์ฃผ์š” ํ• ๋‹น ์ „๋žต

  • RangeAssignor (๊ธฐ๋ณธ๊ฐ’): ํŒŒํ‹ฐ์…˜์„ ์—ฐ์†์ ์ธ ๋ฒ”์œ„๋กœ ๋‚˜๋ˆ„์–ด ํ• ๋‹น. ์ปจ์Šˆ๋จธ ์ˆ˜์™€ ํŒŒํ‹ฐ์…˜ ์ˆ˜๊ฐ€ ๊ท ๋“ฑํ•˜์ง€ ์•Š์œผ๋ฉด ์ผ๋ถ€ ์ปจ์Šˆ๋จธ๊ฐ€ ๋” ๋งŽ์€ ํŒŒํ‹ฐ์…˜์„ ๊ฐ€์ ธ๊ฐˆ ์ˆ˜ ์žˆ์Œ.
  • RoundRobinAssignor: ๋ชจ๋“  ์ปจ์Šˆ๋จธ์—๊ฒŒ **์ˆœํ™˜ ๋ฐฉ์‹(๋ผ์šด๋“œ๋กœ๋นˆ)**์œผ๋กœ ๊ณจ๊ณ ๋ฃจ ํ• ๋‹น. ์ปจ์Šˆ๋จธ ๊ฐ„ ๊ท ๋“ฑํ•œ ๋ถ„๋ฐฐ๊ฐ€ ํ•„์š”ํ•œ ๊ฒฝ์šฐ ์ ํ•ฉ.
  • StickyAssignor: ํŒŒํ‹ฐ์…˜ ์žฌํ• ๋‹น ์‹œ ๊ธฐ์กด ํ• ๋‹น์„ ์ตœ๋Œ€ํ•œ ์œ ์ง€ํ•˜์—ฌ ๋ฆฌ๋ฐธ๋Ÿฐ์‹ฑ ์˜ค๋ฒ„ํ—ค๋“œ ์ตœ์†Œํ™”.
  • Custom Assignor: ํŠน์ • ์š”๊ตฌ์‚ฌํ•ญ์— ๋งž๊ฒŒ ์ง์ ‘ ๊ตฌํ˜„ ๊ฐ€๋Šฅ.

โœ… ํ• ๋‹น ์ „๋žต ์„ค์ • ๋ฐฉ๋ฒ•

partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor

 

โญ ํŒŒํ‹ฐ์…˜์˜ ๊ฐœ์ˆ˜

  โžก๏ธ ํŒŒํ‹ฐ์…˜ & ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ ๊ด€๊ณ„

  • ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๋Š” ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜์™€ ๋ฌด๊ด€ํ•˜๊ฒŒ ์„ค์ • ๊ฐ€๋Šฅ!
    → ํ•˜์ง€๋งŒ ์ตœ์  ์„ฑ๋Šฅ์„ ์œ„ํ•ด ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋ณด๋‹ค ํฌ๊ฑฐ๋‚˜ ๊ฐ™๊ฒŒ ์„ค์ •ํ•˜๋Š” ๊ฒŒ ์ผ๋ฐ˜์ .
  • ์ด์œ : ๊ฐ ๋ธŒ๋กœ์ปค๊ฐ€ ์ตœ์†Œํ•œ ํ•˜๋‚˜์˜ ํŒŒํ‹ฐ์…˜์„ ๊ฐ€์ ธ์•ผ ๋ถ€ํ•˜๊ฐ€ ๋ถ„์‚ฐ๋จ!
    ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๊ฐ€ ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋ณด๋‹ค ์ ์œผ๋ฉด ์ผ๋ถ€ ๋ธŒ๋กœ์ปค๊ฐ€ ๋†€๊ฒŒ ๋จ.

 

  โžก๏ธ ์ถ”์ฒœ: ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜ ≥ ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ (๋ณดํ†ต ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ × 2 ์ถ”์ฒœ)

 

๐Ÿ‘‰ ํŒŒํ‹ฐ์…˜ ๊ฐœ์ˆ˜๊ฐ€ ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋ณด๋‹ค ์ ์œผ๋ฉด

๋”๋ณด๊ธฐ

๐Ÿ” ๋‚จ๋Š” ๋ธŒ๋กœ์ปค๋Š” ์–ธ์ œ ์—ญํ• ์„ ํ• ๊นŒ?

1๏ธโƒฃ ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋Œ€๊ธฐ ์—ญํ• 

  • ๋ฆฌ๋” ๋ธŒ๋กœ์ปค๊ฐ€ ์ฃฝ์œผ๋ฉด ๋‚จ์€ ๋ธŒ๋กœ์ปค๊ฐ€ ๋ฆฌ๋”๋กœ ์Šน๊ฒฉ๋  ์ˆ˜๋„ ์žˆ์Œ
  • ๋‚จ๋Š” ๋ธŒ๋กœ์ปค๋กœ ์ƒˆ๋กœ์šด ํŒŒํ‹ฐ์…˜์ด ๋ฐฐ์น˜๋  ์ˆ˜๋„ ์žˆ์Œ (๋ธŒ๋กœ์ปค ์ถ”๊ฐ€ ์‹œ ์ž๋™ ์กฐ์ •)

 

2๏ธโƒฃ ๋‹ค๋ฅธ ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์„ ์ €์žฅํ•  ์ˆ˜๋„ ์žˆ์Œ

  • ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ณดํ†ต ์—ฌ๋Ÿฌ ๊ฐœ์˜ ํ† ํ”ฝ์ด ์žˆ์Œ
  • ํŠน์ • ํ† ํ”ฝ์—๋Š” ์—ญํ• ์ด ์—†๋”๋ผ๋„ ๋‹ค๋ฅธ ํ† ํ”ฝ์˜ ๋ฆฌ๋” ๋˜๋Š” ํŒ”๋กœ์›Œ๋กœ ์“ฐ์ผ ์ˆ˜ ์žˆ์Œ

 

๐Ÿ“Œ 2. ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜ & ์žฅ์•  ๋Œ€๋น„ (Replication & Fault Tolerance)

โœ…  Kafka๋Š” ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜(๋ณต์ œ) ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•œ๋‹ค.

 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜์„ ํ†ตํ•ด ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๊ณ , ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋น ๋ฅธ ๋ณต๊ตฌ๊ฐ€ ๊ฐ€๋Šฅํ•˜๋„๋ก ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค

โญ ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ๊ฐœ์ˆ˜

  โžก๏ธ ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜ & ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ ๊ด€๊ณ„

  • ์ตœ๋Œ€ ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋งŒํผ ์„ค์ • ๊ฐ€๋Šฅ. (์ด ๋•Œ ๋ธŒ๋กœ์ปค๊ฐ€ ์žฅ์•  ๋‚˜๋ฉด, ์ƒˆ๋กญ๊ฒŒ ํ• ๋‹นํ•  ๊ณต๊ฐ„์ด ์—†์–ด ๋ณต๊ตฌ๊ฐ€ ์–ด๋ ค์›€)
  • ๋ณดํ†ต ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ -1๋กœ ์„ค์ • (1๊ฐœ๋Š” ๋ฆฌ๋”, ๋‚˜๋จธ์ง€๋Š” ํŒ”๋กœ์›Œ).
  • ๋ชจ๋“  ๋ณต์ œ๋ณธ์€ ์„œ๋กœ ๋‹ค๋ฅธ ๋ธŒ๋กœ์ปค์— ์ €์žฅ๋ผ์•ผ ํ•˜๋ฏ€๋กœ,
    replication-factor๋Š” ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋ฅผ ์ดˆ๊ณผํ•  ์ˆ˜ ์—†์Œ.

  โžก๏ธ  ์ถ”์ฒœ: replication-factor ≤ ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ (๋ณดํ†ต ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜ - 1 ์ถ”์ฒœ)

 

๐Ÿ“– ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜์˜ ํ• ๋‹น ์˜ˆ

ํ† ํ”ฝ A (๋ธŒ๋กœ์ปค 4๊ฐœ, ํŒŒํ‹ฐ์…˜ 3๊ฐœ, ๋ณต์ œ 3๊ฐœ)

Kafka๋Š” ์ž๋™์œผ๋กœ ๋ธŒ๋กœ์ปค๋ฅผ ์ตœ๋Œ€ํ•œ ๊ท ๋“ฑํ•˜๊ฒŒ ๋ถ„๋ฐฐ

ํŒŒํ‹ฐ์…˜ ๋ฆฌ๋” ๋ธŒ๋กœ์ปค ํŒ”๋กœ์›Œ ๋ธŒ๋กœ์ปค 1 ํŒ”๋กœ์›Œ ๋ธŒ๋กœ์ปค 2
P0 B1 B2 B3
P1 B2 B3 B4
P2 B3 B4 B1

โœ… ๊ตฌ์„ฑ ์š”์†Œ

  • ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜: ํด๋Ÿฌ์Šคํ„ฐ์—์„œ ํŠน์ • ๋ธŒ๋กœ์ปค๊ฐ€ ํ•ด๋‹น ํŒŒํ‹ฐ์…˜์˜ ๋ฆฌ๋” ์—ญํ• ์„ ๋‹ด๋‹น.
  • ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜: ๋ฆฌ๋”๋ฅผ ๋ณต์ œํ•˜์—ฌ ๋ฐฑ์—… ์—ญํ• ์„ ์ˆ˜ํ–‰.
  • ISR (In-Sync Replicas): ๋ฆฌ๋”์™€ ๋™๊ธฐํ™”๋œ ํŒ”๋กœ์›Œ ๋ชฉ๋ก. ์žฅ์•  ๋ฐœ์ƒ ์‹œ ์ƒˆ๋กœ์šด ๋ฆฌ๋”๋ฅผ ์„ ์ถœํ•˜๋Š” ๋ฐ ์‚ฌ์šฉ๋จ.

 

โœ… ์ž‘๋™ ๋ฐฉ์‹

  • ๊ฐ ํŒŒํ‹ฐ์…˜์€ ํ•˜๋‚˜์˜ ๋ฆฌ๋”(Leader)์™€ ํ•˜๋‚˜ ์ด์ƒ์˜ ํŒ”๋กœ์›Œ(Follower)๋กœ ๊ตฌ์„ฑ๋จ.
  • ํ”„๋กœ๋“€์„œ(Producer)๋Š” ํ•ญ์ƒ ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์œผ๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋ƒ„.
  • ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜๋“ค์€ ๋ฆฌ๋”์˜ ๋ฐ์ดํ„ฐ๋ฅผ ๋™๊ธฐํ™”(๋ณต์ œ)ํ•˜๋ฉด์„œ ๋Œ€๊ธฐ.

 

โœ… ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋ณต๊ตฌ ๊ณผ์ • 

1๏ธโƒฃ ๋ฆฌ๋” ๋ธŒ๋กœ์ปค ์žฅ์•  ๋ฐœ์ƒ ๐Ÿšจ → ISR ๋‚ด์—์„œ ์ƒˆ๋กœ์šด ๋ฆฌ๋” ์ž๋™ ์„ ์ถœ.
2๏ธโƒฃ ํŒ”๋กœ์›Œ๊ฐ€ ์ผ์ • ์‹œ๊ฐ„ ๋ฆฌ๋”์™€ ๋™๊ธฐํ™”๋˜์ง€ ์•Š์œผ๋ฉด ISR์—์„œ ์ œ๊ฑฐ๋จ.
3๏ธโƒฃ ๋ณต๊ตฌ๋œ ๋ธŒ๋กœ์ปค๋Š” ๋‹ค์‹œ ISR์— ์ถ”๊ฐ€๋จ.

 

โœ… ์„ค์ • ๋ฐฉ๋ฒ•

min.insync.replicas=2  # ๋˜๋Š” ํ† ํ”ฝ ์ƒ์„ฑ ์‹œ replication-factor=2 # ์ตœ์†Œํ•œ 2๊ฐœ์˜ ๋ณต์ œ๋ณธ์ด ์žˆ์–ด์•ผ ์ปค๋ฐ‹ ๊ฐ€๋Šฅ

unclean.leader.election.enable=false  # ์†์ƒ๋œ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ฆฌ๋”๋กœ ์„ ์ถœ๋˜๋Š” ๊ฒƒ์„ ๋ฐฉ์ง€

 

 

 

๐Ÿ“Œ 3. ๋ ˆํ…์…˜ ์ •์ฑ… & ์„ฑ๋Šฅ ํŠœ๋‹ (Retention Policy & Performance Tuning)

โœ… Kafka๋Š” ์˜ค๋ž˜๋œ ๋ฐ์ดํ„ฐ๋ฅผ ์ž๋™์œผ๋กœ ์‚ญ์ œํ•˜๊ฑฐ๋‚˜ ์œ ์ง€ํ•˜๋Š” ๋ ˆํ…์…˜ ์ •์ฑ…์„ ์ง€์›ํ•œ๋‹ค.

 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ ˆํ…์…˜ ์„ค์ •์„ ํ†ตํ•ด ์ €์žฅ ๊ณต๊ฐ„์„ ๊ด€๋ฆฌํ•˜๊ณ , ์„ฑ๋Šฅ ํŠœ๋‹์„ ํ†ตํ•ด ์ฒ˜๋ฆฌ ์†๋„๋ฅผ ์ตœ์ ํ™”ํ•ด์•ผ ํ•œ๋‹ค.

 

โœ… ๋ ˆํ…์…˜ ์ •์ฑ… ์„ค์ • ๋ฐฉ๋ฒ•

log.retention.ms=604800000  # 7์ผ๊ฐ„ ๋ฐ์ดํ„ฐ ๋ณด๊ด€

log.retention.bytes=1073741824  # 1GB ์ดˆ๊ณผ ์‹œ ์‚ญ์ œ

 

โœ… ์„ฑ๋Šฅ ํŠœ๋‹ ์š”์†Œ

  • ๋ฐฐ์น˜ ํฌ๊ธฐ ์กฐ์ •: batch.size ๊ฐ’ ์กฐ์ ˆ๋กœ ํ”„๋กœ๋“€์„œ์˜ ์„ฑ๋Šฅ ํ–ฅ์ƒ.
  • ์••์ถ• ์‚ฌ์šฉ: compression.type=lz4 ์„ค์ •์œผ๋กœ ๋ฐ์ดํ„ฐ ์ „์†ก๋Ÿ‰ ์ ˆ๊ฐ.
  • ๋ฉ”์‹œ์ง€ ํฌ๊ธฐ ์ œํ•œ: message.max.bytes ์„ค์ •์œผ๋กœ ๊ฐœ๋ณ„ ๋ฉ”์‹œ์ง€ ํฌ๊ธฐ ์กฐ์ •.

 

๐Ÿ“Œ 4. Kafka Connect & Schema Registry (์™ธ๋ถ€ ์‹œ์Šคํ…œ ์—ฐ๋™)

โœ… Kafka Connect๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ๋‹ค๋ฅธ ์‹œ์Šคํ…œ๊ณผ ์†์‰ฝ๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฐ๋™ํ•  ์ˆ˜ ์žˆ๋‹ค.

 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” Kafka Connect๋ฅผ ํ™œ์šฉํ•˜์—ฌ ๋‹ค์–‘ํ•œ ์‹œ์Šคํ…œ๊ณผ ์—ฐ๋™ํ•˜๊ณ , Schema Registry๋กœ ๋ฐ์ดํ„ฐ ์ผ๊ด€์„ฑ์„ ์œ ์ง€ํ•ด์•ผ ํ•œ๋‹ค.

 

โœ… Kafka Connect์˜ ์—ญํ• 

  • DB, ํด๋ผ์šฐ๋“œ, NoSQL, ํŒŒ์ผ ์‹œ์Šคํ…œ ๋“ฑ ๋‹ค์–‘ํ•œ ์†Œ์Šค์—์„œ Kafka๋กœ ๋ฐ์ดํ„ฐ๋ฅผ ๊ฐ€์ ธ์˜ค๊ฑฐ๋‚˜ ์ „์†ก ๊ฐ€๋Šฅ.
  • Sink Connector: Kafka → ์™ธ๋ถ€ ์‹œ์Šคํ…œ (์˜ˆ: Elasticsearch, HDFS, RDBMS)
  • Source Connector: ์™ธ๋ถ€ ์‹œ์Šคํ…œ → Kafka (์˜ˆ: MySQL, PostgreSQL, MongoDB)

 

โœ… Schema Registry ํ™œ์šฉ

  • Kafka ๋ฉ”์‹œ์ง€์˜ JSON, Avro, Protobuf ๊ฐ™์€ ๋ฐ์ดํ„ฐ ํ˜•์‹์„ ๊ด€๋ฆฌ.
  • ์Šคํ‚ค๋งˆ ๋ณ€๊ฒฝ ์‹œ ๋ฐ์ดํ„ฐ ๋ฌด๊ฒฐ์„ฑ์„ ์œ ์ง€ํ•˜๊ณ , ๋ฐ์ดํ„ฐ ์†์‹ค์„ ๋ฐฉ์ง€.
  • REST API๋ฅผ ํ†ตํ•ด ์Šคํ‚ค๋งˆ๋ฅผ ์กฐํšŒํ•˜๊ณ  ๊ด€๋ฆฌ ๊ฐ€๋Šฅ.

 

๐Ÿ“Œ 5. Kafka Streams & KSQL (์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ)

โœ… Kafka Streams: Kafka ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๋Š” ๊ฐ•๋ ฅํ•œ ์ŠคํŠธ๋ฆฌ๋ฐ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ.
โœ… KSQL: SQL ๋ฌธ๋ฒ•์„ ์ด์šฉํ•ด ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์‰ฝ๊ฒŒ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ๋‹ค.

 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” Kafka Streams ๋˜๋Š” KSQL์„ ํ™œ์šฉํ•˜์—ฌ ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ๋ฅผ ์ตœ์ ํ™”ํ•  ์ˆ˜ ์žˆ๋‹ค.

 

โœ… Kafka Streams ์ฃผ์š” ๊ฐœ๋…

  • Stateful & Stateless ์—ฐ์‚ฐ ์ง€์›
  • Windowing ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ (์‹œ๊ฐ„ ๊ธฐ๋ฐ˜ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ)
  • Exactly-once ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ (์ค‘๋ณต ๋ฉ”์‹œ์ง€ ๋ฐฉ์ง€)

 

โœ… Kafka Streams ์˜ˆ์ œ

KStream<String, String> stream = builder.stream("input-topic");

stream.filter((key, value) -> value.contains("important"))

      .to("output-topic");

 

โœ… KSQL ์ฃผ์š” ๊ฐœ๋…

  • Kafka ํ† ํ”ฝ์„ SQL์ฒ˜๋Ÿผ ์กฐํšŒํ•˜๊ณ  ํ•„ํ„ฐ๋ง ๊ฐ€๋Šฅ.
  • ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์กฐ์ธ, ์ง‘๊ณ„, ๋ณ€ํ™˜ ๊ฐ€๋Šฅ.

 

โœ… KSQL ์˜ˆ์ œ

SELECT * FROM transactions WHERE amount > 1000;

 

๐Ÿ“Œ 6. Kafka ์šด์˜ ํ™˜๊ฒฝ์—์„œ์˜ ์˜คํ”„์…‹ ์ปค๋ฐ‹ ์ „๋žต

โœ…  Kafka์—์„œ ์˜คํ”„์…‹(Offset) ์€ ์ปจ์Šˆ๋จธ๊ฐ€ ํŠน์ • ํ† ํ”ฝ์˜ ํŒŒํ‹ฐ์…˜์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์–ด๋””๊นŒ์ง€ ์ฝ์—ˆ๋Š”์ง€๋ฅผ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฒˆํ˜ธ๋‹ค.
 โžก๏ธ ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ๋ฐ์ดํ„ฐ ์œ ์‹ค ๋ฐฉ์ง€, ์ค‘๋ณต ๋ฐฉ์ง€, ์ •ํ™•ํ•œ ํ•œ ๋ฒˆ๋งŒ ์ฒ˜๋ฆฌ (Exactly-Once) ๋ณด์žฅ์„ ์œ„ํ•ด ์˜คํ”„์…‹ ์ปค๋ฐ‹ ์ „๋žต์ด ์ค‘์š”ํ•จ!

 

โœ… 1. ์˜คํ”„์…‹ ์ปค๋ฐ‹ ๋ฐฉ์‹: ์ž๋™ vs ์ˆ˜๋™

์˜คํ”„์…‹์„ ์–ธ์ œ, ์–ด๋–ป๊ฒŒ ์ €์žฅํ• ์ง€ ๊ฒฐ์ •ํ•˜๋Š” ์„ค์ •.

๋ฐฉ์‹ ์„ค๋ช… ์žฅ์  ๋‹จ์ 
์ž๋™ ์ปค๋ฐ‹ (Auto Commit) Kafka๊ฐ€ auto.commit.interval.ms(๊ธฐ๋ณธ 5์ดˆ)๋งˆ๋‹ค ์ž๋™์œผ๋กœ ์˜คํ”„์…‹ ์ €์žฅ ๊ตฌํ˜„์ด ๊ฐ„๋‹จ ๋ฉ”์‹œ์ง€๊ฐ€ ์ฒ˜๋ฆฌ๋˜๊ธฐ ์ „์— ์ปค๋ฐ‹๋  ์ˆ˜๋„ ์žˆ์Œ (๋ฐ์ดํ„ฐ ์œ ์‹ค ๊ฐ€๋Šฅ)
์ˆ˜๋™ ์ปค๋ฐ‹ (Manual Commit) ์ปจ์Šˆ๋จธ๊ฐ€ ์ง์ ‘ ์˜คํ”„์…‹์„ ์ปค๋ฐ‹ (commitSync(), commitAsync()) ๋ฐ์ดํ„ฐ ์œ ์‹ค & ์ค‘๋ณต ์ตœ์†Œํ™” ๊ฐ€๋Šฅ ๊ฐœ๋ฐœ์ž๊ฐ€ ์ง์ ‘ ๊ด€๋ฆฌํ•ด์•ผ ํ•จ

โœ” ์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” ์ž๋™ ์ปค๋ฐ‹๋ณด๋‹ค ์ˆ˜๋™ ์ปค๋ฐ‹์„ ์“ฐ๋Š” ๊ฒŒ ์•ˆ์ „!
โœ” commitSync()๋Š” ์ •ํ™•ํ•˜์ง€๋งŒ ๋Š๋ฆฌ๊ณ , commitAsync()๋Š” ๋น ๋ฅด์ง€๋งŒ ์ผ๋ถ€ ์‹คํŒจ ๊ฐ€๋Šฅ → ๋‘˜์„ ์กฐํ•ฉํ•ด์„œ ์‚ฌ์šฉ ๊ฐ€๋Šฅ!

 

๐Ÿ“– ์ž๋™ ์ปค๋ฐ‹ ๋น„ํ™œ์„ฑํ™”

enable.auto.commit=false

 

๐Ÿ“– ์ผ๋ฐ˜ Kafka ์ปจ์Šˆ๋จธ ์˜ˆ์ œ (commitSync() ์‚ฌ์šฉ)

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {
        processRecord(record); // ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ
    }

    consumer.commitSync(); // ์ฒ˜๋ฆฌ ์™„๋ฃŒ ํ›„ ์˜คํ”„์…‹ ์ปค๋ฐ‹
}



โญ @KafkaListener ์‚ฌ์šฉ ์‹œ ์ˆ˜๋™ ์ปค๋ฐ‹

๐Ÿ“– Spring Kafka ์˜ˆ์ œ (ack.acknowledge() ์‚ฌ์šฉ)

@KafkaListener(topics = "my-topic", containerFactory = "myFactory")
public void listen(String message, Acknowledgment ack) {
    System.out.println("๋ฉ”์‹œ์ง€ ์ˆ˜์‹ : " + message);
    ack.acknowledge();  // ๋ฉ”์‹œ์ง€ ์ •์ƒ ์ฒ˜๋ฆฌ ํ›„ ์˜คํ”„์…‹ ์ปค๋ฐ‹
}

 

๐Ÿ“– KafkaListenerContainerFactory์—์„œ ackMode ๋ณ€๊ฒฝ

์œ„ ์ฝ”๋“œ๊ฐ€ ์ œ๋Œ€๋กœ ๋™์ž‘ํ•˜๋ ค๋ฉด, ๋ฆฌ์Šค๋„ˆ ์ปจํ…Œ์ด๋„ˆ์˜ ackMode๋ฅผ MANUAL๋กœ ์„ค์ •ํ•ด์•ผ ํ•œ๋‹ค.

@Configuration
public class KafkaConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> kafkaListenerContainerFactory(
            ConsumerFactory<String, DeliveryStartedEvent> consumerFactory) {
    
        ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // ์ˆ˜๋™ ์ปค๋ฐ‹ ์„ค์ •

        return factory;
    }
}

 

 

โœ… 2. ์˜คํ”„์…‹ ์ดˆ๊ธฐํ™” (Reset) - ๋ฉ”์‹œ์ง€ ์œ ์‹ค ๋ฐฉ์ง€

์ปจ์Šˆ๋จธ๊ฐ€ ์ฒ˜์Œ ์‹œ์ž‘ํ•˜๊ฑฐ๋‚˜, ์˜คํ”„์…‹ ์ •๋ณด๊ฐ€ ์‚ฌ๋ผ์กŒ์„ ๋•Œ Kafka๊ฐ€ ์–ด๋””์„œ๋ถ€ํ„ฐ ์ฝ์„์ง€ ๊ฒฐ์ •ํ•˜๋Š” ์„ค์ •.

์šด์˜ ํ™˜๊ฒฝ์—์„œ๋Š” earliest๋ฅผ ์„ค์ •ํ•ด์„œ ๋ฐ์ดํ„ฐ ์œ ์‹ค์„ ๋ฐฉ์ง€ํ•˜๋Š” ๊ฒŒ ์ข‹์Œ!

์„ค์ • ๊ฐ’ ๋™์ž‘ ๋ฐฉ์‹
earliest ๊ฐ€์žฅ ์˜ค๋ž˜๋œ ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ฝ์Œ (๋ฐ์ดํ„ฐ ์œ ์‹ค ๋ฐฉ์ง€)
latest (๊ธฐ๋ณธ๊ฐ’) ๊ฐ€์žฅ ์ตœ๊ทผ ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ฝ์Œ (์ด์ „ ๋ฉ”์‹œ์ง€๋Š” ๋ฌด์‹œ๋จ)

๐Ÿ“– ์˜คํ”„์…‹ ์ดˆ๊ธฐํ™” ์„ค์ •

auto.offset.reset=earliest

 

 

โœ… 3. Exactly-Once ๋ณด์žฅ (์ค‘๋ณต ๋ฐฉ์ง€ & ํŠธ๋žœ์žญ์…˜ ์ฒ˜๋ฆฌ)

Kafka ๊ธฐ๋ณธ ์„ค์ •์—์„œ๋Š” at-least-once ๋ฐฉ์‹์ด๋ผ ์ค‘๋ณต ๋ฉ”์‹œ์ง€ ์†Œ๋น„๊ฐ€ ๋ฐœ์ƒํ•  ์ˆ˜ ์žˆ๋‹ค.
์ด๋ฅผ ๋ฐฉ์ง€ํ•˜๋ ค๋ฉด Exactly-Once ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ์„ค์ •์„ ์ถ”๊ฐ€ํ•ด์•ผ ํ•จ!

  • idempotent producer (์ค‘๋ณต ๋ฐฉ์ง€ ํ”„๋กœ๋“€์„œ) + transactional consumer (ํŠธ๋žœ์žญ์…”๋„ ์ปจ์Šˆ๋จธ) ๋ฅผ ์กฐํ•ฉ.
  • read-process-write ํŒจํ„ด์—์„œ ์†Œ๋น„ํ•œ ๋ฉ”์‹œ์ง€์™€ ์ƒˆ๋กœ์šด ๋ฉ”์‹œ์ง€๋ฅผ ํ•˜๋‚˜์˜ ํŠธ๋žœ์žญ์…˜์œผ๋กœ ๋ฌถ์–ด ์ปค๋ฐ‹ํ•จ.

๐Ÿ“– ํ”„๋กœ๋“€์„œ ์ธก ์„ค์ • (์ค‘๋ณต ๋ฐฉ์ง€)

enable.idempotence=true
acks=all

 

๐Ÿ“– ์ปจ์Šˆ๋จธ ์ธก ์„ค์ • (ํŠธ๋žœ์žญ์…˜ ํ™œ์„ฑํ™”)

isolation.level=read_committed

 

๐Ÿ“– ํŠธ๋žœ์žญ์…˜al ์†Œ๋น„์ž + ํ”„๋กœ๋“€์„œ ์‚ฌ์šฉ

Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    
    producer.beginTransaction();
    for (ConsumerRecord<String, String> record : records) {
        processRecord(record);
        producer.send(new ProducerRecord<>("new-topic", record.value()));
    }
   
    producer.commitTransaction(); // ํ”„๋กœ๋“€์„œ ํŠธ๋žœ์žญ์…˜ ์ปค๋ฐ‹
    consumer.commitSync(); // ์ปจ์Šˆ๋จธ ์˜คํ”„์…‹ ์ปค๋ฐ‹
}



'Backend > spring cloud (MSA)' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ „์†ก  (0) 2025.02.23
Kafka๋ž€?  (0) 2025.02.23
Eureka๋ž€?  (0) 2025.02.16
GateWay๋ž€?  (0) 2025.02.16
Spring Cloud๋ž€?  (0) 2025.02.16