๐1. ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ(Event-Driven Architecture)๋?
๋ถ์ฐ๋ ์ ํ๋ฆฌ์ผ์ด์ ์๋น์ค๋ค์ด ์ด๋ฒคํธ๋ฅผ ๊ธฐ๋ฐ์ผ๋ก ํต์ ํ๊ณ ์๋ก์ ๋์์ ์ผ๊ธฐํ๋ ํจํด์ ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ๋ผ๊ณ ํ๋ค.
- ์๋น์ค๋ค์ด ์ด๋ฒคํธ๋ฅผ ๋ฐํ(publish)ํ๊ณ , ๋ค๋ฅธ ์๋น์ค๊ฐ ์ด๋ฅผ ๊ตฌ๋ (subscribe)ํ์ฌ ์ฒ๋ฆฌ.
- Kafka, RabbitMQ, SQS, EventBridge ๋ฑ๊ณผ ๊ฐ์ ๋ฉ์์ง ์์คํ ์ ํตํด ๊ตฌํ.
โญ ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ๋ฅผ ์ฌ์ฉํ๋ ์ด์
1๏ธโฃ ์๋น์ค ๊ฐ ๊ฒฐํฉ๋↓ → ๊ฐ ์๋น์ค๊ฐ ๋
๋ฆฝ์ ์ผ๋ก ๋์ ๊ฐ๋ฅ (ํ์ฅ์ฑ↑)
2๏ธโฃ ๋น๋๊ธฐ ์ฒ๋ฆฌ → ๋น ๋ฅธ ์๋ต ๊ฐ๋ฅ, ๋๋ ํธ๋ํฝ ์ฒ๋ฆฌ์ ์ ๋ฆฌ
3๏ธโฃ ํ์ฅ์ฑ & ์ ์ฐ์ฑ → ์๋ก์ด ๊ธฐ๋ฅ ์ถ๊ฐ ์ ๊ธฐ์กด ์์คํ
์ ์ํฅ ์ต์ํ
๐ 2. Kafka๋?
Kafka๋ ๋๊ท๋ชจ ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๊ณ ์ ์ฅํ๋ ๋ถ์ฐํ ๋ฉ์์ง ์์คํ ์ด์ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ์ด๋ค. ์ฃผ๋ก ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ, ์ค์๊ฐ ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ, ๋ก๊ทธ ์ฒ๋ฆฌ ๋ฑ์ ํ์ฉ๋๋ฉฐ, MSA ํ๊ฒฝ์์ ๋น๋๊ธฐ ๋ฐ์ดํฐ ์ ๋ฌ ๋ฐ ์ฒ๋ฆฌ๋ฅผ ์ํ ๊ฐ๋ ฅํ ๋๊ตฌ๋ก ์ฌ์ฉ๋๋ค.
โ 2-1. ์ํคํ ์ฒ
Kafka๋ ๋ถ์ฐํ ๋ฉ์์ง ํ ๋๋ ์ค์๊ฐ ์คํธ๋ฆฌ๋ฐ ํ๋ซํผ์ผ๋ก ๋์ํ๋ฉฐ, ๋ฐ์ดํฐ๊ฐ ๋ค์๊ณผ ๊ฐ์ ๊ฒฝ๋ก๋ก ์ ์ก๋๋ค.
Producer → Kafka ๋ธ๋ก์ปค → ํํฐ์
→ Consumer
โ 2-2. Kafka์ ์ฃผ์ ํน์ง
- ๋ถ์ฐ ์์คํ : ์ฌ๋ฌ ๊ฐ์ ๋ธ๋ก์ปค๋ก ๊ตฌ์ฑ๋์ด ํ์ฅ์ฑ, ๋ด๊ฒฐํจ์ฑ, ์์ ์ฑ ์ ๊ณต
- ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ: ์ด๋ฒคํธ ๊ธฐ๋ฐ ์ํคํ ์ฒ ๋ฐ ์ค์๊ฐ ๋ถ์ ์์คํ ์ ์ต์ ํ
- ๋ด๊ตฌ์ฑ: ๋ฐ์ดํฐ๋ฅผ ๋์คํฌ์ ์ ์ฅํ๊ณ ๋ณต์ (Replication)ํ์ฌ ์ฅ์ ๋ฐ์ ์์๋ ๋ฐ์ดํฐ ๋ณดํธ
- ๋ฉ์์ง ํ ์ญํ : Producer → Topic → Consumer ๊ตฌ์กฐ๋ก ๋์ํ๋ฉฐ, ๋น๋๊ธฐ ๋ฐฉ์ ์ง์
- ํ์ฅ์ฑ: ๋ฐ์ดํฐ๋ฅผ *ํํฐ์ ๋(Partitioning) ํ์ฌ ์ํ ํ์ฅ ๊ฐ๋ฅ
โญ์ค๋ฉ vs ํํฐ์ ๋
ํํฐ์ ๋์ ํ๋์ ์๋ฒ, ์ค๋ฉ์ ์ฌ๋ฌ ์๋ฒ์ ๋ถ์ฐํ๋ค๋ ์ฐจ์ด
๊ฐ๋ | ์ค๋ช | Kafka์์์ ์ญํ |
ํํฐ์ ๋ | ํ๋์ ํ ํฝ์ ์ฌ๋ฌ ๊ฐ์ ๋ ผ๋ฆฌ์ ๋จ์(ํํฐ์ )๋ก ๋๋ | ํ ํฝ ๋ฐ์ดํฐ๋ฅผ ๋ถ์ฐ ์ฒ๋ฆฌ, Consumer ๋ณ๋ ฌ ์ฒ๋ฆฌ ๊ฐ๋ฅ |
์ค๋ฉ | ๋ฐ์ดํฐ๋ฅผ ์ฌ๋ฌ ์๋ฒ(๋ ธ๋)๋ก ๋ถ์ฐ ์ ์ฅ | Kafka์์๋ ๋ธ๋ก์ปค๊ฐ ์ค๋ฉ ๊ฐ๋ ๊ณผ ์ ์ฌ, ์ฌ๋ฌ ๋ธ๋ก์ปค๊ฐ ๋ฐ์ดํฐ ๋ถ์ฐ ์ฒ๋ฆฌ |
๐ 3. Kafka์ ๊ธฐ๋ณธ ๊ฐ๋ & ๋ช ๋ น์ด
โ 3-1. Kafka ๋ธ๋ก์ปค
- Kafka ํด๋ฌ์คํฐ ๋ด์ ์๋ฒ๋ฅผ ์ง์นญ, ๋ฐ์ดํฐ ์ ์ฅ ๋ฐ ๊ด๋ฆฌ, ๋ฉ์์ง ์ ๋ฌ ์ญํ ์ ํจ.
- ์ฌ๋ฌ ๋ธ๋ก์ปค๊ฐ ํจ๊ป ์๋ํ์ฌ ๊ณ ๊ฐ์ฉ์ฑ๊ณผ ํ์ฅ์ฑ์ ์ ๊ณต.
โ 3-2. ํ ํฝ (Topic) - ‘๋ฉ์์ง’๋ผ๊ณ ์์ฃผ ํํ
- ๋ฐ์ดํฐ์ ๋ถ๋ฅ ๋จ์๋ก, Producer๋ ํน์ ํ ํฝ์ ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๊ณ , Consumer๋ ํด๋น ํ ํฝ์์ ๋ฐ์ดํฐ๋ฅผ ์์ .
- ํํฐ์ (Partition) ๋จ์๋ก ๋๋๋ฉฐ, ๊ฐ ํํฐ์ ์ ๋ฉ์์ง ์์ ๋ณด์ฅ
โ๏ธ ํ ํฝ ๊ด๋ฆฌ ๋ช ๋ น์ด
# ํ ํฝ ์์ฑ
bin/kafka-topics.sh --create --topic <ํ ํฝ๋ช
> --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --partitions 3 --replication-factor 1
# ํ ํฝ ๋ฆฌ์คํธ ์กฐํ
bin/kafka-topics.sh --list --bootstrap-server <๋ธ๋ก์ปค์ฃผ์>
# ํ ํฝ ์์ธ ์ ๋ณด ์กฐํ
bin/kafka-topics.sh --describe --topic <ํ ํฝ๋ช
> --bootstrap-server <๋ธ๋ก์ปค์ฃผ์>
# ํ ํฝ ์ญ์
bin/kafka-topics.sh --delete --topic <ํ ํฝ๋ช
> --bootstrap-server <๋ธ๋ก์ปค์ฃผ์>
โ ๏ธ ํ ํฝ ์ญ์ ํ์ฑํ ํ์
Kafka ์ค์ ํ์ผ (server.properties)์์ ์๋ ์ต์
์ค์ ํ์
delete.topic.enable=true //์ด๊ฑฐ ์ค์ ํด์ผ ์ญ์ ๊ฐ๋ฅ
auto.create.topics.enable=true //Provider์์ ํ ํฝ ์๋ ์์ฑ
โ 3-3. ์ด๋ฒคํธ (Event)
- Kafka์์ ์ด๋ฒคํธ๋ ๋ฐ์ดํฐ์ ์ํ ๋ณํ๋ ๋์์ ๋ํ๋ด๋ ๋ฉ์์ง
- ๊ธฐ๋ณธ์ ์ผ๋ก ๊ฐ๋ณ ์ด๋ฒคํธ ์ญ์ ๋ถ๊ฐ โ
โ๏ธ Retention Policy ์ค์ (์ด๋ฒคํธ ์๋ ์ญ์ )
- ํ ํฝ ๋จ์๋ก ๋ฉ์์ง ๋ณด์กด ๊ธฐ๊ฐ(retention.ms) ๋๋ ์ต๋ ํฌ๊ธฐ(retention.bytes)๋ฅผ ์ค์ ํด ์ค๋๋ ๋ฉ์์ง๋ฅผ ์๋ ์ญ์
log.retention.ms=604800000 # 7์ผ ๋์ ๋ณด์กด
log.retention.bytes=1073741824 # 1GB ์ด๊ณผ ์ ์ญ์
โ 3-4. ํํฐ์ (Partition)
- ํ ํฝ์ ์ธ๋ถํ ๋จ์. 1๊ฐ์ ํ ํฝ์ 1๊ฐ ์ด์์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑ, ๋ถ์ฐ ์ ์ฅ๋์ด ๋ณ๋ ฌ ์ฒ๋ฆฌ๊ฐ ๊ฐ๋ฅ.
- ํํฐ์ ์๋ ๋ณ๋ ฌ ์ฒ๋ฆฌ ์ฑ๋ฅ์ ์ํฅ์ ๋ฏธ์น๋ฏ๋ก, ํด๋ผ์ด์ธํธ์ ๋ธ๋ก์ปค์ ์ฒ๋ฆฌ๋์ ๊ณ ๋ คํด ๊ฒฐ์ .
โญ ๋ฆฌํ๋ฆฌ์ผ์ด์ (Replication)
- ๋ฐ์ดํฐ ๋ณต์ ๋ฅผ ํตํด ์ฅ์ ๋ฐ์ ์ ๋ฐ์ดํฐ ๋ณดํธ.
- ํํฐ์ ์ ๋ณต์ ๋ณธ (ํ๋์ ๋ฆฌ๋ ํํฐ์ ๊ณผ ์ฌ๋ฌ ํ๋ก์ ํํฐ์ ์ผ๋ก ๊ตฌ์ฑ.), ๋ธ๋ก์ปค ๊ฐ์๋ณด๋ค 1๊ฐ ์ ๊ฒ ์ค์ ํ๋๊ฒ ์ผ๋ฐ์
โ ํ ํฝ ์์ฑ ์ ํํฐ์ & ๋ฆฌํ๋ฆฌ์ผ์ด์ ์ค์
bin/kafka-topics.sh --create --topic my-topic --partitions <ํํฐ์
์> --replication-factor 2 --bootstrap-server localhost:9092
โ 3-5. ์คํ์ (Offset)
- ๊ฐ ํํฐ์ ์์ ๋ฉ์์ง์ ๊ณ ์ ID(๋ฒํธ)
- Consumer๋ ์คํ์ ์ ๊ธฐ์ค์ผ๋ก ๋ฉ์์ง ์์๋๋ก ์ฒ๋ฆฌ
- Consumer Group๋ณ๋ก ์คํ์ ๊ด๋ฆฌ ๊ฐ๋ฅ(๋ฉ์์ง ์ฒ๋ฆฌ ์ํ ์ถ์ ๋ฑ)
โ๏ธ ์คํ์ ๋ฆฌ์ (์ด์ ๋ฉ์์ง ๋ค์ ์ฝ๊ณ ์ถ์ ๋)
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --group <๊ทธ๋ฃน๋ช
> --reset-offsets --topic <ํ ํฝ๋ช
> --to-earliest --execute
โ๏ธ ์คํ์ ๊ฐ๋ณ ์ญ์
- Kafka๋ ์คํ์ ์ ๊ฐ๋ณ์ ์ผ๋ก ์ญ์ ํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ์ง ์์ต๋๋ค.
- ํน์ ์คํ์ ์ ์ญ์ ํ๋ ๋์ ์ ์ฒด ํ ํฝ์ ์ญ์ ํ๊ฑฐ๋, Consumer Group ์์ฒด๋ฅผ ์ฌ๊ตฌ์ฑํ์ฌ ์๋ก์ด ์คํ์ ์ ์์ฑํ ์ ์์ต๋๋ค.
โ 3-6. Producer (๋ฐ์ดํฐ ์ ์ก)
- Kafka์ ๋ฐ์ดํฐ๋ฅผ ์ ์กํ๋ ํด๋ผ์ด์ธํธ.
- ๋ฉ์์ง๋ฅผ ํน์ ํ ํฝ์ ํน์ ํํฐ์ ์ ์ ์ก.
โ๏ธ ์ด๋ฒคํธ ๋ฐํ (Producer ์ญํ )
bin/kafka-console-producer.sh --topic <ํ ํฝ๋ช
> --bootstrap-server <๋ธ๋ก์ปค์ฃผ์>
โ 3-7. Consumer (๋ฐ์ดํฐ ์๋น)
- Kafka์์ ๋ฐ์ดํฐ๋ฅผ ์๋นํ๋ ํด๋ผ์ด์ธํธ.
โ๏ธ ์ด๋ฒคํธ ํ์ธ (Consumer ์ญํ )
bin/kafka-console-consumer.sh --topic <ํ ํฝ๋ช > --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --from-beginning
โ 3-8. Consumer ๊ทธ๋ฃน (Consumer Group)
- ์ฌ๋ฌ Consumer๋ก ๊ตฌ์ฑ๋ ๋ ผ๋ฆฌ์ ๋จ์
- Consumer ๊ทธ๋ฃน ๋น ๊ฐ ํํฐ์ ์ ํ ๋ฒ๋ง ์ฝ์, ๊ฐ์ ๊ทธ๋ฃน์ Consumer๋ค์ ๋ฉ์์ง๋ฅผ ๋ถ์ฐ ์ฒ๋ฆฌ
- ์๋ก ๋ค๋ฅธ Consumer ๊ทธ๋ฃน์ ๋์ผ ๋ฐ์ดํฐ๋ฅผ ๋ ๋ฆฝ์ ์ผ๋ก ์๋น ๊ฐ๋ฅ
โ๏ธ Consumer Group ๊ด๋ฆฌ ๋ช ๋ น์ด
# Consumer Group ๋ฆฌ์คํธ ํ์ธ
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --list
# Consumer Group ์์ธ ์ ๋ณด ์กฐํ
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --describe --group <๊ทธ๋ฃน๋ช
>
# Consumer Group ์ญ์ (์คํ์
ํฌํจ)
bin/kafka-consumer-groups.sh --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --group <๊ธฐ์กด ๊ทธ๋ฃน ์ด๋ฆ> --delete
๐ 4. Kafka ์ค์น ๋ฐ ์คํ
Kafka๋ ๋ฉ์์ง ๋ธ๋ก์ปค์ด๋ค. ํ๋ก์ ํธ์์ Kafka๋ฅผ ์ฌ์ฉํ๋ ค๋ฉด
- Kafka ๋ธ๋ก์ปค๊ฐ ๋ฉ์์ง๋ฅผ ๊ด๋ฆฌ.
- Zookeeper๊ฐ Kafka ๋ธ๋ก์ปค๋ฅผ ์กฐ์จ.
- ์ ํ๋ฆฌ์ผ์ด์ (Spring Kafka)๊ฐ Kafka ๋ธ๋ก์ปค์ ํต์
โญ ZooKeeper: ๋ธ๋ก์ปค๋ฅผ ๊ด๋ฆฌํ๊ณ , ๋ฆฌ๋ ์ ์ถ๊ณผ ๋ฉํ๋ฐ์ดํฐ๋ฅผ ์ ์ฅ. (Kafka 2.8 ์ดํ์๋ Kafka ์์ฒด ๋ฉํ๋ฐ์ดํฐ ๊ด๋ฆฌ๋ก ๋์ฒด ๊ฐ๋ฅ)
โ 4-1. Kafka ์๋ฒ (๋ธ๋ก์ปค)
โ 4-1-1. Kafka ์๋ฒ ์ค์น
โ๏ธ Kafka ๋ค์ด๋ก๋
Apache Kafka ๋ค์ด๋ก๋ ๋งํฌ
โ๏ธ ์์ผ ์๋ฒ ๋ฆฌ์ค๋ ์ค์
server.properties ํ์ผ์ ์ด์ด์ “listeners=PLAINTEXT://:9092” ์์ ๋ถ์ ์ฃผ์(#)์ ์ ๊ฑฐํด ์ฃผ๋ฉด ๋ฉ๋๋ค.
โ 4-1-2. Kafka ์คํ
Kafka๋ Zookeeper์ ํจ๊ป ์คํํด์ผ ํ๋ค. ๋ค์ด๋ก๋ํ Kafka ๋๋ ํฐ๋ฆฌ์์ ์คํ.
*Batํ์ผ์ด ์๋๋ฐ window์์๋ git bash์์ ์คํํ๋ฉด ๋ฉ๋๋ค!!
1๏ธโฃ Zookeeper ์คํ
Documents/Tools/kafka_2.13-3.9.0์์
bin/zookeeper-server-start.sh config/zookeeper.properties
2๏ธโฃ Kafka ๋ธ๋ก์ปค ์คํ
bin/kafka-server-start.sh config/server.properties
3๏ธโฃ Kafka ๋ธ๋ก์ปค ํ์ธ
Kafka ๋ธ๋ก์ปค๊ฐ ์คํ๋๋ฉด ๊ธฐ๋ณธ์ ์ผ๋ก localhost:9092์์ ์์ฒญ์ ๋๊ธฐ ์ค์ ๋๋ค.
โ 4-2. Kafka ํด๋ผ์ด์ธํธ
1๏ธโฃ ์์กด์ฑ ์ถ๊ฐ
implementation 'org.springframework.kafka:spring-kafka'
2๏ธโฃ Application.properties
์ฒ์์๋ ํ์์ธ ๊ฒ ๋ง ํด๋ณด์!!
๐ Kafka Consumer ์ค์
# Kafka ๋ธ๋ก์ปค ๋ฆฌ์คํธ (ํ์)
# ๋ธ๋ก์ปค ํธ์คํธ ๋ฐ ํฌํธ ์ ๋ณด. ์ฌ๋ฌ ๋ธ๋ก์ปค๋ ์ผํ๋ก ๊ตฌ๋ถ.
spring.kafka.bootstrap-servers=localhost:9092
# Consumer ๊ทธ๋ฃน ID ์ค์ (ํ์)
# ๋์ผํ ๊ทธ๋ฃน ID๋ฅผ ๊ฐ์ง Consumer๋ ๋ฉ์์ง๋ฅผ ๊ณต์ (๋ก๋ ๋ฐธ๋ฐ์ฑ)๋ฐ์.
spring.kafka.consumer.group-id=order-service-group
# Consumer Key ์ญ์ง๋ ฌํ ํด๋์ค ์ค์ (ํ์)
# ๋ฉ์์ง ํค๋ฅผ ๋์ฝ๋ฉํ ๋ ์ฌ์ฉ. ๊ธฐ๋ณธ์ ์ผ๋ก StringDeserializer ์ฌ์ฉ.
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Consumer Value ์ญ์ง๋ ฌํ ํด๋์ค ์ค์ (ํ์)
# ๋ฉ์์ง ๊ฐ์ ๋์ฝ๋ฉํ ๋ ์ฌ์ฉ. ๊ธฐ๋ณธ์ ์ผ๋ก StringDeserializer ์ฌ์ฉ.
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# ๋ฉ์์ง ์๋ ์ปค๋ฐ ์ฌ๋ถ (Optional)
# true: Consumer๊ฐ ๋ฉ์์ง ์ฒ๋ฆฌ ํ ์๋์ผ๋ก ์คํ์
์ ์ปค๋ฐ.
spring.kafka.consumer.enable-auto-commit=true
# ์๋ ์ปค๋ฐ ์ฃผ๊ธฐ (Optional)
# ๋ฉ์์ง ์ฒ๋ฆฌ ์๋ฃ ํ ์คํ์
์ ์๋์ผ๋ก ์ปค๋ฐํ๋ ๊ฐ๊ฒฉ (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.consumer.auto-commit-interval=100
# Consumer๊ฐ ์ฒ์ ์์ํ ๋ ์ฝ๊ธฐ ์์ํ ์คํ์
์ค์ (Optional)
# latest: ๊ฐ์ฅ ์ต๊ทผ์ ๋ฉ์์ง๋ถํฐ ์ฝ๊ธฐ ์์.
# earliest: ํ ํฝ์ ์ฒ์ ๋ฉ์์ง๋ถํฐ ์ฝ๊ธฐ ์์.
spring.kafka.consumer.auto-offset-reset=latest
# Consumer ์ธ์
ํ์์์ (Optional)
# Consumer๊ฐ ๋ธ๋ก์ปค์์ ์ฐ๊ฒฐ์ด ๋๊ฒผ๋ค๊ณ ํ๋จํ๊ธฐ๊น์ง์ ์๊ฐ (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.consumer.properties.session.timeout.ms=15000
# ํํธ๋นํธ ๊ฐ๊ฒฉ (Optional)
# Consumer๊ฐ ๋ธ๋ก์ปค์ ์ฐ๊ฒฐ์ ์ ์งํ๊ธฐ ์ํด ํํธ๋นํธ๋ฅผ ๋ณด๋ด๋ ๊ฐ๊ฒฉ (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.consumer.properties.heartbeat.interval.ms=5000
๐ Kafka Producer ์ค์
# Producer Key ์ง๋ ฌํ ํด๋์ค ์ค์ (ํ์)
# ๋ฉ์์ง ํค๋ฅผ ์ธ์ฝ๋ฉํ ๋ ์ฌ์ฉ. ๊ธฐ๋ณธ์ ์ผ๋ก StringSerializer ์ฌ์ฉ.
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
# Producer Value ์ง๋ ฌํ ํด๋์ค ์ค์ (ํ์)
# ๋ฉ์์ง ๊ฐ์ ์ธ์ฝ๋ฉํ ๋ ์ฌ์ฉ. ๊ธฐ๋ณธ์ ์ผ๋ก StringSerializer ์ฌ์ฉ.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# ๋ฉ์์ง ์ ์ก ํ์ธ ๋ชจ๋ (Optional)
# 0: ๋ธ๋ก์ปค ํ์ธ ์์.
# 1: ๋ฆฌ๋ ํํฐ์
์ ๊ธฐ๋ก ์ฑ๊ณต ํ์ธ.
# all: ๋ชจ๋ ํํฐ์
๋ณต์ ๋ณธ์ ๊ธฐ๋ก ์ฑ๊ณต ํ์ธ.
spring.kafka.producer.acks=1
# ์ ์ก ์คํจ ์ ์ฌ์๋ ํ์ (Optional)
# ๋ฉ์์ง ์ ์ก์ด ์คํจํ์ ๋ ์ฌ์๋ ํ์ ์ค์ .
spring.kafka.producer.retries=3
# ๋ฉ์์ง ๋ฐฐ์น ํฌ๊ธฐ (Optional)
# ํ๋ก๋์๊ฐ ํ ๋ฒ์ ์ ์กํ ๋ฉ์์ง ๋ฐฐ์น ํฌ๊ธฐ (๋ฐ์ดํธ ๋จ์).
spring.kafka.producer.batch-size=16384
# ๋ฐฐ์น ์ ์ก ๋๊ธฐ ์๊ฐ (Optional)
# ๋ฐฐ์น๊ฐ ๊ฝ ์ฐจ์ง ์์๋ ์ผ์ ์๊ฐ ๋๊ธฐ ํ ์ ์ก (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.producer.linger-ms=1
# Producer ๋ฉ๋ชจ๋ฆฌ ๋ฒํผ ํฌ๊ธฐ (Optional)
# ์ ์ก ์ ๋ฉ์์ง๋ฅผ ์ ์ฅํ๋ ๋ฉ๋ชจ๋ฆฌ ๋ฒํผ ํฌ๊ธฐ (๋ฐ์ดํธ ๋จ์).
spring.kafka.producer.buffer-memory=33554432
# --------------------------------------------
# ์์ฃผ ์ฌ์ฉํ๋ ์ถ๊ฐ ์ค์
# --------------------------------------------
# ๋ฉ์์ง ์ต๋ ํฌ๊ธฐ (Optional)
# Producer์ Consumer์์ ์ ์ก/์์ ๊ฐ๋ฅํ ๋ฉ์์ง์ ์ต๋ ํฌ๊ธฐ (๋ฐ์ดํธ ๋จ์).
spring.kafka.producer.properties.max.request.size=1048576
spring.kafka.consumer.properties.fetch.max.bytes=1048576
# ํ ํฝ๋ณ ์คํ์
์ปค๋ฐ ์ฃผ๊ธฐ (Optional)
# ์๋ ์ปค๋ฐ์ ์ฌ์ฉํ ๊ฒฝ์ฐ ์คํ์
์ ์ปค๋ฐํ๋ ์ฃผ๊ธฐ (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.consumer.properties.auto.commit.interval.ms=5000
# SSL ์ค์ (Optional)
# SSL ์ฐ๊ฒฐ์ด ํ์ํ ๊ฒฝ์ฐ ์ค์ .
spring.kafka.properties.ssl.truststore.location=/path/to/truststore
spring.kafka.properties.ssl.truststore.password=yourpassword
spring.kafka.properties.ssl.keystore.location=/path/to/keystore
spring.kafka.properties.ssl.keystore.password=yourpassword
spring.kafka.properties.security.protocol=SSL
# ์์ฒญ ์๊ฐ ์ด๊ณผ (Optional)
# ๋ธ๋ก์ปค๊ฐ ์์ฒญ์ ์ฒ๋ฆฌํ์ง ๋ชปํ๋ฉด ์ด๊ณผ ์ ์คํจ ์ฒ๋ฆฌ (๋ฐ๋ฆฌ์ด ๋จ์).
spring.kafka.producer.properties.request.timeout.ms=30000
spring.kafka.consumer.properties.request.timeout.ms=30000
# ๋ก๊ทธ ์์ค ์ค์ (Optional)
# Kafka ํด๋ผ์ด์ธํธ ๋ก๊ทธ ์์ค ์ค์ .
logging.level.org.apache.kafka=INFO
๐ 5. Kafka ์ด๋ฒคํธ ์ฌ์ฉ ์์
โ 5-1. ์ด๋ฒคํธ ๊ฐ์ฒด ์์ฑ
package com.example.order_service.event;
import lombok.Data;
@Data
public class OrderCreatedEvent {
private int orderId;
private int productId;
private int colorId;
private int sizeId;
private int quantity;
private int userId;
}
โ 5-2. Event ๋ฐ๊ธ
๐ kafkaTemplate.send("order-created-topic", event);
// OrderService.java
public Orders createOrder(OrderRequest request) {
// ์ฃผ๋ฌธ ๊ฐ์ฒด ์์ฑ
Orders order = dtoToOrder(request, OrderStatus.CREATED);
// ์ฃผ๋ฌธ ์ํ ๋ฆฌ์คํธ
List<OrderItems> orderItems = new ArrayList<>();
for (OrderItemRequest itemRequest : request.getItems()) {
OrderItems orderItem = dtoToOrderItems(itemRequest, order);
orderItems.add(orderItem);
}
order.setOrderItems(orderItems);
// ์ด ๊ธ์ก ๊ณ์ฐ
BigDecimal totalPrice = orderItems.stream()
.map(item -> item.getPrice().multiply(new BigDecimal(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
order.setTotalPrice(totalPrice);
// ์ฃผ๋ฌธ ์ ์ฅ
Orders savedOrder = orderRepository.save(order);
// ์ด๋ฒคํธ ๋ฐํ
OrderCreatedEvent event = orderToEvent(order);
kafkaTemplate.send("order-created-topic", event);
return savedOrder;
}
* ์์ฒญ ์์
Post http://localhost:8080/order/api/orders
{
"userId": 3,
"shippingAddress": "์์ธ์",
"paymentMethod": "card",
"items": [{
"productId": 1,
"sizeId": 1,
"colorId": 1,
"quantity": 1,
"price": 199.99
}]
}
โ 5-3. Event ์์
๐ @KafkaListener(topics = "order-created-topic", groupId = "products-service-group")
//OrderEventListener.java
@KafkaListener(topics = "order-created-topic", groupId = "products-service-group")
public void handleOrderCreatedEvent(String message) {
try {
// JSON ๋ฉ์์ง๋ฅผ OrderEvent ๊ฐ์ฒด๋ก ๋ณํ
OrderEvent orderEvent = objectMapper.readValue(message, OrderEvent.class);
// ์์๊ณผ ์ฌ์ด์ฆ์ ๋ฐ๋ฅธ ์ฌ๊ณ ์กฐํ
Optional<ProductStocks> optionalStock = productStocksRepository.findStockQuantity(
orderEvent.getProductId(),
orderEvent.getColorId(),
orderEvent.getSizeId()
);
if (optionalStock.isPresent()) {
ProductStocks stock = optionalStock.get();
int updatedStock = stock.getStockQuantity() - orderEvent.getQuantity();
// ์ฌ๊ณ ๋ถ์กฑ ์ ์์ธ ์ฒ๋ฆฌ
if (updatedStock < 0) {
throw new IllegalStateException("์ฌ๊ณ ๋ถ์กฑ: ์์ฒญํ ์๋์ ์ฒ๋ฆฌํ ์ ์์ต๋๋ค.");
}
// ์ฌ๊ณ ์๋ ์
๋ฐ์ดํธ
stock.setStockQuantity(updatedStock);
productStocksRepository.save(stock);
System.out.println("์ฌ๊ณ ์
๋ฐ์ดํธ ์ฑ๊ณต");
} else {
System.out.println("ํด๋น ์์/์ฌ์ด์ฆ์ ๋ํ ์ฌ๊ณ ์ ๋ณด๊ฐ ์์ต๋๋ค.");
}
} catch (Exception e) {
System.out.println("์ฃผ๋ฌธ ์ฒ๋ฆฌ ์ค ์ค๋ฅ ๋ฐ์: " + e.getMessage());
}
}
โญ 6. ์์๋๋ฉด ์ข์ ์ค์ & ๋ช ๋ น์ด
1. ์ฃผ์ ์คํฌ๋ฆฝํธ
Kafka์ ๋ช ๋ น์ด๋ Kafka ์ค์น ๋๋ ํ ๋ฆฌ์ bin/ ํด๋์ ์๋ ์คํฌ๋ฆฝํธ๋ฅผ ์ฌ์ฉ.
- kafka-topics.sh: ํ ํฝ ์์ฑ, ์กฐํ, ์ญ์ ๋ฑ์ ๊ด๋ฆฌ.
- kafka-console-producer.sh: ๋ฉ์์ง ๋ฐํ(Producer ์ญํ ).
- kafka-console-consumer.sh: ๋ฉ์์ง ํ์ธ(Consumer ์ญํ ).
- kafka-configs.sh: ๋ธ๋ก์ปค๋ ํ ํฝ ์ค์ ๋ณ๊ฒฝ.
2. ๋ฉ์์ง ํ์ธ์ฉ ACL ๋นํ์ฑํ (๊ฐ๋ฐ/ํ ์คํธ์ฉ)
Kafka๋ ๋ณด์ ์ค์ ์ด ๊ธฐ๋ณธ ํ์ฑํ๋ ๊ฒฝ์ฐ Consumer/Producer๊ฐ ์ธ์ฆ๋์ง ์์ผ๋ฉด ์คํจํ ์ ์์ต๋๋ค. ๊ฐ๋ฐ์ฉ์ผ๋ก sasl.mechanism์ ์ค์ ํ์ง ์์ผ๋ ค๋ฉด server.properties์์ ์๋ ์ต์ ์ ํ์ธํ์ธ์:
authorizer.class.name=
3. ๋ธ๋ก์ปค ์ํ ํ์ธ
bin/kafka-broker-api-versions.sh --bootstrap-server <๋ธ๋ก์ปค์ฃผ์>
4. ๋ฉ์์ง ํ์ธ ์ JSON ํฌ๋งท ํ์ฑ
bin/kafka-console-consumer.sh --topic <ํ ํฝ๋ช > --bootstrap-server <๋ธ๋ก์ปค์ฃผ์> --from-beginning | jq
5. ๊ฐ๋ฐ ํ๊ฒฝ์์๋ Kafka ๋นํ์ฑํ
kafka.enabled=false
spring.autoconfigure.exclude=org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
@ConditionalOnProperty(name = "kafka.enabled", havingValue = "true", matchIfMissing = true)
'Backend > spring cloud (MSA)' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
์ค์๊ฐ ๋ฐ์ดํฐ ์ ์ก (0) | 2025.02.23 |
---|---|
[Kafka] ๊ณ ๊ธ ์ค์ (0) | 2025.02.23 |
Eureka๋? (0) | 2025.02.16 |
GateWay๋? (0) | 2025.02.16 |
Spring Cloud๋? (0) | 2025.02.16 |