Backend/spring cloud (MSA)

Kafka๋ž€?

dddzr 2025. 2. 23. 18:54

๐Ÿ”1. ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜(Event-Driven Architecture)๋ž€?

๋ถ„์‚ฐ๋œ ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜ ์„œ๋น„์Šค๋“ค์ด ์ด๋ฒคํŠธ๋ฅผ ๊ธฐ๋ฐ˜์œผ๋กœ ํ†ต์‹ ํ•˜๊ณ  ์„œ๋กœ์˜ ๋™์ž‘์„ ์•ผ๊ธฐํ•˜๋Š” ํŒจํ„ด์„ ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜๋ผ๊ณ  ํ•œ๋‹ค.

  • ์„œ๋น„์Šค๋“ค์ด ์ด๋ฒคํŠธ๋ฅผ ๋ฐœํ–‰(publish)ํ•˜๊ณ , ๋‹ค๋ฅธ ์„œ๋น„์Šค๊ฐ€ ์ด๋ฅผ ๊ตฌ๋…(subscribe)ํ•˜์—ฌ ์ฒ˜๋ฆฌ.
  • Kafka, RabbitMQ, SQS, EventBridge ๋“ฑ๊ณผ ๊ฐ™์€ ๋ฉ”์‹œ์ง• ์‹œ์Šคํ…œ์„ ํ†ตํ•ด ๊ตฌํ˜„.

โญ ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ด์œ 

1๏ธโƒฃ ์„œ๋น„์Šค ๊ฐ„ ๊ฒฐํ•ฉ๋„↓ → ๊ฐ ์„œ๋น„์Šค๊ฐ€ ๋…๋ฆฝ์ ์œผ๋กœ ๋™์ž‘ ๊ฐ€๋Šฅ (ํ™•์žฅ์„ฑ↑)
2๏ธโƒฃ ๋น„๋™๊ธฐ ์ฒ˜๋ฆฌ → ๋น ๋ฅธ ์‘๋‹ต ๊ฐ€๋Šฅ, ๋Œ€๋Ÿ‰ ํŠธ๋ž˜ํ”ฝ ์ฒ˜๋ฆฌ์— ์œ ๋ฆฌ
3๏ธโƒฃ ํ™•์žฅ์„ฑ & ์œ ์—ฐ์„ฑ → ์ƒˆ๋กœ์šด ๊ธฐ๋Šฅ ์ถ”๊ฐ€ ์‹œ ๊ธฐ์กด ์‹œ์Šคํ…œ์— ์˜ํ–ฅ ์ตœ์†Œํ™” 

 

๐Ÿ“Œ 2. Kafka๋ž€?

Kafka๋Š” ๋Œ€๊ทœ๋ชจ ๋ฐ์ดํ„ฐ๋ฅผ ์‹ค์‹œ๊ฐ„์œผ๋กœ ์ฒ˜๋ฆฌํ•˜๊ณ  ์ €์žฅํ•˜๋Š” ๋ถ„์‚ฐํ˜• ๋ฉ”์‹œ์ง• ์‹œ์Šคํ…œ์ด์ž ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ์ด๋‹ค. ์ฃผ๋กœ ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜, ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ํŒŒ์ดํ”„๋ผ์ธ, ๋กœ๊ทธ ์ฒ˜๋ฆฌ ๋“ฑ์— ํ™œ์šฉ๋˜๋ฉฐ, MSA ํ™˜๊ฒฝ์—์„œ ๋น„๋™๊ธฐ ๋ฐ์ดํ„ฐ ์ „๋‹ฌ ๋ฐ ์ฒ˜๋ฆฌ๋ฅผ ์œ„ํ•œ ๊ฐ•๋ ฅํ•œ ๋„๊ตฌ๋กœ ์‚ฌ์šฉ๋œ๋‹ค.

โœ… 2-1. ์•„ํ‚คํ…์ฒ˜

Kafka๋Š” ๋ถ„์‚ฐํ˜• ๋ฉ”์‹œ์ง€ ํ ๋˜๋Š” ์‹ค์‹œ๊ฐ„ ์ŠคํŠธ๋ฆฌ๋ฐ ํ”Œ๋žซํผ์œผ๋กœ ๋™์ž‘ํ•˜๋ฉฐ, ๋ฐ์ดํ„ฐ๊ฐ€ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๊ฒฝ๋กœ๋กœ ์ „์†ก๋œ๋‹ค.

Producer → Kafka ๋ธŒ๋กœ์ปค → ํŒŒํ‹ฐ์…˜ → Consumer

 

โœ… 2-2. Kafka์˜ ์ฃผ์š” ํŠน์ง•

  • ๋ถ„์‚ฐ ์‹œ์Šคํ…œ: ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๋ธŒ๋กœ์ปค๋กœ ๊ตฌ์„ฑ๋˜์–ด ํ™•์žฅ์„ฑ, ๋‚ด๊ฒฐํ•จ์„ฑ, ์•ˆ์ •์„ฑ ์ œ๊ณต
  • ์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ฒ˜๋ฆฌ: ์ด๋ฒคํŠธ ๊ธฐ๋ฐ˜ ์•„ํ‚คํ…์ฒ˜ ๋ฐ ์‹ค์‹œ๊ฐ„ ๋ถ„์„ ์‹œ์Šคํ…œ์— ์ตœ์ ํ™”
  • ๋‚ด๊ตฌ์„ฑ: ๋ฐ์ดํ„ฐ๋ฅผ ๋””์Šคํฌ์— ์ €์žฅํ•˜๊ณ  ๋ณต์ œ(Replication)ํ•˜์—ฌ ์žฅ์•  ๋ฐœ์ƒ ์‹œ์—๋„ ๋ฐ์ดํ„ฐ ๋ณดํ˜ธ
  • ๋ฉ”์‹œ์ง€ ํ ์—ญํ• : Producer → Topic → Consumer ๊ตฌ์กฐ๋กœ ๋™์ž‘ํ•˜๋ฉฐ, ๋น„๋™๊ธฐ ๋ฐฉ์‹ ์ง€์›
  • ํ™•์žฅ์„ฑ: ๋ฐ์ดํ„ฐ๋ฅผ *ํŒŒํ‹ฐ์…”๋‹(Partitioning) ํ•˜์—ฌ ์ˆ˜ํ‰ ํ™•์žฅ ๊ฐ€๋Šฅ

 

โญ์ƒค๋”ฉ vs ํŒŒํ‹ฐ์…”๋‹

ํŒŒํ‹ฐ์…”๋‹์€ ํ•˜๋‚˜์˜ ์„œ๋ฒ„, ์ƒค๋”ฉ์€ ์—ฌ๋Ÿฌ ์„œ๋ฒ„์— ๋ถ„์‚ฐํ•œ๋‹ค๋Š” ์ฐจ์ด

๊ฐœ๋… ์„ค๋ช… Kafka์—์„œ์˜ ์—ญํ• 
ํŒŒํ‹ฐ์…”๋‹ ํ•˜๋‚˜์˜ ํ† ํ”ฝ์„ ์—ฌ๋Ÿฌ ๊ฐœ์˜ ๋…ผ๋ฆฌ์  ๋‹จ์œ„(ํŒŒํ‹ฐ์…˜)๋กœ ๋‚˜๋ˆ” ํ† ํ”ฝ ๋ฐ์ดํ„ฐ๋ฅผ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ, Consumer ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ๊ฐ€๋Šฅ
์ƒค๋”ฉ ๋ฐ์ดํ„ฐ๋ฅผ ์—ฌ๋Ÿฌ ์„œ๋ฒ„(๋…ธ๋“œ)๋กœ ๋ถ„์‚ฐ ์ €์žฅ Kafka์—์„œ๋Š” ๋ธŒ๋กœ์ปค๊ฐ€ ์ƒค๋”ฉ ๊ฐœ๋…๊ณผ ์œ ์‚ฌ, ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค๊ฐ€ ๋ฐ์ดํ„ฐ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ

 

๐Ÿ“Œ 3. Kafka์˜ ๊ธฐ๋ณธ ๊ฐœ๋… & ๋ช…๋ น์–ด

โœ… 3-1. Kafka ๋ธŒ๋กœ์ปค

  • Kafka ํด๋Ÿฌ์Šคํ„ฐ ๋‚ด์˜ ์„œ๋ฒ„๋ฅผ ์ง€์นญ, ๋ฐ์ดํ„ฐ ์ €์žฅ ๋ฐ ๊ด€๋ฆฌ, ๋ฉ”์‹œ์ง€ ์ „๋‹ฌ ์—ญํ• ์„ ํ•จ.
  • ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค๊ฐ€ ํ•จ๊ป˜ ์ž‘๋™ํ•˜์—ฌ ๊ณ ๊ฐ€์šฉ์„ฑ๊ณผ ํ™•์žฅ์„ฑ์„ ์ œ๊ณต.

 

โœ… 3-2. ํ† ํ”ฝ (Topic) - ‘๋ฉ”์‹œ์ง€’๋ผ๊ณ  ์ž์ฃผ ํ‘œํ˜„

  • ๋ฐ์ดํ„ฐ์˜ ๋ถ„๋ฅ˜ ๋‹จ์œ„๋กœ, Producer๋Š” ํŠน์ • ํ† ํ”ฝ์— ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•˜๊ณ , Consumer๋Š” ํ•ด๋‹น ํ† ํ”ฝ์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์ˆ˜์‹ .
  • ํŒŒํ‹ฐ์…˜(Partition) ๋‹จ์œ„๋กœ ๋‚˜๋‰˜๋ฉฐ, ๊ฐ ํŒŒํ‹ฐ์…˜์€ ๋ฉ”์‹œ์ง€ ์ˆœ์„œ ๋ณด์žฅ

 

โœ”๏ธ ํ† ํ”ฝ ๊ด€๋ฆฌ ๋ช…๋ น์–ด

# ํ† ํ”ฝ ์ƒ์„ฑ
bin/kafka-topics.sh --create --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --partitions 3 --replication-factor 1

# ํ† ํ”ฝ ๋ฆฌ์ŠคํŠธ ์กฐํšŒ
bin/kafka-topics.sh --list --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ>

# ํ† ํ”ฝ ์ƒ์„ธ ์ •๋ณด ์กฐํšŒ
bin/kafka-topics.sh --describe --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ>

# ํ† ํ”ฝ ์‚ญ์ œ
bin/kafka-topics.sh --delete --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ>

 

โš ๏ธ ํ† ํ”ฝ ์‚ญ์ œ ํ™œ์„ฑํ™” ํ•„์š”
Kafka ์„ค์ • ํŒŒ์ผ (server.properties)์—์„œ ์•„๋ž˜ ์˜ต์…˜ ์„ค์ • ํ•„์š”

delete.topic.enable=true //์ด๊ฑฐ ์„ค์ •ํ•ด์•ผ ์‚ญ์ œ ๊ฐ€๋Šฅ
auto.create.topics.enable=true //Provider์—์„œ ํ† ํ”ฝ ์ž๋™ ์ƒ์„ฑ

 

โœ… 3-3. ์ด๋ฒคํŠธ (Event)

  • Kafka์—์„œ ์ด๋ฒคํŠธ๋Š” ๋ฐ์ดํ„ฐ์˜ ์ƒํƒœ ๋ณ€ํ™”๋‚˜ ๋™์ž‘์„ ๋‚˜ํƒ€๋‚ด๋Š” ๋ฉ”์‹œ์ง€
  • ๊ธฐ๋ณธ์ ์œผ๋กœ ๊ฐœ๋ณ„ ์ด๋ฒคํŠธ ์‚ญ์ œ ๋ถˆ๊ฐ€ โŒ

 

โœ”๏ธ Retention Policy ์„ค์ • (์ด๋ฒคํŠธ ์ž๋™ ์‚ญ์ œ)

  • ํ† ํ”ฝ ๋‹จ์œ„๋กœ ๋ฉ”์‹œ์ง€ ๋ณด์กด ๊ธฐ๊ฐ„(retention.ms) ๋˜๋Š” ์ตœ๋Œ€ ํฌ๊ธฐ(retention.bytes)๋ฅผ ์„ค์ •ํ•ด ์˜ค๋ž˜๋œ ๋ฉ”์‹œ์ง€๋ฅผ ์ž๋™ ์‚ญ์ œ
log.retention.ms=604800000  # 7์ผ ๋™์•ˆ ๋ณด์กด
log.retention.bytes=1073741824  # 1GB ์ดˆ๊ณผ ์‹œ ์‚ญ์ œ

 

โœ… 3-4. ํŒŒํ‹ฐ์…˜ (Partition)

  • ํ† ํ”ฝ์˜ ์„ธ๋ถ„ํ™” ๋‹จ์œ„. 1๊ฐœ์˜ ํ† ํ”ฝ์€ 1๊ฐœ ์ด์ƒ์˜ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ตฌ์„ฑ, ๋ถ„์‚ฐ ์ €์žฅ๋˜์–ด ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ๊ฐ€ ๊ฐ€๋Šฅ.
  • ํŒŒํ‹ฐ์…˜ ์ˆ˜๋Š” ๋ณ‘๋ ฌ ์ฒ˜๋ฆฌ ์„ฑ๋Šฅ์— ์˜ํ–ฅ์„ ๋ฏธ์น˜๋ฏ€๋กœ, ํด๋ผ์ด์–ธํŠธ์™€ ๋ธŒ๋กœ์ปค์˜ ์ฒ˜๋ฆฌ๋Ÿ‰์„ ๊ณ ๋ คํ•ด ๊ฒฐ์ •.

 

โญ ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜ (Replication)

  • ๋ฐ์ดํ„ฐ ๋ณต์ œ๋ฅผ ํ†ตํ•ด ์žฅ์•  ๋ฐœ์ƒ ์‹œ ๋ฐ์ดํ„ฐ ๋ณดํ˜ธ.
  • ํŒŒํ‹ฐ์…˜์˜ ๋ณต์ œ๋ณธ (ํ•˜๋‚˜์˜ ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜๊ณผ ์—ฌ๋Ÿฌ ํŒ”๋กœ์›Œ ํŒŒํ‹ฐ์…˜์œผ๋กœ ๊ตฌ์„ฑ.), ๋ธŒ๋กœ์ปค ๊ฐœ์ˆ˜๋ณด๋‹ค 1๊ฐœ ์ ๊ฒŒ ์„ค์ •ํ•˜๋Š”๊ฒŒ ์ผ๋ฐ˜์ 

 

โœ” ํ† ํ”ฝ ์ƒ์„ฑ ์‹œ ํŒŒํ‹ฐ์…˜ & ๋ฆฌํ”Œ๋ฆฌ์ผ€์ด์…˜ ์„ค์ •

bin/kafka-topics.sh --create --topic my-topic --partitions <ํŒŒํ‹ฐ์…˜ ์ˆ˜> --replication-factor 2 --bootstrap-server localhost:9092

 

โœ… 3-5. ์˜คํ”„์…‹ (Offset)

  • ๊ฐ ํŒŒํ‹ฐ์…˜์—์„œ ๋ฉ”์‹œ์ง€์˜ ๊ณ ์œ  ID(๋ฒˆํ˜ธ)
  • Consumer๋Š” ์˜คํ”„์…‹์„ ๊ธฐ์ค€์œผ๋กœ ๋ฉ”์‹œ์ง€ ์ˆœ์„œ๋Œ€๋กœ ์ฒ˜๋ฆฌ
  • Consumer Group๋ณ„๋กœ ์˜คํ”„์…‹ ๊ด€๋ฆฌ ๊ฐ€๋Šฅ(๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์ƒํƒœ ์ถ”์  ๋“ฑ)

 

โœ”๏ธ ์˜คํ”„์…‹ ๋ฆฌ์…‹ (์ด์ „ ๋ฉ”์‹œ์ง€ ๋‹ค์‹œ ์ฝ๊ณ  ์‹ถ์„ ๋•Œ)

bin/kafka-consumer-groups.sh --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --group <๊ทธ๋ฃน๋ช…> --reset-offsets --topic <ํ† ํ”ฝ๋ช…> --to-earliest --execute

 

โœ”๏ธ ์˜คํ”„์…‹ ๊ฐœ๋ณ„ ์‚ญ์ œ

  • Kafka๋Š” ์˜คํ”„์…‹์„ ๊ฐœ๋ณ„์ ์œผ๋กœ ์‚ญ์ œํ•  ์ˆ˜ ์žˆ๋Š” ๊ธฐ๋Šฅ์„ ์ œ๊ณตํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
  • ํŠน์ • ์˜คํ”„์…‹์„ ์‚ญ์ œํ•˜๋Š” ๋Œ€์‹  ์ „์ฒด ํ† ํ”ฝ์„ ์‚ญ์ œํ•˜๊ฑฐ๋‚˜, Consumer Group ์ž์ฒด๋ฅผ ์žฌ๊ตฌ์„ฑํ•˜์—ฌ ์ƒˆ๋กœ์šด ์˜คํ”„์…‹์„ ์ƒ์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

โœ… 3-6. Producer (๋ฐ์ดํ„ฐ ์ „์†ก)

  • Kafka์— ๋ฐ์ดํ„ฐ๋ฅผ ์ „์†กํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ.
  • ๋ฉ”์‹œ์ง€๋ฅผ ํŠน์ • ํ† ํ”ฝ์˜ ํŠน์ • ํŒŒํ‹ฐ์…˜์— ์ „์†ก.

 

โœ”๏ธ ์ด๋ฒคํŠธ ๋ฐœํ–‰ (Producer ์—ญํ• )

bin/kafka-console-producer.sh --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ>

 

โœ… 3-7. Consumer (๋ฐ์ดํ„ฐ ์†Œ๋น„)

  • Kafka์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ์†Œ๋น„ํ•˜๋Š” ํด๋ผ์ด์–ธํŠธ.

 

โœ”๏ธ ์ด๋ฒคํŠธ ํ™•์ธ (Consumer ์—ญํ• )

bin/kafka-console-consumer.sh --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --from-beginning

 

โœ… 3-8. Consumer ๊ทธ๋ฃน (Consumer Group)

  • ์—ฌ๋Ÿฌ Consumer๋กœ ๊ตฌ์„ฑ๋œ ๋…ผ๋ฆฌ์  ๋‹จ์œ„
  • Consumer ๊ทธ๋ฃน ๋‹น ๊ฐ ํŒŒํ‹ฐ์…˜์„ ํ•œ ๋ฒˆ๋งŒ ์ฝ์Œ, ๊ฐ™์€ ๊ทธ๋ฃน์˜ Consumer๋“ค์€ ๋ฉ”์‹œ์ง€๋ฅผ ๋ถ„์‚ฐ ์ฒ˜๋ฆฌ
  • ์„œ๋กœ ๋‹ค๋ฅธ Consumer ๊ทธ๋ฃน์€ ๋™์ผ ๋ฐ์ดํ„ฐ๋ฅผ ๋…๋ฆฝ์ ์œผ๋กœ ์†Œ๋น„ ๊ฐ€๋Šฅ

 

โœ”๏ธ Consumer Group ๊ด€๋ฆฌ ๋ช…๋ น์–ด

# Consumer Group ๋ฆฌ์ŠคํŠธ ํ™•์ธ
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --list

# Consumer Group ์ƒ์„ธ ์ •๋ณด ์กฐํšŒ
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --describe --group <๊ทธ๋ฃน๋ช…>

# Consumer Group ์‚ญ์ œ (์˜คํ”„์…‹ ํฌํ•จ)
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --group <๊ธฐ์กด ๊ทธ๋ฃน ์ด๋ฆ„> --delete

 

๐Ÿ“Œ 4. Kafka ์„ค์น˜ ๋ฐ ์‹คํ–‰

Kafka๋Š” ๋ฉ”์‹œ์ง€ ๋ธŒ๋กœ์ปค์ด๋‹ค. ํ”„๋กœ์ ํŠธ์—์„œ Kafka๋ฅผ ์‚ฌ์šฉํ•˜๋ ค๋ฉด

  1. Kafka ๋ธŒ๋กœ์ปค๊ฐ€ ๋ฉ”์‹œ์ง€๋ฅผ ๊ด€๋ฆฌ.
  2. Zookeeper๊ฐ€ Kafka ๋ธŒ๋กœ์ปค๋ฅผ ์กฐ์œจ.
  3. ์• ํ”Œ๋ฆฌ์ผ€์ด์…˜(Spring Kafka)๊ฐ€ Kafka ๋ธŒ๋กœ์ปค์™€ ํ†ต์‹ 

 

โญ ZooKeeper: ๋ธŒ๋กœ์ปค๋ฅผ ๊ด€๋ฆฌํ•˜๊ณ , ๋ฆฌ๋” ์„ ์ถœ๊ณผ ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ๋ฅผ ์ €์žฅ. (Kafka 2.8 ์ดํ›„์—๋Š” Kafka ์ž์ฒด ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ๊ด€๋ฆฌ๋กœ ๋Œ€์ฒด ๊ฐ€๋Šฅ)

 

โœ… 4-1. Kafka ์„œ๋ฒ„ (๋ธŒ๋กœ์ปค)

โœ… 4-1-1. Kafka ์„œ๋ฒ„ ์„ค์น˜

โœ”๏ธ  Kafka ๋‹ค์šด๋กœ๋“œ

Apache Kafka ๋‹ค์šด๋กœ๋“œ ๋งํฌ

 

โœ”๏ธ  ์†Œ์ผ“ ์„œ๋ฒ„ ๋ฆฌ์Šค๋„ˆ ์„ค์ •

server.properties ํŒŒ์ผ์„ ์—ด์–ด์„œ “listeners=PLAINTEXT://:9092” ์•ž์— ๋ถ™์€ ์ฃผ์„(#)์„ ์ œ๊ฑฐํ•ด ์ฃผ๋ฉด ๋ฉ๋‹ˆ๋‹ค.

 

โœ… 4-1-2. Kafka ์‹คํ–‰

Kafka๋Š” Zookeeper์™€ ํ•จ๊ป˜ ์‹คํ–‰ํ•ด์•ผ ํ•œ๋‹ค. ๋‹ค์šด๋กœ๋“œํ•œ Kafka ๋””๋ ‰ํ„ฐ๋ฆฌ์—์„œ ์‹คํ–‰.

*BatํŒŒ์ผ์ด ์—†๋Š”๋ฐ window์—์„œ๋Š” git bash์—์„œ ์‹คํ–‰ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค!!

 

1๏ธโƒฃ Zookeeper ์‹คํ–‰

Documents/Tools/kafka_2.13-3.9.0์—์„œ

bin/zookeeper-server-start.sh config/zookeeper.properties

 

2๏ธโƒฃ  Kafka ๋ธŒ๋กœ์ปค ์‹คํ–‰

bin/kafka-server-start.sh config/server.properties

 

3๏ธโƒฃ Kafka ๋ธŒ๋กœ์ปค ํ™•์ธ

Kafka ๋ธŒ๋กœ์ปค๊ฐ€ ์‹คํ–‰๋˜๋ฉด ๊ธฐ๋ณธ์ ์œผ๋กœ localhost:9092์—์„œ ์š”์ฒญ์„ ๋Œ€๊ธฐ ์ค‘์ž…๋‹ˆ๋‹ค.

 

โœ… 4-2. Kafka ํด๋ผ์ด์–ธํŠธ

1๏ธโƒฃ ์˜์กด์„ฑ ์ถ”๊ฐ€

 implementation 'org.springframework.kafka:spring-kafka'

 

2๏ธโƒฃ Application.properties

์ฒ˜์Œ์—๋Š” ํ•„์ˆ˜์ธ ๊ฒƒ ๋งŒ ํ•ด๋ณด์ž!!

 

๐Ÿ“–  Kafka Consumer ์„ค์ •

# Kafka ๋ธŒ๋กœ์ปค ๋ฆฌ์ŠคํŠธ (ํ•„์ˆ˜)
# ๋ธŒ๋กœ์ปค ํ˜ธ์ŠคํŠธ ๋ฐ ํฌํŠธ ์ •๋ณด. ์—ฌ๋Ÿฌ ๋ธŒ๋กœ์ปค๋Š” ์‰ผํ‘œ๋กœ ๊ตฌ๋ถ„.
spring.kafka.bootstrap-servers=localhost:9092

# Consumer ๊ทธ๋ฃน ID ์„ค์ • (ํ•„์ˆ˜)
# ๋™์ผํ•œ ๊ทธ๋ฃน ID๋ฅผ ๊ฐ€์ง„ Consumer๋Š” ๋ฉ”์‹œ์ง€๋ฅผ ๊ณต์œ (๋กœ๋“œ ๋ฐธ๋Ÿฐ์‹ฑ)๋ฐ›์Œ.
spring.kafka.consumer.group-id=order-service-group

# Consumer Key ์—ญ์ง๋ ฌํ™” ํด๋ž˜์Šค ์„ค์ • (ํ•„์ˆ˜)
# ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ ๋””์ฝ”๋”ฉํ•  ๋•Œ ์‚ฌ์šฉ. ๊ธฐ๋ณธ์ ์œผ๋กœ StringDeserializer ์‚ฌ์šฉ.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# Consumer Value ์—ญ์ง๋ ฌํ™” ํด๋ž˜์Šค ์„ค์ • (ํ•„์ˆ˜)
# ๋ฉ”์‹œ์ง€ ๊ฐ’์„ ๋””์ฝ”๋”ฉํ•  ๋•Œ ์‚ฌ์šฉ. ๊ธฐ๋ณธ์ ์œผ๋กœ StringDeserializer ์‚ฌ์šฉ.
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

# ๋ฉ”์‹œ์ง€ ์ž๋™ ์ปค๋ฐ‹ ์—ฌ๋ถ€ (Optional)
# true: Consumer๊ฐ€ ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ํ›„ ์ž๋™์œผ๋กœ ์˜คํ”„์…‹์„ ์ปค๋ฐ‹.
spring.kafka.consumer.enable-auto-commit=true

# ์ž๋™ ์ปค๋ฐ‹ ์ฃผ๊ธฐ (Optional)
# ๋ฉ”์‹œ์ง€ ์ฒ˜๋ฆฌ ์™„๋ฃŒ ํ›„ ์˜คํ”„์…‹์„ ์ž๋™์œผ๋กœ ์ปค๋ฐ‹ํ•˜๋Š” ๊ฐ„๊ฒฉ (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.consumer.auto-commit-interval=100

# Consumer๊ฐ€ ์ฒ˜์Œ ์‹œ์ž‘ํ•  ๋•Œ ์ฝ๊ธฐ ์‹œ์ž‘ํ•  ์˜คํ”„์…‹ ์„ค์ • (Optional)
# latest: ๊ฐ€์žฅ ์ตœ๊ทผ์˜ ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ฝ๊ธฐ ์‹œ์ž‘.
# earliest: ํ† ํ”ฝ์˜ ์ฒ˜์Œ ๋ฉ”์‹œ์ง€๋ถ€ํ„ฐ ์ฝ๊ธฐ ์‹œ์ž‘.
spring.kafka.consumer.auto-offset-reset=latest

# Consumer ์„ธ์…˜ ํƒ€์ž„์•„์›ƒ (Optional)
# Consumer๊ฐ€ ๋ธŒ๋กœ์ปค์™€์˜ ์—ฐ๊ฒฐ์ด ๋Š๊ฒผ๋‹ค๊ณ  ํŒ๋‹จํ•˜๊ธฐ๊นŒ์ง€์˜ ์‹œ๊ฐ„ (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.consumer.properties.session.timeout.ms=15000

# ํ•˜ํŠธ๋น„ํŠธ ๊ฐ„๊ฒฉ (Optional)
# Consumer๊ฐ€ ๋ธŒ๋กœ์ปค์™€ ์—ฐ๊ฒฐ์„ ์œ ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ํ•˜ํŠธ๋น„ํŠธ๋ฅผ ๋ณด๋‚ด๋Š” ๊ฐ„๊ฒฉ (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.consumer.properties.heartbeat.interval.ms=5000

 

๐Ÿ“– Kafka Producer ์„ค์ •

# Producer Key ์ง๋ ฌํ™” ํด๋ž˜์Šค ์„ค์ • (ํ•„์ˆ˜)
# ๋ฉ”์‹œ์ง€ ํ‚ค๋ฅผ ์ธ์ฝ”๋”ฉํ•  ๋•Œ ์‚ฌ์šฉ. ๊ธฐ๋ณธ์ ์œผ๋กœ StringSerializer ์‚ฌ์šฉ.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer

# Producer Value ์ง๋ ฌํ™” ํด๋ž˜์Šค ์„ค์ • (ํ•„์ˆ˜)
# ๋ฉ”์‹œ์ง€ ๊ฐ’์„ ์ธ์ฝ”๋”ฉํ•  ๋•Œ ์‚ฌ์šฉ. ๊ธฐ๋ณธ์ ์œผ๋กœ StringSerializer ์‚ฌ์šฉ.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

# ๋ฉ”์‹œ์ง€ ์ „์†ก ํ™•์ธ ๋ชจ๋“œ (Optional)
# 0: ๋ธŒ๋กœ์ปค ํ™•์ธ ์—†์Œ.
# 1: ๋ฆฌ๋” ํŒŒํ‹ฐ์…˜์— ๊ธฐ๋ก ์„ฑ๊ณต ํ™•์ธ.
# all: ๋ชจ๋“  ํŒŒํ‹ฐ์…˜ ๋ณต์ œ๋ณธ์— ๊ธฐ๋ก ์„ฑ๊ณต ํ™•์ธ.
spring.kafka.producer.acks=1

# ์ „์†ก ์‹คํŒจ ์‹œ ์žฌ์‹œ๋„ ํšŸ์ˆ˜ (Optional)
# ๋ฉ”์‹œ์ง€ ์ „์†ก์ด ์‹คํŒจํ–ˆ์„ ๋•Œ ์žฌ์‹œ๋„ ํšŸ์ˆ˜ ์„ค์ •.
spring.kafka.producer.retries=3

# ๋ฉ”์‹œ์ง€ ๋ฐฐ์น˜ ํฌ๊ธฐ (Optional)
# ํ”„๋กœ๋“€์„œ๊ฐ€ ํ•œ ๋ฒˆ์— ์ „์†กํ•  ๋ฉ”์‹œ์ง€ ๋ฐฐ์น˜ ํฌ๊ธฐ (๋ฐ”์ดํŠธ ๋‹จ์œ„).
spring.kafka.producer.batch-size=16384

# ๋ฐฐ์น˜ ์ „์†ก ๋Œ€๊ธฐ ์‹œ๊ฐ„ (Optional)
# ๋ฐฐ์น˜๊ฐ€ ๊ฝ‰ ์ฐจ์ง€ ์•Š์•„๋„ ์ผ์ • ์‹œ๊ฐ„ ๋Œ€๊ธฐ ํ›„ ์ „์†ก (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.producer.linger-ms=1

# Producer ๋ฉ”๋ชจ๋ฆฌ ๋ฒ„ํผ ํฌ๊ธฐ (Optional)
# ์ „์†ก ์ „ ๋ฉ”์‹œ์ง€๋ฅผ ์ €์žฅํ•˜๋Š” ๋ฉ”๋ชจ๋ฆฌ ๋ฒ„ํผ ํฌ๊ธฐ (๋ฐ”์ดํŠธ ๋‹จ์œ„).
spring.kafka.producer.buffer-memory=33554432

# --------------------------------------------
# ์ž์ฃผ ์‚ฌ์šฉํ•˜๋Š” ์ถ”๊ฐ€ ์„ค์ •
# --------------------------------------------

# ๋ฉ”์‹œ์ง€ ์ตœ๋Œ€ ํฌ๊ธฐ (Optional)
# Producer์™€ Consumer์—์„œ ์ „์†ก/์ˆ˜์‹  ๊ฐ€๋Šฅํ•œ ๋ฉ”์‹œ์ง€์˜ ์ตœ๋Œ€ ํฌ๊ธฐ (๋ฐ”์ดํŠธ ๋‹จ์œ„).
spring.kafka.producer.properties.max.request.size=1048576
spring.kafka.consumer.properties.fetch.max.bytes=1048576

# ํ† ํ”ฝ๋ณ„ ์˜คํ”„์…‹ ์ปค๋ฐ‹ ์ฃผ๊ธฐ (Optional)
# ์ž๋™ ์ปค๋ฐ‹์„ ์‚ฌ์šฉํ•  ๊ฒฝ์šฐ ์˜คํ”„์…‹์„ ์ปค๋ฐ‹ํ•˜๋Š” ์ฃผ๊ธฐ (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.consumer.properties.auto.commit.interval.ms=5000

# SSL ์„ค์ • (Optional)
# SSL ์—ฐ๊ฒฐ์ด ํ•„์š”ํ•œ ๊ฒฝ์šฐ ์„ค์ •.
spring.kafka.properties.ssl.truststore.location=/path/to/truststore
spring.kafka.properties.ssl.truststore.password=yourpassword
spring.kafka.properties.ssl.keystore.location=/path/to/keystore
spring.kafka.properties.ssl.keystore.password=yourpassword
spring.kafka.properties.security.protocol=SSL

# ์š”์ฒญ ์‹œ๊ฐ„ ์ดˆ๊ณผ (Optional)
# ๋ธŒ๋กœ์ปค๊ฐ€ ์š”์ฒญ์„ ์ฒ˜๋ฆฌํ•˜์ง€ ๋ชปํ•˜๋ฉด ์ดˆ๊ณผ ์‹œ ์‹คํŒจ ์ฒ˜๋ฆฌ (๋ฐ€๋ฆฌ์ดˆ ๋‹จ์œ„).
spring.kafka.producer.properties.request.timeout.ms=30000
spring.kafka.consumer.properties.request.timeout.ms=30000

# ๋กœ๊ทธ ์ˆ˜์ค€ ์„ค์ • (Optional)
# Kafka ํด๋ผ์ด์–ธํŠธ ๋กœ๊ทธ ์ˆ˜์ค€ ์„ค์ •.
logging.level.org.apache.kafka=INFO

 

๐Ÿ“Œ 5. Kafka ์ด๋ฒคํŠธ ์‚ฌ์šฉ ์˜ˆ์‹œ

โœ… 5-1. ์ด๋ฒคํŠธ ๊ฐ์ฒด ์ƒ์„ฑ

package com.example.order_service.event;
import lombok.Data;

@Data
public class OrderCreatedEvent {
    private int orderId;
    private int productId;
    private int colorId;
    private int sizeId;
    private int quantity;
    private int userId;
}

 

โœ… 5-2. Event ๋ฐœ๊ธ‰

๐Ÿ“–  kafkaTemplate.send("order-created-topic", event);

// OrderService.java

public Orders createOrder(OrderRequest request) {
        // ์ฃผ๋ฌธ ๊ฐ์ฒด ์ƒ์„ฑ
        Orders order = dtoToOrder(request, OrderStatus.CREATED);

        // ์ฃผ๋ฌธ ์ƒํ’ˆ ๋ฆฌ์ŠคํŠธ
        List<OrderItems> orderItems = new ArrayList<>();
        for (OrderItemRequest itemRequest : request.getItems()) {
            OrderItems orderItem = dtoToOrderItems(itemRequest, order);
            orderItems.add(orderItem);
        }
        order.setOrderItems(orderItems);

        // ์ด ๊ธˆ์•ก ๊ณ„์‚ฐ
        BigDecimal totalPrice = orderItems.stream()
                .map(item -> item.getPrice().multiply(new BigDecimal(item.getQuantity())))
                .reduce(BigDecimal.ZERO, BigDecimal::add);
        order.setTotalPrice(totalPrice);

        // ์ฃผ๋ฌธ ์ €์žฅ
        Orders savedOrder = orderRepository.save(order);

        // ์ด๋ฒคํŠธ ๋ฐœํ–‰
        OrderCreatedEvent event = orderToEvent(order);
        kafkaTemplate.send("order-created-topic", event);
        return savedOrder;
}

 

* ์š”์ฒญ ์˜ˆ์‹œ

Post http://localhost:8080/order/api/orders

{
    "userId": 3,
    "shippingAddress": "์„œ์šธ์‹œ",
    "paymentMethod": "card",
    "items": [{
        "productId": 1,
        "sizeId": 1,
        "colorId": 1,
        "quantity": 1,
        "price": 199.99
    }]
}

 

โœ… 5-3. Event ์ˆ˜์‹ 

 

๐Ÿ“–  @KafkaListener(topics = "order-created-topic", groupId = "products-service-group")

//OrderEventListener.java

@KafkaListener(topics = "order-created-topic", groupId = "products-service-group")
public void handleOrderCreatedEvent(String message) {
    try {
        // JSON ๋ฉ”์‹œ์ง€๋ฅผ OrderEvent ๊ฐ์ฒด๋กœ ๋ณ€ํ™˜
        OrderEvent orderEvent = objectMapper.readValue(message, OrderEvent.class);

        // ์ƒ‰์ƒ๊ณผ ์‚ฌ์ด์ฆˆ์— ๋”ฐ๋ฅธ ์žฌ๊ณ  ์กฐํšŒ
        Optional<ProductStocks> optionalStock  = productStocksRepository.findStockQuantity(
            orderEvent.getProductId(),
            orderEvent.getColorId(),
            orderEvent.getSizeId()
        );

        if (optionalStock.isPresent()) {
            ProductStocks stock = optionalStock.get();
            int updatedStock = stock.getStockQuantity() - orderEvent.getQuantity();

            // ์žฌ๊ณ  ๋ถ€์กฑ ์‹œ ์˜ˆ์™ธ ์ฒ˜๋ฆฌ
            if (updatedStock < 0) {
                throw new IllegalStateException("์žฌ๊ณ  ๋ถ€์กฑ: ์š”์ฒญํ•œ ์ˆ˜๋Ÿ‰์„ ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค.");
            }   

            // ์žฌ๊ณ  ์ˆ˜๋Ÿ‰ ์—…๋ฐ์ดํŠธ
            stock.setStockQuantity(updatedStock);
            productStocksRepository.save(stock);   

            System.out.println("์žฌ๊ณ  ์—…๋ฐ์ดํŠธ ์„ฑ๊ณต");                
        } else {
            System.out.println("ํ•ด๋‹น ์ƒ‰์ƒ/์‚ฌ์ด์ฆˆ์— ๋Œ€ํ•œ ์žฌ๊ณ  ์ •๋ณด๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.");
        }
    } catch (Exception e) {
        System.out.println("์ฃผ๋ฌธ ์ฒ˜๋ฆฌ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: " + e.getMessage());
    }
}

 

โญ  6. ์•Œ์•„๋‘๋ฉด ์ข‹์€ ์„ค์ • & ๋ช…๋ น์–ด

1. ์ฃผ์š” ์Šคํฌ๋ฆฝํŠธ

Kafka์˜ ๋ช…๋ น์–ด๋Š” Kafka ์„ค์น˜ ๋””๋ ‰ํ† ๋ฆฌ์˜ bin/ ํด๋”์— ์žˆ๋Š” ์Šคํฌ๋ฆฝํŠธ๋ฅผ ์‚ฌ์šฉ.

  • kafka-topics.sh: ํ† ํ”ฝ ์ƒ์„ฑ, ์กฐํšŒ, ์‚ญ์ œ ๋“ฑ์„ ๊ด€๋ฆฌ.
  • kafka-console-producer.sh: ๋ฉ”์‹œ์ง€ ๋ฐœํ–‰(Producer ์—ญํ• ).
  • kafka-console-consumer.sh: ๋ฉ”์‹œ์ง€ ํ™•์ธ(Consumer ์—ญํ• ).
  • kafka-configs.sh: ๋ธŒ๋กœ์ปค๋‚˜ ํ† ํ”ฝ ์„ค์ • ๋ณ€๊ฒฝ.

 

2. ๋ฉ”์‹œ์ง€ ํ™•์ธ์šฉ ACL ๋น„ํ™œ์„ฑํ™” (๊ฐœ๋ฐœ/ํ…Œ์ŠคํŠธ์šฉ)

Kafka๋Š” ๋ณด์•ˆ ์„ค์ •์ด ๊ธฐ๋ณธ ํ™œ์„ฑํ™”๋œ ๊ฒฝ์šฐ Consumer/Producer๊ฐ€ ์ธ์ฆ๋˜์ง€ ์•Š์œผ๋ฉด ์‹คํŒจํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ๊ฐœ๋ฐœ์šฉ์œผ๋กœ sasl.mechanism์„ ์„ค์ •ํ•˜์ง€ ์•Š์œผ๋ ค๋ฉด server.properties์—์„œ ์•„๋ž˜ ์˜ต์…˜์„ ํ™•์ธํ•˜์„ธ์š”:

authorizer.class.name=

 

3. ๋ธŒ๋กœ์ปค ์ƒํƒœ ํ™•์ธ

bin/kafka-broker-api-versions.sh --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ>

 

4. ๋ฉ”์‹œ์ง€ ํ™•์ธ ์‹œ JSON ํฌ๋งท ํŒŒ์‹ฑ

bin/kafka-console-consumer.sh --topic <ํ† ํ”ฝ๋ช…> --bootstrap-server <๋ธŒ๋กœ์ปค์ฃผ์†Œ> --from-beginning | jq

 

5. ๊ฐœ๋ฐœ ํ™˜๊ฒฝ์—์„œ๋Š” Kafka ๋น„ํ™œ์„ฑํ™”

kafka.enabled=false

spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration

 

@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = true)


'Backend > spring cloud (MSA)' ์นดํ…Œ๊ณ ๋ฆฌ์˜ ๋‹ค๋ฅธ ๊ธ€

์‹ค์‹œ๊ฐ„ ๋ฐ์ดํ„ฐ ์ „์†ก  (0) 2025.02.23
[Kafka] ๊ณ ๊ธ‰ ์„ค์ •  (0) 2025.02.23
Eureka๋ž€?  (0) 2025.02.16
GateWay๋ž€?  (0) 2025.02.16
Spring Cloud๋ž€?  (0) 2025.02.16