๐ Kafka ์ด์ ํ๊ฒฝ์์ ํ์ํ ๊ณ ๊ธ ์ค์
๊ธฐ๋ณธ ๊ฐ๋ ๋ง์ผ๋ก๋ Kafka๋ฅผ ์ฌ์ฉํ ์๋ ์์ง๋ง, ์ค์ ์ด์ ํ๊ฒฝ์์๋ ๋ ๋ง์ ๊ฑธ ์์์ผ ํ๋ค!
- ํ ํฝ, ํํฐ์ , ๋ธ๋ก์ปค, ์ปจ์๋จธ ๊ทธ๋ฃน, ์คํ์ ์ดํด
- Producer & Consumer ๋์ ๋ฐฉ์
- ๊ธฐ๋ณธ์ ์ธ ๋ช ๋ น์ด (ํ ํฝ ์์ฑ, ๋ฉ์์ง ์์ฐ/์๋น, ์คํ์ ๊ด๋ฆฌ)
โก๏ธ ์ด ์ ๋๋ง ์์๋ ๋ก์ปฌ ํ๊ฒฝ์์ ํ ์คํธํ๊ณ ๊ฐ๋จํ ์๋น์ค์ ์ ์ฉํ ์ ์๋ค!! ๐
๐ 1. ํํฐ์ ํ ๋น ์ ๋ต (Partition Assignment Strategy)
โ ์ปจ์๋จธ ๊ทธ๋ฃน ์ต์ ํ
Kafka๋ ์ฌ๋ฌ ๊ฐ์ ์ปจ์๋จธ๊ฐ ๋์ผํ ํ ํฝ์ ์๋นํ ๋, ํํฐ์ ์ ์ปจ์๋จธ๋ค์๊ฒ ์๋์ผ๋ก ํ ๋นํ๋ค. ์ด๋ ํํฐ์ ํ ๋น ์ ๋ต์ด ์ค์ํ ์ญํ ์ ํ๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ ์ปจ์๋จธ ๊ทธ๋ฃน์ ๋ถํ ๋ถ์ฐ์ ์ต์ ํํ๊ธฐ ์ํด ์ํฉ์ ๋ง๋ ํ ๋น ์ ๋ต์ ์ ํํด์ผ ํ๋ค.
โ ์ฃผ์ ํ ๋น ์ ๋ต
- RangeAssignor (๊ธฐ๋ณธ๊ฐ): ํํฐ์ ์ ์ฐ์์ ์ธ ๋ฒ์๋ก ๋๋์ด ํ ๋น. ์ปจ์๋จธ ์์ ํํฐ์ ์๊ฐ ๊ท ๋ฑํ์ง ์์ผ๋ฉด ์ผ๋ถ ์ปจ์๋จธ๊ฐ ๋ ๋ง์ ํํฐ์ ์ ๊ฐ์ ธ๊ฐ ์ ์์.
- RoundRobinAssignor: ๋ชจ๋ ์ปจ์๋จธ์๊ฒ **์ํ ๋ฐฉ์(๋ผ์ด๋๋ก๋น)**์ผ๋ก ๊ณจ๊ณ ๋ฃจ ํ ๋น. ์ปจ์๋จธ ๊ฐ ๊ท ๋ฑํ ๋ถ๋ฐฐ๊ฐ ํ์ํ ๊ฒฝ์ฐ ์ ํฉ.
- StickyAssignor: ํํฐ์ ์ฌํ ๋น ์ ๊ธฐ์กด ํ ๋น์ ์ต๋ํ ์ ์งํ์ฌ ๋ฆฌ๋ฐธ๋ฐ์ฑ ์ค๋ฒํค๋ ์ต์ํ.
- Custom Assignor: ํน์ ์๊ตฌ์ฌํญ์ ๋ง๊ฒ ์ง์ ๊ตฌํ ๊ฐ๋ฅ.
โ ํ ๋น ์ ๋ต ์ค์ ๋ฐฉ๋ฒ
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
โญ ํํฐ์ ์ ๊ฐ์
โก๏ธ ํํฐ์ & ๋ธ๋ก์ปค ๊ฐ์ ๊ด๊ณ
- ํํฐ์
๊ฐ์๋ ๋ธ๋ก์ปค ๊ฐ์์ ๋ฌด๊ดํ๊ฒ ์ค์ ๊ฐ๋ฅ!
→ ํ์ง๋ง ์ต์ ์ฑ๋ฅ์ ์ํด ๋ธ๋ก์ปค ๊ฐ์๋ณด๋ค ํฌ๊ฑฐ๋ ๊ฐ๊ฒ ์ค์ ํ๋ ๊ฒ ์ผ๋ฐ์ . - ์ด์ : ๊ฐ ๋ธ๋ก์ปค๊ฐ ์ต์ํ ํ๋์ ํํฐ์
์ ๊ฐ์ ธ์ผ ๋ถํ๊ฐ ๋ถ์ฐ๋จ!
→ ํํฐ์ ๊ฐ์๊ฐ ๋ธ๋ก์ปค ๊ฐ์๋ณด๋ค ์ ์ผ๋ฉด ์ผ๋ถ ๋ธ๋ก์ปค๊ฐ ๋๊ฒ ๋จ.
โก๏ธ ์ถ์ฒ: ํํฐ์ ๊ฐ์ ≥ ๋ธ๋ก์ปค ๊ฐ์ (๋ณดํต ๋ธ๋ก์ปค ๊ฐ์ × 2 ์ถ์ฒ)
๐ ํํฐ์ ๊ฐ์๊ฐ ๋ธ๋ก์ปค ๊ฐ์๋ณด๋ค ์ ์ผ๋ฉด
๐ ๋จ๋ ๋ธ๋ก์ปค๋ ์ธ์ ์ญํ ์ ํ ๊น?
1๏ธโฃ ์ฅ์ ๋ฐ์ ์ ๋๊ธฐ ์ญํ
- ๋ฆฌ๋ ๋ธ๋ก์ปค๊ฐ ์ฃฝ์ผ๋ฉด ๋จ์ ๋ธ๋ก์ปค๊ฐ ๋ฆฌ๋๋ก ์น๊ฒฉ๋ ์๋ ์์
- ๋จ๋ ๋ธ๋ก์ปค๋ก ์๋ก์ด ํํฐ์ ์ด ๋ฐฐ์น๋ ์๋ ์์ (๋ธ๋ก์ปค ์ถ๊ฐ ์ ์๋ ์กฐ์ )
2๏ธโฃ ๋ค๋ฅธ ํ ํฝ์ ํํฐ์ ์ ์ ์ฅํ ์๋ ์์
- ์ด์ ํ๊ฒฝ์์๋ ๋ณดํต ์ฌ๋ฌ ๊ฐ์ ํ ํฝ์ด ์์
- ํน์ ํ ํฝ์๋ ์ญํ ์ด ์๋๋ผ๋ ๋ค๋ฅธ ํ ํฝ์ ๋ฆฌ๋ ๋๋ ํ๋ก์๋ก ์ฐ์ผ ์ ์์
๐ 2. ๋ฆฌํ๋ฆฌ์ผ์ด์ & ์ฅ์ ๋๋น (Replication & Fault Tolerance)
โ Kafka๋ ๋ฐ์ดํฐ ์ ์ค์ ๋ฐฉ์งํ๊ธฐ ์ํด ๋ฆฌํ๋ฆฌ์ผ์ด์ (๋ณต์ ) ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ ๋ฆฌํ๋ฆฌ์ผ์ด์ ์ ํตํด ๋ฐ์ดํฐ ์ ์ค์ ๋ฐฉ์งํ๊ณ , ์ฅ์ ๋ฐ์ ์ ๋น ๋ฅธ ๋ณต๊ตฌ๊ฐ ๊ฐ๋ฅํ๋๋ก ์ค์ ํด์ผ ํ๋ค
โญ ๋ฆฌํ๋ฆฌ์ผ์ด์ ์ ๊ฐ์
โก๏ธ ๋ฆฌํ๋ฆฌ์ผ์ด์ & ๋ธ๋ก์ปค ๊ฐ์ ๊ด๊ณ
- ์ต๋ ๋ธ๋ก์ปค ๊ฐ์๋งํผ ์ค์ ๊ฐ๋ฅ. (์ด ๋ ๋ธ๋ก์ปค๊ฐ ์ฅ์ ๋๋ฉด, ์๋กญ๊ฒ ํ ๋นํ ๊ณต๊ฐ์ด ์์ด ๋ณต๊ตฌ๊ฐ ์ด๋ ค์)
- ๋ณดํต ๋ธ๋ก์ปค ๊ฐ์ -1๋ก ์ค์ (1๊ฐ๋ ๋ฆฌ๋, ๋๋จธ์ง๋ ํ๋ก์).
- ๋ชจ๋ ๋ณต์ ๋ณธ์ ์๋ก ๋ค๋ฅธ ๋ธ๋ก์ปค์ ์ ์ฅ๋ผ์ผ ํ๋ฏ๋ก,
→ replication-factor๋ ๋ธ๋ก์ปค ๊ฐ์๋ฅผ ์ด๊ณผํ ์ ์์.
โก๏ธ ์ถ์ฒ: replication-factor ≤ ๋ธ๋ก์ปค ๊ฐ์ (๋ณดํต ๋ธ๋ก์ปค ๊ฐ์ - 1 ์ถ์ฒ)
๐ ๋ฆฌํ๋ฆฌ์ผ์ด์ ์ ํ ๋น ์
ํ ํฝ A (๋ธ๋ก์ปค 4๊ฐ, ํํฐ์ 3๊ฐ, ๋ณต์ 3๊ฐ)
Kafka๋ ์๋์ผ๋ก ๋ธ๋ก์ปค๋ฅผ ์ต๋ํ ๊ท ๋ฑํ๊ฒ ๋ถ๋ฐฐ
ํํฐ์ | ๋ฆฌ๋ ๋ธ๋ก์ปค | ํ๋ก์ ๋ธ๋ก์ปค 1 | ํ๋ก์ ๋ธ๋ก์ปค 2 |
P0 | B1 | B2 | B3 |
P1 | B2 | B3 | B4 |
P2 | B3 | B4 | B1 |
โ ๊ตฌ์ฑ ์์
- ๋ฆฌ๋ ํํฐ์ : ํด๋ฌ์คํฐ์์ ํน์ ๋ธ๋ก์ปค๊ฐ ํด๋น ํํฐ์ ์ ๋ฆฌ๋ ์ญํ ์ ๋ด๋น.
- ํ๋ก์ ํํฐ์ : ๋ฆฌ๋๋ฅผ ๋ณต์ ํ์ฌ ๋ฐฑ์ ์ญํ ์ ์ํ.
- ISR (In-Sync Replicas): ๋ฆฌ๋์ ๋๊ธฐํ๋ ํ๋ก์ ๋ชฉ๋ก. ์ฅ์ ๋ฐ์ ์ ์๋ก์ด ๋ฆฌ๋๋ฅผ ์ ์ถํ๋ ๋ฐ ์ฌ์ฉ๋จ.
โ ์๋ ๋ฐฉ์
- ๊ฐ ํํฐ์ ์ ํ๋์ ๋ฆฌ๋(Leader)์ ํ๋ ์ด์์ ํ๋ก์(Follower)๋ก ๊ตฌ์ฑ๋จ.
- ํ๋ก๋์(Producer)๋ ํญ์ ๋ฆฌ๋ ํํฐ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ๋ณด๋.
- ํ๋ก์ ํํฐ์ ๋ค์ ๋ฆฌ๋์ ๋ฐ์ดํฐ๋ฅผ ๋๊ธฐํ(๋ณต์ )ํ๋ฉด์ ๋๊ธฐ.
โ ์ฅ์ ๋ฐ์ ์ ๋ณต๊ตฌ ๊ณผ์
1๏ธโฃ ๋ฆฌ๋ ๋ธ๋ก์ปค ์ฅ์ ๋ฐ์ ๐จ → ISR ๋ด์์ ์๋ก์ด ๋ฆฌ๋ ์๋ ์ ์ถ.
2๏ธโฃ ํ๋ก์๊ฐ ์ผ์ ์๊ฐ ๋ฆฌ๋์ ๋๊ธฐํ๋์ง ์์ผ๋ฉด ISR์์ ์ ๊ฑฐ๋จ.
3๏ธโฃ ๋ณต๊ตฌ๋ ๋ธ๋ก์ปค๋ ๋ค์ ISR์ ์ถ๊ฐ๋จ.
โ ์ค์ ๋ฐฉ๋ฒ
min.insync.replicas=2 # ๋๋ ํ ํฝ ์์ฑ ์ replication-factor=2 # ์ต์ํ 2๊ฐ์ ๋ณต์ ๋ณธ์ด ์์ด์ผ ์ปค๋ฐ ๊ฐ๋ฅ
unclean.leader.election.enable=false # ์์๋ ๋ฐ์ดํฐ๊ฐ ๋ฆฌ๋๋ก ์ ์ถ๋๋ ๊ฒ์ ๋ฐฉ์ง
๐ 3. ๋ ํ
์
์ ์ฑ
& ์ฑ๋ฅ ํ๋ (Retention Policy & Performance Tuning)
โ Kafka๋ ์ค๋๋ ๋ฐ์ดํฐ๋ฅผ ์๋์ผ๋ก ์ญ์ ํ๊ฑฐ๋ ์ ์งํ๋ ๋ ํ ์ ์ ์ฑ ์ ์ง์ํ๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ ๋ ํ ์ ์ค์ ์ ํตํด ์ ์ฅ ๊ณต๊ฐ์ ๊ด๋ฆฌํ๊ณ , ์ฑ๋ฅ ํ๋์ ํตํด ์ฒ๋ฆฌ ์๋๋ฅผ ์ต์ ํํด์ผ ํ๋ค.
โ ๋ ํ ์ ์ ์ฑ ์ค์ ๋ฐฉ๋ฒ
log.retention.ms=604800000 # 7์ผ๊ฐ ๋ฐ์ดํฐ ๋ณด๊ด
log.retention.bytes=1073741824 # 1GB ์ด๊ณผ ์ ์ญ์
โ ์ฑ๋ฅ ํ๋ ์์
- ๋ฐฐ์น ํฌ๊ธฐ ์กฐ์ : batch.size ๊ฐ ์กฐ์ ๋ก ํ๋ก๋์์ ์ฑ๋ฅ ํฅ์.
- ์์ถ ์ฌ์ฉ: compression.type=lz4 ์ค์ ์ผ๋ก ๋ฐ์ดํฐ ์ ์ก๋ ์ ๊ฐ.
- ๋ฉ์์ง ํฌ๊ธฐ ์ ํ: message.max.bytes ์ค์ ์ผ๋ก ๊ฐ๋ณ ๋ฉ์์ง ํฌ๊ธฐ ์กฐ์ .
๐ 4. Kafka Connect & Schema Registry (์ธ๋ถ ์์คํ ์ฐ๋)
โ Kafka Connect๋ฅผ ์ฌ์ฉํ๋ฉด ๋ค๋ฅธ ์์คํ ๊ณผ ์์ฝ๊ฒ ๋ฐ์ดํฐ๋ฅผ ์ฐ๋ํ ์ ์๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ Kafka Connect๋ฅผ ํ์ฉํ์ฌ ๋ค์ํ ์์คํ ๊ณผ ์ฐ๋ํ๊ณ , Schema Registry๋ก ๋ฐ์ดํฐ ์ผ๊ด์ฑ์ ์ ์งํด์ผ ํ๋ค.
โ Kafka Connect์ ์ญํ
- DB, ํด๋ผ์ฐ๋, NoSQL, ํ์ผ ์์คํ ๋ฑ ๋ค์ํ ์์ค์์ Kafka๋ก ๋ฐ์ดํฐ๋ฅผ ๊ฐ์ ธ์ค๊ฑฐ๋ ์ ์ก ๊ฐ๋ฅ.
- Sink Connector: Kafka → ์ธ๋ถ ์์คํ (์: Elasticsearch, HDFS, RDBMS)
- Source Connector: ์ธ๋ถ ์์คํ → Kafka (์: MySQL, PostgreSQL, MongoDB)
โ Schema Registry ํ์ฉ
- Kafka ๋ฉ์์ง์ JSON, Avro, Protobuf ๊ฐ์ ๋ฐ์ดํฐ ํ์์ ๊ด๋ฆฌ.
- ์คํค๋ง ๋ณ๊ฒฝ ์ ๋ฐ์ดํฐ ๋ฌด๊ฒฐ์ฑ์ ์ ์งํ๊ณ , ๋ฐ์ดํฐ ์์ค์ ๋ฐฉ์ง.
- REST API๋ฅผ ํตํด ์คํค๋ง๋ฅผ ์กฐํํ๊ณ ๊ด๋ฆฌ ๊ฐ๋ฅ.
๐ 5. Kafka Streams & KSQL (์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ)
โ
Kafka Streams: Kafka ๋ฐ์ดํฐ๋ฅผ ์ค์๊ฐ์ผ๋ก ์ฒ๋ฆฌํ๋ ๊ฐ๋ ฅํ ์คํธ๋ฆฌ๋ฐ ๋ผ์ด๋ธ๋ฌ๋ฆฌ.
โ
KSQL: SQL ๋ฌธ๋ฒ์ ์ด์ฉํด ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ฝ๊ฒ ๊ตฌํํ ์ ์๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ Kafka Streams ๋๋ KSQL์ ํ์ฉํ์ฌ ์ค์๊ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ๋ฅผ ์ต์ ํํ ์ ์๋ค.
โ Kafka Streams ์ฃผ์ ๊ฐ๋
- Stateful & Stateless ์ฐ์ฐ ์ง์
- Windowing ์ฒ๋ฆฌ ๊ฐ๋ฅ (์๊ฐ ๊ธฐ๋ฐ ๋ฐ์ดํฐ ์ฒ๋ฆฌ)
- Exactly-once ์ฒ๋ฆฌ ๊ฐ๋ฅ (์ค๋ณต ๋ฉ์์ง ๋ฐฉ์ง)
โ Kafka Streams ์์
KStream<String, String> stream = builder.stream("input-topic");
stream.filter((key, value) -> value.contains("important"))
.to("output-topic");
โ KSQL ์ฃผ์ ๊ฐ๋
- Kafka ํ ํฝ์ SQL์ฒ๋ผ ์กฐํํ๊ณ ํํฐ๋ง ๊ฐ๋ฅ.
- ์ค์๊ฐ ๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์กฐ์ธ, ์ง๊ณ, ๋ณํ ๊ฐ๋ฅ.
โ KSQL ์์
SELECT * FROM transactions WHERE amount > 1000;
๐ 6. Kafka ์ด์ ํ๊ฒฝ์์์ ์คํ์ ์ปค๋ฐ ์ ๋ต
โ
Kafka์์ ์คํ์
(Offset) ์ ์ปจ์๋จธ๊ฐ ํน์ ํ ํฝ์ ํํฐ์
์์ ๋ฐ์ดํฐ๋ฅผ ์ด๋๊น์ง ์ฝ์๋์ง๋ฅผ ๋ํ๋ด๋ ๋ฒํธ๋ค.
โก๏ธ ์ด์ ํ๊ฒฝ์์๋ ๋ฐ์ดํฐ ์ ์ค ๋ฐฉ์ง, ์ค๋ณต ๋ฐฉ์ง, ์ ํํ ํ ๋ฒ๋ง ์ฒ๋ฆฌ (Exactly-Once) ๋ณด์ฅ์ ์ํด ์คํ์
์ปค๋ฐ ์ ๋ต์ด ์ค์ํจ!
โ
1. ์คํ์
์ปค๋ฐ ๋ฐฉ์: ์๋ vs ์๋
์คํ์ ์ ์ธ์ , ์ด๋ป๊ฒ ์ ์ฅํ ์ง ๊ฒฐ์ ํ๋ ์ค์ .
๋ฐฉ์ | ์ค๋ช | ์ฅ์ | ๋จ์ |
์๋ ์ปค๋ฐ (Auto Commit) | Kafka๊ฐ auto.commit.interval.ms(๊ธฐ๋ณธ 5์ด)๋ง๋ค ์๋์ผ๋ก ์คํ์ ์ ์ฅ | ๊ตฌํ์ด ๊ฐ๋จ | ๋ฉ์์ง๊ฐ ์ฒ๋ฆฌ๋๊ธฐ ์ ์ ์ปค๋ฐ๋ ์๋ ์์ (๋ฐ์ดํฐ ์ ์ค ๊ฐ๋ฅ) |
์๋ ์ปค๋ฐ (Manual Commit) | ์ปจ์๋จธ๊ฐ ์ง์ ์คํ์ ์ ์ปค๋ฐ (commitSync(), commitAsync()) | ๋ฐ์ดํฐ ์ ์ค & ์ค๋ณต ์ต์ํ ๊ฐ๋ฅ | ๊ฐ๋ฐ์๊ฐ ์ง์ ๊ด๋ฆฌํด์ผ ํจ |
โ ์ด์ ํ๊ฒฝ์์๋ ์๋ ์ปค๋ฐ๋ณด๋ค ์๋ ์ปค๋ฐ์ ์ฐ๋ ๊ฒ ์์ !
โ commitSync()๋ ์ ํํ์ง๋ง ๋๋ฆฌ๊ณ , commitAsync()๋ ๋น ๋ฅด์ง๋ง ์ผ๋ถ ์คํจ ๊ฐ๋ฅ → ๋์ ์กฐํฉํด์ ์ฌ์ฉ ๊ฐ๋ฅ!
๐ ์๋ ์ปค๋ฐ ๋นํ์ฑํ
enable.auto.commit=false
๐ ์ผ๋ฐ Kafka ์ปจ์๋จธ ์์ (commitSync() ์ฌ์ฉ)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // ๋ฉ์์ง ์ฒ๋ฆฌ
}
consumer.commitSync(); // ์ฒ๋ฆฌ ์๋ฃ ํ ์คํ์
์ปค๋ฐ
}
โญ @KafkaListener ์ฌ์ฉ ์ ์๋ ์ปค๋ฐ
๐ Spring Kafka ์์ (ack.acknowledge() ์ฌ์ฉ)
@KafkaListener(topics = "my-topic", containerFactory = "myFactory")
public void listen(String message, Acknowledgment ack) {
System.out.println("๋ฉ์์ง ์์ : " + message);
ack.acknowledge(); // ๋ฉ์์ง ์ ์ ์ฒ๋ฆฌ ํ ์คํ์
์ปค๋ฐ
}
๐ KafkaListenerContainerFactory์์ ackMode ๋ณ๊ฒฝ
์ ์ฝ๋๊ฐ ์ ๋๋ก ๋์ํ๋ ค๋ฉด, ๋ฆฌ์ค๋ ์ปจํ ์ด๋์ ackMode๋ฅผ MANUAL๋ก ์ค์ ํด์ผ ํ๋ค.
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, DeliveryStartedEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // ์๋ ์ปค๋ฐ ์ค์
return factory;
}
}
โ
2. ์คํ์
์ด๊ธฐํ (Reset) - ๋ฉ์์ง ์ ์ค ๋ฐฉ์ง
์ปจ์๋จธ๊ฐ ์ฒ์ ์์ํ๊ฑฐ๋, ์คํ์ ์ ๋ณด๊ฐ ์ฌ๋ผ์ก์ ๋ Kafka๊ฐ ์ด๋์๋ถํฐ ์ฝ์์ง ๊ฒฐ์ ํ๋ ์ค์ .
์ด์ ํ๊ฒฝ์์๋ earliest๋ฅผ ์ค์ ํด์ ๋ฐ์ดํฐ ์ ์ค์ ๋ฐฉ์งํ๋ ๊ฒ ์ข์!
์ค์ ๊ฐ | ๋์ ๋ฐฉ์ |
earliest | ๊ฐ์ฅ ์ค๋๋ ๋ฉ์์ง๋ถํฐ ์ฝ์ (๋ฐ์ดํฐ ์ ์ค ๋ฐฉ์ง) |
latest (๊ธฐ๋ณธ๊ฐ) | ๊ฐ์ฅ ์ต๊ทผ ๋ฉ์์ง๋ถํฐ ์ฝ์ (์ด์ ๋ฉ์์ง๋ ๋ฌด์๋จ) |
๐ ์คํ์ ์ด๊ธฐํ ์ค์
auto.offset.reset=earliest
โ
3. Exactly-Once ๋ณด์ฅ (์ค๋ณต ๋ฐฉ์ง & ํธ๋์ญ์
์ฒ๋ฆฌ)
Kafka ๊ธฐ๋ณธ ์ค์ ์์๋ at-least-once ๋ฐฉ์์ด๋ผ ์ค๋ณต ๋ฉ์์ง ์๋น๊ฐ ๋ฐ์ํ ์ ์๋ค.
์ด๋ฅผ ๋ฐฉ์งํ๋ ค๋ฉด Exactly-Once ์ฒ๋ฆฌ๋ฅผ ์ํ ์ค์ ์ ์ถ๊ฐํด์ผ ํจ!
- idempotent producer (์ค๋ณต ๋ฐฉ์ง ํ๋ก๋์) + transactional consumer (ํธ๋์ญ์ ๋ ์ปจ์๋จธ) ๋ฅผ ์กฐํฉ.
- read-process-write ํจํด์์ ์๋นํ ๋ฉ์์ง์ ์๋ก์ด ๋ฉ์์ง๋ฅผ ํ๋์ ํธ๋์ญ์ ์ผ๋ก ๋ฌถ์ด ์ปค๋ฐํจ.
๐ ํ๋ก๋์ ์ธก ์ค์ (์ค๋ณต ๋ฐฉ์ง)
enable.idempotence=true
acks=all
๐ ์ปจ์๋จธ ์ธก ์ค์ (ํธ๋์ญ์ ํ์ฑํ)
isolation.level=read_committed
๐ ํธ๋์ญ์ al ์๋น์ + ํ๋ก๋์ ์ฌ์ฉ
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
producer.send(new ProducerRecord<>("new-topic", record.value()));
}
producer.commitTransaction(); // ํ๋ก๋์ ํธ๋์ญ์
์ปค๋ฐ
consumer.commitSync(); // ์ปจ์๋จธ ์คํ์
์ปค๋ฐ
}
'Backend > spring cloud (MSA)' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
์ค์๊ฐ ๋ฐ์ดํฐ ์ ์ก (0) | 2025.02.23 |
---|---|
Kafka๋? (0) | 2025.02.23 |
Eureka๋? (0) | 2025.02.16 |
GateWay๋? (0) | 2025.02.16 |
Spring Cloud๋? (0) | 2025.02.16 |