Kafka knowledge
Kafka version: 3.1
Kafka
Apache Kafka is a free and open-source distributed streaming platform.
Some features of Kafka
- High-throughput: Kafka achieves high throughput through its built-in partitioning system: each topic is split into partitions that can be written and read in parallel.
- Fault-Tolerant: Kafka is resistant to node/machine failure within a cluster.
- Durability: Kafka supports message replication, so messages are not lost as long as at least one replica survives. This is one of the reasons behind its durability.
- Scalability: Kafka can be scaled out on the fly by adding nodes, without incurring any downtime.
Zookeeper
ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
Why is Zookeeper necessary for Apache Kafka?
Zookeeper has four primary functions:
- Controller Election
- Cluster Membership: Zookeeper also maintains a list of all the brokers in the cluster (e.g., broker IDs, ISR membership, ...)
- Topic Configuration: ZooKeeper maintains the configuration of all topics, including the list of existing topics, number of partitions for each topic, location of the replicas, configuration overrides for topics, preferred leader node, among other details.
- Access Control Lists: Access control lists or ACLs for all the topics
Broker
Controller
What is controller?
A controller is not too complex: it is a normal broker that simply has additional responsibilities. It is responsible for managing the state of all partitions and replicas.
For example:
- When the leader partition fails, the controller is responsible for selecting a new leader replica for the partition.
- When the ISR set of a partition changes, the controller is responsible for notifying all brokers to update their metadata information
- When increasing the number of partitions for a topic, the controller is responsible for the reallocation of partitions.
Controller election
When the controller goes down:
- Zookeeper informs all the brokers that the controller failed.
- Every broker applies to become the new controller.
- The first broker to register itself (by creating the controller node in Zookeeper) becomes the new controller.
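You can check which broker is currently acting as the controller with the Java Admin client. A minimal sketch, assuming a broker reachable at localhost:9092:

import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.Node;

public class FindController {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            // describeCluster() exposes the broker currently acting as controller
            Node controller = admin.describeCluster().controller().get();
            System.out.println("Current controller: " + controller);
        }
    }
}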
Fault tolerance
Fault tolerance in Kafka is achieved by copying partition data to other brokers, which are known as replicas. The number of copies is called the replication factor.
Each broker holds one or more partitions, and each of these partitions is either a leader or a replica for its topic. All writes and reads for a topic go through the leader, and the leader coordinates updating the replicas with new data.
Leader, follower, ISRs
- Leader partition: the partition replica elected as leader. The leader partition is responsible for reading/writing data.
- Follower partition: a replica of the leader on another broker.
- ISR (in-sync replicas): the replicated partitions (followers) that are in sync with their leader.
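To inspect each partition's current leader and ISR, a minimal sketch using the Java Admin client (my-topic and the broker address are placeholders):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;

public class ShowLeaderAndIsr {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            TopicDescription desc =
                admin.describeTopics(List.of("my-topic")).allTopicNames().get().get("my-topic");
            // Each partition reports its current leader node and in-sync replica set
            desc.partitions().forEach(p ->
                System.out.printf("partition=%d leader=%s isr=%s%n", p.partition(), p.leader(), p.isr()));
        }
    }
}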
Leader partition election
When the leader partition goes down:
- Zookeeper informs the controller.
- The controller selects one of the in-sync replicas (ISR) as the new leader.
- When the failed broker comes back up, it can become the leader again via preferred leader election (enabled by default through auto.leader.rebalance.enable).
- Ref: https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
- Ref: https://medium.com/@anchan.ashwithabg95/fault-tolerance-in-apache-kafka-d1f0444260cf
Quorum
Formula
Quorum can be defined with a formula:
q = 2n + 1
where q is the total number of nodes and n is the number of node failures that can be tolerated. For example, if n = 2, the quorum size is 5.
Producer
Message Delivery Guarantees
Types of delivery
- At-most-once: message loss is possible if the producer doesn't retry on failures.
- At-least-once: There is no chance of message loss but the message can be duplicated if the producer retries when the message is already persisted.
- Exactly-once: every message is guaranteed to be persisted in Kafka exactly once, without duplicates and without data loss, even when there is a broker failure or producer retry.
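As a rough sketch, these semantics map onto producer settings roughly as follows (producerProps is a standard Properties object for the producer; the values are illustrative, not the only valid combinations):

// At-most-once: fire-and-forget; no retries, so a failed send is simply lost
producerProps.put("acks", "0");
producerProps.put("retries", "0");
// At-least-once: wait for acknowledgement and retry on failure (duplicates possible)
producerProps.put("acks", "all");
producerProps.put("retries", Integer.toString(Integer.MAX_VALUE));
// Exactly-once: the idempotent producer de-duplicates retries per partition
producerProps.put("enable.idempotence", "true");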
Exactly-Once Processing
There are two building blocks to achieve "Exactly-Once":
- Idempotent Guarantee
- Transactional Guarantee
- Ref: https://www.javacodegeeks.com/2020/05/kafka-exactly-once-semantics.html
- Ref: https://ssudan16.medium.com/exactly-once-processing-in-kafka-explained-66ecc41a8548
- Ref: https://blog.clairvoyantsoft.com/unleash-kafka-producers-architecture-and-internal-working-f33cba6c43aa
- Ref: https://cwiki.apache.org/confluence/display/KAFKA/KIP-98+-+Exactly+Once+Delivery+and+Transactional+Messaging
Terminologies
- Producer ID (PID)
A unique identifier assigned to the producer by the broker. If transactional.id is not specified, a fresh PID is generated every time the producer is initialized. If transactional.id is specified, the broker stores a mapping of transactional ID to PID so that it can return the same PID on producer restart.
- Epoch Number
The epoch number is an integer used alongside the PID to uniquely identify the latest active producer; it is only relevant if transactional.id is set.
- Sequence Number
The producer maintains a sequence number for every message, per PID and topic-partition combination. The broker rejects a message whose sequence number is not exactly one greater than the last one it stored.
- Control Message
The two types of control messages are COMMIT and ABORT.
- Transaction Coordinator
The transaction coordinator maintains a map keyed by transactional.id, holding metadata that includes: the PID, epoch number, transaction timeout, last update time of the transaction, transaction status, and the list of topic partitions in the transaction.
- Transaction Log
The transaction log is stored in the internal __transaction_state topic.
Idempotent Guarantee
The idempotent guarantee ensures exactly-once delivery only within a single producer session; exactly-once is not guaranteed across producer restarts, because a restarted producer gets a new PID (producer ID) unless transactional.id is set.
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "100");
Transactional Guarantee
1. Producer
2. Consumer
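A sketch of both sides with the standard Java client, reusing producerProps from the idempotent sketch above (the topic name and transactional.id are placeholders):

// 1. Producer side: sends between begin and commit become visible atomically
producerProps.put("transactional.id", "demo-tx-1"); // enables transactions (implies idempotence)
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();  // registers with the transaction coordinator, fencing older instances
producer.beginTransaction();
producer.send(new ProducerRecord<>("demo-topic", "k1", "v1"));
producer.send(new ProducerRecord<>("demo-topic", "k2", "v2"));
producer.commitTransaction(); // the broker writes a COMMIT control message

// 2. Consumer side: read only messages from committed transactions
consumerProps.put("isolation.level", "read_committed");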
Guaranteed Message Ordering
The max.in.flight.requests.per.connection
setting can be used to increase throughput by allowing the client to send multiple unacknowledged requests before blocking. However, it carries a risk of message re-ordering when sends are retried after errors.
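If strict ordering matters, a common approach is either to cap in-flight requests at 1, or to enable idempotence, which in recent clients preserves ordering with up to 5 in-flight requests:

// Option 1: one unacknowledged request at a time (simple, lower throughput)
producerProps.put("max.in.flight.requests.per.connection", "1");
// Option 2: idempotent producer keeps ordering with up to 5 in-flight requests
producerProps.put("enable.idempotence", "true");
producerProps.put("max.in.flight.requests.per.connection", "5");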
Configuration
Partitioning strategies
Types of Strategies
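The built-in strategies are not listed here; for keyed messages the default partitioner hashes the key (murmur2), and a custom strategy can be plugged in via the Partitioner interface. A sketch that mirrors the default keyed behavior:

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

public class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) return 0; // simplistic fallback for unkeyed messages
        // Same hash the default partitioner uses for keyed messages
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
    @Override public void close() {}
    @Override public void configure(Map<String, ?> configs) {}
}

It is registered with producerProps.put("partitioner.class", KeyHashPartitioner.class.getName());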
Consumer
Partition assignment strategies
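The assignors shipped with the Java client include RangeAssignor (the classic default), RoundRobinAssignor, StickyAssignor, and CooperativeStickyAssignor. The strategy is set on the consumer, for example:

consumerProps.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");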
How do consumers work?
Poll and Internal Threads Behavior
Ref: https://www.conduktor.io/kafka/kafka-consumer-important-settings-poll-and-internal-threads-behavior
Some important consumer settings
fetch.min.bytes
fetch.max.bytes
fetch.max.wait.ms
max.partition.fetch.bytes
max.poll.records
max.poll.interval.ms
session.timeout.ms
partition.assignment.strategy
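A sketch wiring several of these settings into a consumer; the values shown are the client defaults in recent versions, included only for illustration:

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092"); // assumed broker address
consumerProps.put("group.id", "demo-group");              // assumed group id
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("fetch.min.bytes", "1");           // reply as soon as any data is available
consumerProps.put("fetch.max.wait.ms", "500");       // ...or after this long, whichever comes first
consumerProps.put("max.poll.records", "500");        // cap on records returned by a single poll()
consumerProps.put("max.poll.interval.ms", "300000"); // max gap between polls before a rebalance
consumerProps.put("session.timeout.ms", "45000");    // heartbeat liveness window
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);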
__consumer_offsets topic
Since version 0.9, Kafka stores topic offsets on the broker directly instead of relying on Zookeeper.
Offsets in Kafka are stored as messages in a separate topic named __consumer_offsets. Each consumer commits a message into this topic at periodic intervals.
Consumer groups are also stored in the __consumer_offsets topic, which contains both the committed offsets and the group metadata (group.id, members, generation, leader, ...). Groups are stored using GroupMetadataMessage messages (offsets use OffsetsMessage).
Dump the group metadata:
./bin/kafka-console-consumer.sh \
--formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets
Consumer group
Feature
Transaction
Why does Kafka producer produce 2 offsets per message in transaction mode?
This is by design. When the producer publishes a message or a batch of messages within a transaction, the broker appends one extra control message (the commit marker) to complete the transaction.
For example:
- The producer publishes 10 messages (offsets 0-9) plus the commit marker at offset 10, so the next offset is 11.
- The producer publishes 1 message (offset 0) plus the commit marker at offset 1, so the next offset is 2.
Data retention
Types of Cleanup Policies
- delete
- compact
- delete, compact
Setting data retention
To configure the cleanup policy, follow the steps below:
- Choose cleanup policy
cleanup.policy
- Default: delete
- Valid Values: [compact, delete]
- Server Default Property: log.cleanup.policy
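A sketch of applying the policy to an existing topic with the Java Admin client (the topic name and broker address are placeholders):

import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class SetCleanupPolicy {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
        try (Admin admin = Admin.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-topic");
            AlterConfigOp setPolicy = new AlterConfigOp(
                new ConfigEntry("cleanup.policy", "compact"), AlterConfigOp.OpType.SET);
            // Incrementally update just this one topic-level config
            admin.incrementalAlterConfigs(Map.of(topic, List.of(setPolicy))).all().get();
        }
    }
}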
Compact policy:
Ref: https://medium.com/@sunny_81705/kafka-log-retention-and-cleanup-policies-c8d9cb7e09f8