[Kafka] κ³ κΈ μ€μ
π Kafka μ΄μ νκ²½μμ νμν κ³ κΈ μ€μ
κΈ°λ³Έ κ°λ λ§μΌλ‘λ Kafkaλ₯Ό μ¬μ©ν μλ μμ§λ§, μ€μ μ΄μ νκ²½μμλ λ λ§μ κ±Έ μμμΌ νλ€!
- ν ν½, νν°μ , λΈλ‘컀, 컨μλ¨Έ κ·Έλ£Ή, μ€νμ μ΄ν΄
- Producer & Consumer λμ λ°©μ
- κΈ°λ³Έμ μΈ λͺ λ Ήμ΄ (ν ν½ μμ±, λ©μμ§ μμ°/μλΉ, μ€νμ κ΄λ¦¬)
β‘οΈ μ΄ μ λλ§ μμλ λ‘컬 νκ²½μμ ν μ€νΈνκ³ κ°λ¨ν μλΉμ€μ μ μ©ν μ μλ€!! π
π 1. νν°μ ν λΉ μ λ΅ (Partition Assignment Strategy)
β 컨μλ¨Έ κ·Έλ£Ή μ΅μ ν
Kafkaλ μ¬λ¬ κ°μ 컨μλ¨Έκ° λμΌν ν ν½μ μλΉν λ, νν°μ μ 컨μλ¨Έλ€μκ² μλμΌλ‘ ν λΉνλ€. μ΄λ νν°μ ν λΉ μ λ΅μ΄ μ€μν μν μ νλ€.
β‘οΈ μ΄μ νκ²½μμλ 컨μλ¨Έ κ·Έλ£Ήμ λΆν λΆμ°μ μ΅μ ννκΈ° μν΄ μν©μ λ§λ ν λΉ μ λ΅μ μ νν΄μΌ νλ€.
β μ£Όμ ν λΉ μ λ΅
- RangeAssignor (κΈ°λ³Έκ°): νν°μ μ μ°μμ μΈ λ²μλ‘ λλμ΄ ν λΉ. 컨μλ¨Έ μμ νν°μ μκ° κ· λ±νμ§ μμΌλ©΄ μΌλΆ 컨μλ¨Έκ° λ λ§μ νν°μ μ κ°μ Έκ° μ μμ.
- RoundRobinAssignor: λͺ¨λ 컨μλ¨Έμκ² **μν λ°©μ(λΌμ΄λλ‘λΉ)**μΌλ‘ κ³¨κ³ λ£¨ ν λΉ. 컨μλ¨Έ κ° κ· λ±ν λΆλ°°κ° νμν κ²½μ° μ ν©.
- StickyAssignor: νν°μ μ¬ν λΉ μ κΈ°μ‘΄ ν λΉμ μ΅λν μ μ§νμ¬ λ¦¬λ°Έλ°μ± μ€λ²ν€λ μ΅μν.
- Custom Assignor: νΉμ μꡬμ¬νμ λ§κ² μ§μ ꡬν κ°λ₯.
β ν λΉ μ λ΅ μ€μ λ°©λ²
partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
β νν°μ μ κ°μ
β‘οΈ νν°μ & λΈλ‘컀 κ°μ κ΄κ³
- νν°μ
κ°μλ λΈλ‘컀 κ°μμ 무κ΄νκ² μ€μ κ°λ₯!
→ νμ§λ§ μ΅μ μ±λ₯μ μν΄ λΈλ‘컀 κ°μλ³΄λ€ ν¬κ±°λ κ°κ² μ€μ νλ κ² μΌλ°μ . - μ΄μ : κ° λΈλ‘μ»€κ° μ΅μν νλμ νν°μ
μ κ°μ ΈμΌ λΆνκ° λΆμ°λ¨!
→ νν°μ κ°μκ° λΈλ‘컀 κ°μλ³΄λ€ μ μΌλ©΄ μΌλΆ λΈλ‘μ»€κ° λκ² λ¨.
β‘οΈ μΆμ²: νν°μ κ°μ ≥ λΈλ‘컀 κ°μ (λ³΄ν΅ λΈλ‘컀 κ°μ × 2 μΆμ²)
π νν°μ κ°μκ° λΈλ‘컀 κ°μλ³΄λ€ μ μΌλ©΄
π λ¨λ λΈλ‘컀λ μΈμ μν μ ν κΉ?
1οΈβ£ μ₯μ λ°μ μ λκΈ° μν
- 리λ λΈλ‘μ»€κ° μ£½μΌλ©΄ λ¨μ λΈλ‘μ»€κ° λ¦¬λλ‘ μΉκ²©λ μλ μμ
- λ¨λ λΈλ‘μ»€λ‘ μλ‘μ΄ νν°μ μ΄ λ°°μΉλ μλ μμ (λΈλ‘컀 μΆκ° μ μλ μ‘°μ )
2οΈβ£ λ€λ₯Έ ν ν½μ νν°μ μ μ μ₯ν μλ μμ
- μ΄μ νκ²½μμλ λ³΄ν΅ μ¬λ¬ κ°μ ν ν½μ΄ μμ
- νΉμ ν ν½μλ μν μ΄ μλλΌλ λ€λ₯Έ ν ν½μ 리λ λλ νλ‘μλ‘ μ°μΌ μ μμ
π 2. 리ν리μΌμ΄μ & μ₯μ λλΉ (Replication & Fault Tolerance)
β Kafkaλ λ°μ΄ν° μ μ€μ λ°©μ§νκΈ° μν΄ λ¦¬ν리μΌμ΄μ (볡μ ) κΈ°λ₯μ μ 곡νλ€.
β‘οΈ μ΄μ νκ²½μμλ 리ν리μΌμ΄μ μ ν΅ν΄ λ°μ΄ν° μ μ€μ λ°©μ§νκ³ , μ₯μ λ°μ μ λΉ λ₯Έ λ³΅κ΅¬κ° κ°λ₯νλλ‘ μ€μ ν΄μΌ νλ€
β 리ν리μΌμ΄μ μ κ°μ
β‘οΈ λ¦¬ν리μΌμ΄μ & λΈλ‘컀 κ°μ κ΄κ³
- μ΅λ λΈλ‘컀 κ°μλ§νΌ μ€μ κ°λ₯. (μ΄ λ λΈλ‘μ»€κ° μ₯μ λλ©΄, μλ‘κ² ν λΉν 곡κ°μ΄ μμ΄ λ³΅κ΅¬κ° μ΄λ €μ)
- λ³΄ν΅ λΈλ‘컀 κ°μ -1λ‘ μ€μ (1κ°λ 리λ, λλ¨Έμ§λ νλ‘μ).
- λͺ¨λ 볡μ λ³Έμ μλ‘ λ€λ₯Έ λΈλ‘컀μ μ μ₯λΌμΌ νλ―λ‘,
→ replication-factorλ λΈλ‘컀 κ°μλ₯Ό μ΄κ³Όν μ μμ.
β‘οΈ μΆμ²: replication-factor ≤ λΈλ‘컀 κ°μ (λ³΄ν΅ λΈλ‘컀 κ°μ - 1 μΆμ²)
π 리ν리μΌμ΄μ μ ν λΉ μ
ν ν½ A (λΈλ‘컀 4κ°, νν°μ 3κ°, 볡μ 3κ°)
Kafkaλ μλμΌλ‘ λΈλ‘컀λ₯Ό μ΅λν κ· λ±νκ² λΆλ°°
νν°μ | 리λ λΈλ‘컀 | νλ‘μ λΈλ‘컀 1 | νλ‘μ λΈλ‘컀 2 |
P0 | B1 | B2 | B3 |
P1 | B2 | B3 | B4 |
P2 | B3 | B4 | B1 |
β κ΅¬μ± μμ
- 리λ νν°μ : ν΄λ¬μ€ν°μμ νΉμ λΈλ‘μ»€κ° ν΄λΉ νν°μ μ 리λ μν μ λ΄λΉ.
- νλ‘μ νν°μ : 리λλ₯Ό 볡μ νμ¬ λ°±μ μν μ μν.
- ISR (In-Sync Replicas): 리λμ λκΈ°νλ νλ‘μ λͺ©λ‘. μ₯μ λ°μ μ μλ‘μ΄ λ¦¬λλ₯Ό μ μΆνλ λ° μ¬μ©λ¨.
β μλ λ°©μ
- κ° νν°μ μ νλμ 리λ(Leader)μ νλ μ΄μμ νλ‘μ(Follower)λ‘ κ΅¬μ±λ¨.
- νλ‘λμ(Producer)λ νμ 리λ νν°μ μΌλ‘ λ°μ΄ν°λ₯Ό 보λ.
- νλ‘μ νν°μ λ€μ 리λμ λ°μ΄ν°λ₯Ό λκΈ°ν(볡μ )νλ©΄μ λκΈ°.
β μ₯μ λ°μ μ 볡ꡬ κ³Όμ
1οΈβ£ 리λ λΈλ‘컀 μ₯μ λ°μ π¨ → ISR λ΄μμ μλ‘μ΄ λ¦¬λ μλ μ μΆ.
2οΈβ£ νλ‘μκ° μΌμ μκ° λ¦¬λμ λκΈ°νλμ§ μμΌλ©΄ ISRμμ μ κ±°λ¨.
3οΈβ£ 볡ꡬλ λΈλ‘컀λ λ€μ ISRμ μΆκ°λ¨.
β μ€μ λ°©λ²
min.insync.replicas=2 # λλ ν ν½ μμ± μ replication-factor=2 # μ΅μν 2κ°μ 볡μ λ³Έμ΄ μμ΄μΌ μ»€λ° κ°λ₯
unclean.leader.election.enable=false # μμλ λ°μ΄ν°κ° 리λλ‘ μ μΆλλ κ²μ λ°©μ§
π 3. λ ν
μ
μ μ±
& μ±λ₯ νλ (Retention Policy & Performance Tuning)
β Kafkaλ μ€λλ λ°μ΄ν°λ₯Ό μλμΌλ‘ μμ νκ±°λ μ μ§νλ λ ν μ μ μ± μ μ§μνλ€.
β‘οΈ μ΄μ νκ²½μμλ λ ν μ μ€μ μ ν΅ν΄ μ μ₯ 곡κ°μ κ΄λ¦¬νκ³ , μ±λ₯ νλμ ν΅ν΄ μ²λ¦¬ μλλ₯Ό μ΅μ νν΄μΌ νλ€.
β λ ν μ μ μ± μ€μ λ°©λ²
log.retention.ms=604800000 # 7μΌκ° λ°μ΄ν° 보κ΄
log.retention.bytes=1073741824 # 1GB μ΄κ³Ό μ μμ
β μ±λ₯ νλ μμ
- λ°°μΉ ν¬κΈ° μ‘°μ : batch.size κ° μ‘°μ λ‘ νλ‘λμμ μ±λ₯ ν₯μ.
- μμΆ μ¬μ©: compression.type=lz4 μ€μ μΌλ‘ λ°μ΄ν° μ μ‘λ μ κ°.
- λ©μμ§ ν¬κΈ° μ ν: message.max.bytes μ€μ μΌλ‘ κ°λ³ λ©μμ§ ν¬κΈ° μ‘°μ .
π 4. Kafka Connect & Schema Registry (μΈλΆ μμ€ν μ°λ)
β Kafka Connectλ₯Ό μ¬μ©νλ©΄ λ€λ₯Έ μμ€ν κ³Ό μμ½κ² λ°μ΄ν°λ₯Ό μ°λν μ μλ€.
β‘οΈ μ΄μ νκ²½μμλ Kafka Connectλ₯Ό νμ©νμ¬ λ€μν μμ€ν κ³Ό μ°λνκ³ , Schema Registryλ‘ λ°μ΄ν° μΌκ΄μ±μ μ μ§ν΄μΌ νλ€.
β Kafka Connectμ μν
- DB, ν΄λΌμ°λ, NoSQL, νμΌ μμ€ν λ± λ€μν μμ€μμ Kafkaλ‘ λ°μ΄ν°λ₯Ό κ°μ Έμ€κ±°λ μ μ‘ κ°λ₯.
- Sink Connector: Kafka → μΈλΆ μμ€ν (μ: Elasticsearch, HDFS, RDBMS)
- Source Connector: μΈλΆ μμ€ν → Kafka (μ: MySQL, PostgreSQL, MongoDB)
β Schema Registry νμ©
- Kafka λ©μμ§μ JSON, Avro, Protobuf κ°μ λ°μ΄ν° νμμ κ΄λ¦¬.
- μ€ν€λ§ λ³κ²½ μ λ°μ΄ν° 무결μ±μ μ μ§νκ³ , λ°μ΄ν° μμ€μ λ°©μ§.
- REST APIλ₯Ό ν΅ν΄ μ€ν€λ§λ₯Ό μ‘°ννκ³ κ΄λ¦¬ κ°λ₯.
π 5. Kafka Streams & KSQL (μ€μκ° λ°μ΄ν° μ²λ¦¬)
β
Kafka Streams: Kafka λ°μ΄ν°λ₯Ό μ€μκ°μΌλ‘ μ²λ¦¬νλ κ°λ ₯ν μ€νΈλ¦¬λ° λΌμ΄λΈλ¬λ¦¬.
β
KSQL: SQL λ¬Έλ²μ μ΄μ©ν΄ μ€μκ° λ°μ΄ν° μ²λ¦¬λ₯Ό μ½κ² ꡬνν μ μλ€.
β‘οΈ μ΄μ νκ²½μμλ Kafka Streams λλ KSQLμ νμ©νμ¬ μ€μκ° λ°μ΄ν° μ²λ¦¬λ₯Ό μ΅μ νν μ μλ€.
β Kafka Streams μ£Όμ κ°λ
- Stateful & Stateless μ°μ° μ§μ
- Windowing μ²λ¦¬ κ°λ₯ (μκ° κΈ°λ° λ°μ΄ν° μ²λ¦¬)
- Exactly-once μ²λ¦¬ κ°λ₯ (μ€λ³΅ λ©μμ§ λ°©μ§)
β Kafka Streams μμ
KStream<String, String> stream = builder.stream("input-topic");
stream.filter((key, value) -> value.contains("important"))
.to("output-topic");
β KSQL μ£Όμ κ°λ
- Kafka ν ν½μ SQLμ²λΌ μ‘°ννκ³ νν°λ§ κ°λ₯.
- μ€μκ° λ°μ΄ν° μ€νΈλ¦Όμ μ‘°μΈ, μ§κ³, λ³ν κ°λ₯.
β KSQL μμ
SELECT * FROM transactions WHERE amount > 1000;
π 6. Kafka μ΄μ νκ²½μμμ μ€νμ μ»€λ° μ λ΅
β
Kafkaμμ μ€νμ
(Offset) μ 컨μλ¨Έκ° νΉμ ν ν½μ νν°μ
μμ λ°μ΄ν°λ₯Ό μ΄λκΉμ§ μ½μλμ§λ₯Ό λνλ΄λ λ²νΈλ€.
β‘οΈ μ΄μ νκ²½μμλ λ°μ΄ν° μ μ€ λ°©μ§, μ€λ³΅ λ°©μ§, μ νν ν λ²λ§ μ²λ¦¬ (Exactly-Once) 보μ₯μ μν΄ μ€νμ
μ»€λ° μ λ΅μ΄ μ€μν¨!
β
1. μ€νμ
μ»€λ° λ°©μ: μλ vs μλ
μ€νμ μ μΈμ , μ΄λ»κ² μ μ₯ν μ§ κ²°μ νλ μ€μ .
λ°©μ | μ€λͺ | μ₯μ | λ¨μ |
μλ μ»€λ° (Auto Commit) | Kafkaκ° auto.commit.interval.ms(κΈ°λ³Έ 5μ΄)λ§λ€ μλμΌλ‘ μ€νμ μ μ₯ | ꡬνμ΄ κ°λ¨ | λ©μμ§κ° μ²λ¦¬λκΈ° μ μ 컀λ°λ μλ μμ (λ°μ΄ν° μ μ€ κ°λ₯) |
μλ μ»€λ° (Manual Commit) | 컨μλ¨Έκ° μ§μ μ€νμ μ μ»€λ° (commitSync(), commitAsync()) | λ°μ΄ν° μ μ€ & μ€λ³΅ μ΅μν κ°λ₯ | κ°λ°μκ° μ§μ κ΄λ¦¬ν΄μΌ ν¨ |
β μ΄μ νκ²½μμλ μλ 컀λ°λ³΄λ€ μλ 컀λ°μ μ°λ κ² μμ !
β commitSync()λ μ ννμ§λ§ λλ¦¬κ³ , commitAsync()λ λΉ λ₯΄μ§λ§ μΌλΆ μ€ν¨ κ°λ₯ → λμ μ‘°ν©ν΄μ μ¬μ© κ°λ₯!
π μλ μ»€λ° λΉνμ±ν
enable.auto.commit=false
π μΌλ° Kafka 컨μλ¨Έ μμ (commitSync() μ¬μ©)
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record); // λ©μμ§ μ²λ¦¬
}
consumer.commitSync(); // μ²λ¦¬ μλ£ ν μ€νμ
컀λ°
}
β @KafkaListener μ¬μ© μ μλ 컀λ°
π Spring Kafka μμ (ack.acknowledge() μ¬μ©)
@KafkaListener(topics = "my-topic", containerFactory = "myFactory")
public void listen(String message, Acknowledgment ack) {
System.out.println("λ©μμ§ μμ : " + message);
ack.acknowledge(); // λ©μμ§ μ μ μ²λ¦¬ ν μ€νμ
컀λ°
}
π KafkaListenerContainerFactoryμμ ackMode λ³κ²½
μ μ½λκ° μ λλ‘ λμνλ €λ©΄, 리μ€λ 컨ν μ΄λμ ackModeλ₯Ό MANUALλ‘ μ€μ ν΄μΌ νλ€.
@Configuration
public class KafkaConfig {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, DeliveryStartedEvent> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, DeliveryStartedEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); // μλ μ»€λ° μ€μ
return factory;
}
}
β
2. μ€νμ
μ΄κΈ°ν (Reset) - λ©μμ§ μ μ€ λ°©μ§
컨μλ¨Έκ° μ²μ μμνκ±°λ, μ€νμ μ λ³΄κ° μ¬λΌμ‘μ λ Kafkaκ° μ΄λμλΆν° μ½μμ§ κ²°μ νλ μ€μ .
μ΄μ νκ²½μμλ earliestλ₯Ό μ€μ ν΄μ λ°μ΄ν° μ μ€μ λ°©μ§νλ κ² μ’μ!
μ€μ κ° | λμ λ°©μ |
earliest | κ°μ₯ μ€λλ λ©μμ§λΆν° μ½μ (λ°μ΄ν° μ μ€ λ°©μ§) |
latest (κΈ°λ³Έκ°) | κ°μ₯ μ΅κ·Ό λ©μμ§λΆν° μ½μ (μ΄μ λ©μμ§λ 무μλ¨) |
π μ€νμ μ΄κΈ°ν μ€μ
auto.offset.reset=earliest
β
3. Exactly-Once 보μ₯ (μ€λ³΅ λ°©μ§ & νΈλμμ
μ²λ¦¬)
Kafka κΈ°λ³Έ μ€μ μμλ at-least-once λ°©μμ΄λΌ μ€λ³΅ λ©μμ§ μλΉκ° λ°μν μ μλ€.
μ΄λ₯Ό λ°©μ§νλ €λ©΄ Exactly-Once μ²λ¦¬λ₯Ό μν μ€μ μ μΆκ°ν΄μΌ ν¨!
- idempotent producer (μ€λ³΅ λ°©μ§ νλ‘λμ) + transactional consumer (νΈλμμ λ 컨μλ¨Έ) λ₯Ό μ‘°ν©.
- read-process-write ν¨ν΄μμ μλΉν λ©μμ§μ μλ‘μ΄ λ©μμ§λ₯Ό νλμ νΈλμμ μΌλ‘ λ¬Άμ΄ μ»€λ°ν¨.
π νλ‘λμ μΈ‘ μ€μ (μ€λ³΅ λ°©μ§)
enable.idempotence=true
acks=all
π 컨μλ¨Έ μΈ‘ μ€μ (νΈλμμ νμ±ν)
isolation.level=read_committed
π νΈλμμ al μλΉμ + νλ‘λμ μ¬μ©
Properties props = new Properties();
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
producer.beginTransaction();
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
producer.send(new ProducerRecord<>("new-topic", record.value()));
}
producer.commitTransaction(); // νλ‘λμ νΈλμμ
컀λ°
consumer.commitSync(); // 컨μλ¨Έ μ€νμ
컀λ°
}