Kafka knowledge

Kafka version: 3.1

Kafka

Apache Kafka is a free and open-source distributed event streaming platform.

Some features of Kafka
  • High-throughput: Kafka handles high message volumes thanks to its built-in partitioning system (topics are split into partitions).
  • Fault-Tolerant: Kafka is resistant to node/machine failure within a cluster.
  • Durability: Kafka replicates messages across brokers, so messages are not lost even if a broker fails.
  • Scalability: Kafka can be scaled out on the fly by adding nodes, without incurring any downtime.

Zookeeper

ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.

Why is Zookeeper necessary for Apache Kafka?

ZooKeeper has four primary functions:

  1. Controller Election
  2. Cluster Membership: ZooKeeper maintains the list of brokers that are alive and part of the cluster.
  3. Topic Configuration: ZooKeeper maintains the configuration of all topics, including the list of existing topics, number of partitions for each topic, location of the replicas, configuration overrides for topics, preferred leader node, among other details.
  4. Access Control Lists: ZooKeeper stores the access control lists (ACLs) for all topics.

Broker

Controller

What is a controller?

A controller is not too complex: it is a normal broker that simply has additional responsibilities. It is responsible for managing the state of all partitions and replicas.

For example:

  • When the leader partition fails, the controller is responsible for selecting a new leader replica for the partition.
  • When the ISR set of a partition changes, the controller is responsible for notifying all brokers to update their metadata information.
  • When increasing the number of partitions for a topic, the controller is responsible for the reallocation of partitions.
Controller election

When the controller goes down:

  1. Zookeeper informs all the brokers that the controller failed.
  2. All the brokers apply to become the new controller.
  3. The first broker that succeeds in creating the ephemeral /controller node in ZooKeeper becomes the new controller.
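
A quick way to check which broker currently holds the controller role is to read the /controller znode. This is a sketch assuming a local ZooKeeper listening on port 2181:

./bin/zookeeper-shell.sh localhost:2181 get /controller
# prints something like {"version":1,"brokerid":0,"timestamp":"..."}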

Fault tolerance

Fault tolerance in Kafka is achieved by copying partition data to other brokers; these copies are known as replicas, and the number of copies is called the replication factor.

Each broker holds one or more partitions, and each of these partitions is either the leader or a follower replica for its topic. All writes and reads for a partition go through the leader, and the leader coordinates updating the replicas with new data.
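
For example, a topic can be created with a replication factor of 3 so that each partition has one leader and two follower replicas. The topic name and broker address below are just placeholders:

./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
--create --topic my-topic \
--partitions 3 --replication-factor 3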

Leader, follower, ISRs
  • Leader partition: the partition replica elected as leader. The leader is responsible for handling all reads and writes for that partition.
  • Follower partition: a replica of the leader hosted on another broker.
  • ISR (in-sync replicas): the follower replicas that are fully caught up (in sync) with their leader.
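
The leader, replica set, and current ISR of each partition can be inspected with kafka-topics.sh (my-topic is a placeholder; the output shown in the comment is only an approximation of the real format):

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic my-topic
# Topic: my-topic  Partition: 0  Leader: 1  Replicas: 1,2,3  Isr: 1,2,3
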
Leader partition election

When the leader partition goes down:

  1. ZooKeeper informs the controller.
  2. The controller selects one of the in-sync replicas (ISR) as the new leader.
  3. When the original broker comes back up, it can become the leader again (preferred leader election).

Quorum

Formula

Quorum can be defined with a formula.

q = 2n+1

where q is the total number of nodes and n is the number of node failures that can be tolerated.

For example: if n = 2, the quorum size is 5 (a 5-node cluster tolerates 2 failed nodes).

Producer

Message Delivery Guarantees

Types of delivery

  1. At-most-once: message loss is possible if the producer does not retry on failures.
  2. At-least-once: messages are not lost, but a message can be duplicated if the producer retries when the message has already been persisted.
  3. Exactly-once: every message is guaranteed to be persisted in Kafka exactly once, without duplicates and without data loss, even when there is a broker failure or a producer retry.
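
As a sketch, these semantics map to producer settings roughly as follows; the values are illustrative, not the only valid choices:

// At-most-once: fire and forget, no retries
Properties atMostOnce = new Properties();
atMostOnce.put("acks", "0");
atMostOnce.put("retries", "0");

// At-least-once: wait for acknowledgement and retry on failure (duplicates possible)
Properties atLeastOnce = new Properties();
atLeastOnce.put("acks", "all");
atLeastOnce.put("retries", "2147483647");

// Exactly-once: at-least-once plus the idempotent producer (and transactions if needed)
Properties exactlyOnce = new Properties();
exactlyOnce.put("acks", "all");
exactlyOnce.put("enable.idempotence", "true");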

Ref: https://ssudan16.medium.com/exactly-once-processing-in-kafka-explained-66ecc41a8548

Exactly-Once Processing

There are two parts to achieving "Exactly-Once":

  1. Idempotent Guarantee
  2. Transactional Guarantee
Terminologies
  1. Producer ID (PID)

A Unique Identifier assigned to the producer by the broker.

If transactional.id is not specified, a fresh PID is generated every time the producer is initialized. If transactional.id is specified, the broker stores a mapping from the transactional ID to the PID so that it can return the same PID when the producer restarts.

  2. Epoch Number

The epoch number is an integer used alongside the PID to uniquely identify the latest active producer; it is only relevant if transactional.id is set.

  3. Sequence Number

The producer maintains a sequence number for every message per PID and topic-partition combination. The broker rejects a message whose sequence number is not exactly one greater than the last sequence number it stored.

  4. Control Message

A control message is a marker written to the partitions involved in a transaction to indicate its outcome. The two types of control messages are COMMIT and ABORT.

  5. Transaction Coordinator

The transaction coordinator maintains a map keyed by transactional.id whose metadata includes: the PID, epoch number, transaction timeout, last update time of the transaction, transaction status, and the list of topic partitions in the transaction.

  6. Transaction Log

The transaction log is stored in the internal __transaction_state topic.
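
Similar to the __consumer_offsets dump shown later, the transaction log can be inspected with the console consumer. The formatter class name here is an assumption based on Kafka's internal naming and may differ between versions:

./bin/kafka-console-consumer.sh \
--formatter "kafka.coordinator.transaction.TransactionLog\$TransactionLogMessageFormatter" \
--bootstrap-server localhost:9092 \
--topic __transaction_state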

Idempotent Guarantee

The idempotent guarantee ensures exactly-once delivery only within a single producer session. Exactly-once is not guaranteed across restarts, because a restarted producer gets a new PID (producer ID).

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "100");
Transactional Guarantee

1. Producer

2. Consumer
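
A minimal sketch of both sides of the transactional guarantee. The topic names, transactional.id, and group.id are placeholders, and error handling (such as aborting the transaction on failure) is omitted:

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// 1. Producer side: all sends between begin and commit are atomic
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "my-transactional-id");   // placeholder

KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);
producer.initTransactions();                  // registers the PID/epoch with the transaction coordinator
producer.beginTransaction();
producer.send(new ProducerRecord<>("output-topic", "key", "value"));
producer.commitTransaction();                 // writes the COMMIT control message
producer.close();

// 2. Consumer side: read_committed skips records from aborted transactions
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group");    // placeholder
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(List.of("output-topic"));
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));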

Guaranteed Message Ordering

The max.in.flight.requests.per.connection setting can be used to increase throughput by allowing the client to send multiple unacknowledged requests before blocking. However, there is a risk of message re-ordering when requests are retried after errors.
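
A sketch of the two common ways to keep ordering, depending on whether the idempotent producer is enabled; the values follow the Kafka documentation:

// Option 1: no idempotence - keep only one request in flight so retries cannot reorder messages
Properties strictOrdering = new Properties();
strictOrdering.put("max.in.flight.requests.per.connection", "1");

// Option 2: with the idempotent producer, ordering is preserved on retries
// as long as max.in.flight.requests.per.connection is at most 5
Properties idempotentOrdering = new Properties();
idempotentOrdering.put("enable.idempotence", "true");
idempotentOrdering.put("max.in.flight.requests.per.connection", "5");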

Configuration

Partitioning strategies

Types of Strategies

Consumer

Partition assignment strategies

Types of Strategies

How do consumers work?

Poll and Internal Threads Behavior

Ref: https://www.conduktor.io/kafka/kafka-consumer-important-settings-poll-and-internal-threads-behavior

Some important consumer settings

fetch.min.bytes

fetch.max.bytes

fetch.max.wait.ms

max.partition.fetch.bytes

max.poll.records

max.poll.interval.ms

session.timeout.ms

partition.assignment.strategy
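
A sketch of these settings with their approximate Kafka 3.x defaults, annotated as a starting point for tuning; the values are the documented defaults, not recommendations:

Properties consumerProps = new Properties();
consumerProps.put("fetch.min.bytes", "1");                 // broker answers as soon as this much data is available
consumerProps.put("fetch.max.bytes", "52428800");          // max bytes returned per fetch request (~50 MB)
consumerProps.put("fetch.max.wait.ms", "500");             // max time the broker waits to accumulate fetch.min.bytes
consumerProps.put("max.partition.fetch.bytes", "1048576"); // max bytes returned per partition (~1 MB)
consumerProps.put("max.poll.records", "500");              // max records returned by a single poll()
consumerProps.put("max.poll.interval.ms", "300000");       // max delay between poll() calls before the consumer is considered dead
consumerProps.put("session.timeout.ms", "45000");          // heartbeat session timeout (default raised to 45 s in Kafka 3.0)
consumerProps.put("partition.assignment.strategy",
    "org.apache.kafka.clients.consumer.RangeAssignor");    // one of the assignors; the 3.x default also includes CooperativeStickyAssignor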

__consumer_offsets topic

Since version 0.9, Kafka stores consumer offsets on the broker directly instead of relying on ZooKeeper.

Offsets in Kafka are stored as messages in a separate topic named __consumer_offsets. Each consumer commits a message to this topic at periodic intervals.

Consumer groups are also stored in the __consumer_offsets topic. The topic contains both the committed offsets and the group metadata (group.id, members, generation, leader, ...). Groups are stored using GroupMetadataMessage messages (offsets use OffsetsMessage).

Dump the group metadata:

./bin/kafka-console-consumer.sh \
--formatter "kafka.coordinator.group.GroupMetadataManager\$GroupMetadataMessageFormatter" \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets
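
The committed offsets themselves can be dumped with the companion formatter; as above, the formatter class is internal to Kafka and may change between versions:

./bin/kafka-console-consumer.sh \
--formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" \
--bootstrap-server localhost:9092 \
--topic __consumer_offsets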

Consumer group

Consumers & consumer groups

Feature

Transaction

Why does Kafka producer produce 2 offsets per message in transaction mode?

This is by design. When a transactional producer publishes a message or a batch of messages, the broker writes an extra control record (a commit marker) to complete the transaction, and that marker occupies one offset.

For example:

  • The producer publishes 10 messages (offsets 0–9); the commit marker takes offset 10, so the next offset is 11.
  • The producer publishes 1 message (offset 0); the commit marker takes offset 1, so the next offset is 2.

Ref: https://stackoverflow.com/questions/59152915/spring-kafka-transaction-causes-producer-per-message-offset-increased-by-two

Data retention

Types of Cleanup Policies
  • delete
  • compact
  • delete, compact
Setting data retention

To configure the cleanup policy, follow the steps below:

  1. Choose a cleanup policy

cleanup.policy

  • Default: delete
  • Valid Values: [compact, delete]
  • Server Default Property: log.cleanup.policy
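
For example, the policy can be overridden per topic with kafka-configs.sh (the topic name is a placeholder):

./bin/kafka-configs.sh --bootstrap-server localhost:9092 \
--alter --entity-type topics --entity-name my-topic \
--add-config cleanup.policy=compact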

Compact policy:

Ref: https://medium.com/@sunny_81705/kafka-log-retention-and-cleanup-policies-c8d9cb7e09f8

Kafka tombstone
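
A tombstone is a record with a non-null key and a null value; on a compacted topic, log compaction treats it as a delete marker and eventually removes the records for that key. A minimal sketch of producing one (the topic name is a placeholder and the producer setup is omitted):

// Sending a null value for a key marks that key for deletion on a compacted topic
producer.send(new ProducerRecord<String, String>("compacted-topic", "user-42", null));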

Ref: https://medium.com/@sunny_81705/kafka-log-retention-and-cleanup-policies-c8d9cb7e09f8