Example for Kafka Transactions and how Kafka Streams handles headers

This example code shows what happens if a transactional producer and another non-transactional producer write to the same topic. A consumer with isolation.level=read_committed will just show all commited messages from the transactions and additionally all non-transactional messages.

This example also shows how Kafka Streams will handle this situation (exactly the same) and how how kStreams handles message headers. These are by default forwarded in the most simple case.

DISCLAIMER: This project is for demonstration purposes only. Using the concept in production is highly discouraged. Use at your own risk.

Preconditions

This project has been tested with Java version 17 and Gradle version TODO

Running Confluent Platform

Change to folder cluster.

Start the containers by running:

docker-compose up -d

Stopping the containers:

docker-compose down

Cleaning up (CAREFUL: THIS WILL DELETE ALL UNUSED VOLUMES):

docker volumes prune

Building the Producers

Initialize by running

gradle wrapper

Build Jar including all libraries with:

./gradlew shadowJar

Running the transactional producer

Change to the "clients" folder. Then use the following commands to run the code:

./gradlew -p producer-transactional run

Alternatively, run the combined jar like this:

java -jar producer-transactional/build/libs/producer-transactional-all.jar producer-transactional.properties

Running an implementation consisting of a transactional producer interlieved with a non-transactional producer

Change to the "clients" folder. Then use the following commands to run the code:

./gradlew -p consumer-transactionalchaos run

However, the expected chaos does not happen. Still, mixing transactional and non-transacational messages in one topic is highly discouraged.

Running the transactional consumer

Change to the "clients" folder. Then use the following commands to run the code:

./gradlew -p consumer-transactional run

Running the transactional Kafka streams application

Change to the "clients" folder. Then use the following commands to run the code:

./gradlew -p kstreams-header-forward run

This will forward all commited transactional messages and non-transactional messages from topic-1 to topic-2. There are three different, very simple topologies in the implementation which have in common that they will retain the original context including the headers send with the transactional messages (in this example, this would work for non-transactional messages, too, but here we do not add headers to those).

Some helpful commands

Consume from the topic like this, including the headers of the messages:

kafka-console-consumer --bootstrap-server localhost:9092 \
    --from-beginning \
    --property print.headers=true \
    --topic topic-1

Note that the above command will also show uncommited messages. Thus, if you want to see only commited messages, use the following command instead:

kafka-console-consumer --bootstrap-server localhost:9092 \
    --from-beginning \
    --isolation-level=read_committed \
    --property print.headers=true \
    --topic topic-1

You can delete the auto-created topic like this:

kafka-topics --bootstrap-server localhost:9092 --delete --topic topic-1

Results

If mixing transactional and non-transactional messages in a single topic, consumers will still see all messages by default. With isolation.level=read_committed they will just see all commited messages from the transactions and additionally all non-transactional messages. The messages are filtered on the consumer side, but this happens inside of the Kafka client library and is not exposed to the customer application. Particularly, it is not possible on the application level to distinguish between commited transactional messages and non-transactional messages.