This example code shows what happens when a transactional producer and a non-transactional producer write to the same topic.
A consumer with isolation.level=read_committed will see all committed messages from the transactions plus all non-transactional messages.
This example also shows that Kafka Streams handles this situation in exactly the same way, and how Kafka Streams handles message headers: in the simplest case they are forwarded by default.
DISCLAIMER: This project is for demonstration purposes only. Using the concept in production is highly discouraged. Use at your own risk.
Change to the cluster folder.
Start the containers by running:
docker-compose up -d
Stopping the containers:
docker-compose down
Cleaning up (CAREFUL: THIS WILL DELETE ALL UNUSED VOLUMES):
docker volume prune
Initialize the Gradle wrapper by running:
gradle wrapper
Build a JAR including all libraries with:
./gradlew shadowJar
Change to the "clients" folder. Then use the following commands to run the code:
./gradlew -p producer-transactional run
Alternatively, run the combined jar like this:
java -jar producer-transactional/build/libs/producer-transactional-all.jar producer-transactional.properties
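For orientation, the settings that make a producer transactional can be sketched as follows. This is not the code from the producer-transactional module and not the content of producer-transactional.properties; the class name, the broker address and the transactional id value are placeholders chosen for illustration. Only the configuration keys are standard Kafka producer settings.

```java
import java.util.Properties;

// Illustrative sketch only; the class name and all values are assumptions,
// not taken from the producer-transactional module.
final class TransactionalProducerConfig {

    // Builds producer settings. Both arguments are caller-supplied placeholders.
    static Properties build(String bootstrapServers, String transactionalId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Setting a transactional.id is what makes the producer transactional.
        props.setProperty("transactional.id", transactionalId);
        // Transactions build on idempotent delivery with acks=all.
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "all");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-tx-producer");
        // Print the resulting settings in key=value form.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the kafka-clients library on the classpath, such properties would typically be passed to a KafkaProducer, which then calls initTransactions() once and wraps its sends in beginTransaction() and commitTransaction() or abortTransaction(). A non-transactional producer simply omits transactional.id.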
Running an implementation consisting of a transactional producer interleaved with a non-transactional producer
Change to the "clients" folder. Then use the following commands to run the code:
./gradlew -p consumer-transactionalchaos run
However, the expected chaos does not occur. Still, mixing transactional and non-transactional messages in one topic is highly discouraged.
Change to the "clients" folder. Then use the following commands to run the code:
./gradlew -p consumer-transactional run
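As a rough sketch, the consumer settings relevant here could look like the following. This is not the code of the consumer-transactional module; the class name, group id and broker address are hypothetical. The configuration keys themselves are standard Kafka consumer settings, and read_uncommitted is the default value of isolation.level.

```java
import java.util.Properties;

// Illustrative sketch only; the class name and all values are assumptions,
// not taken from the consumer-transactional module.
final class ReadCommittedConsumerConfig {

    // Builds consumer settings. Both arguments are caller-supplied placeholders.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the earliest offset if the group has no committed position yet.
        props.setProperty("auto.offset.reset", "earliest");
        // Default is read_uncommitted. With read_committed, records of aborted
        // or still open transactions are not returned to the application.
        props.setProperty("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "demo-group");
        // Print the resulting settings in key=value form.
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

With the kafka-clients library on the classpath, these properties would be handed to a KafkaConsumer that subscribes to topic-1 and polls in a loop.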
Change to the "clients" folder. Then use the following commands to run the code:
./gradlew -p kstreams-header-forward run
This will forward all committed transactional messages and all non-transactional messages from topic-1 to topic-2.
The implementation contains three different, very simple topologies. All of them retain the original record context, including the headers sent with the transactional messages. In this example this would work for non-transactional messages too, but no headers are added to those.
Consume from the topic like this, including the headers of the messages:
kafka-console-consumer --bootstrap-server localhost:9092 \
--from-beginning \
--property print.headers=true \
--topic topic-1
Note that the above command will also show uncommitted messages. To see only committed messages, use the following command instead:
kafka-console-consumer --bootstrap-server localhost:9092 \
--from-beginning \
--isolation-level=read_committed \
--property print.headers=true \
--topic topic-1
You can delete the auto-created topic like this:
kafka-topics --bootstrap-server localhost:9092 --delete --topic topic-1
When transactional and non-transactional messages are mixed in a single topic, consumers still see all messages by default, including uncommitted ones. With isolation.level=read_committed they see only the committed messages from the transactions plus all non-transactional messages. The messages are filtered on the consumer side, but this happens inside the Kafka client library and is not exposed to the consuming application. In particular, the application cannot distinguish between committed transactional messages and non-transactional messages.
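The visibility rules described above can be illustrated with a small toy model. It is plain Java with no Kafka dependency and is not how the client library is implemented; the class, the log representation and the producer name p1 are invented for this sketch. It also models one detail that is documented Kafka behaviour: a read_committed consumer only reads up to the start of the earliest still open transaction.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of consumer-side visibility. Hypothetical names throughout;
// this is not the Kafka client implementation.
final class IsolationLevelDemo {

    enum Kind { DATA, COMMIT, ABORT }

    static final class Entry {
        final Kind kind;
        final String txProducer; // null for non-transactional data
        final String value;      // null for commit/abort markers
        Entry(Kind kind, String txProducer, String value) {
            this.kind = kind;
            this.txProducer = txProducer;
            this.value = value;
        }
    }

    static Entry data(String value) { return new Entry(Kind.DATA, null, value); }
    static Entry txData(String producer, String value) { return new Entry(Kind.DATA, producer, value); }
    static Entry commit(String producer) { return new Entry(Kind.COMMIT, producer, null); }
    static Entry abort(String producer) { return new Entry(Kind.ABORT, producer, null); }

    // read_uncommitted: every data record is returned; markers never are.
    static List<String> readUncommitted(List<Entry> log) {
        List<String> out = new ArrayList<>();
        for (Entry e : log) {
            if (e.kind == Kind.DATA) out.add(e.value);
        }
        return out;
    }

    // read_committed: non-transactional data plus data of committed transactions,
    // in log order, stopping before the earliest still open transaction.
    static List<String> readCommitted(List<Entry> log) {
        Map<String, List<Integer>> open = new HashMap<>();
        boolean[] visible = new boolean[log.size()];
        for (int i = 0; i < log.size(); i++) {
            Entry e = log.get(i);
            if (e.kind == Kind.DATA) {
                if (e.txProducer == null) {
                    visible[i] = true; // non-transactional records are always visible
                } else {
                    open.computeIfAbsent(e.txProducer, k -> new ArrayList<>()).add(i);
                }
            } else {
                // A marker closes the producer's current transaction.
                List<Integer> pending = open.remove(e.txProducer);
                if (pending != null && e.kind == Kind.COMMIT) {
                    for (int idx : pending) visible[idx] = true;
                }
            }
        }
        // Nothing at or beyond the first record of an open transaction is returned.
        int limit = log.size();
        for (List<Integer> pending : open.values()) {
            limit = Math.min(limit, pending.get(0));
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < limit; i++) {
            if (visible[i]) out.add(log.get(i).value);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> log = Arrays.asList(
                txData("p1", "tx-a1"), data("plain-1"), txData("p1", "tx-a2"), commit("p1"),
                txData("p1", "tx-b1"), data("plain-2"), abort("p1"), data("plain-3"));
        System.out.println(readUncommitted(log));
        System.out.println(readCommitted(log));
    }
}
```

Running main prints [tx-a1, plain-1, tx-a2, tx-b1, plain-2, plain-3] for the read_uncommitted view and [tx-a1, plain-1, tx-a2, plain-2, plain-3] for the read_committed view: the record of the aborted transaction is dropped, while committed and non-transactional records arrive interleaved in log order. The returned list carries no transactional flag, which mirrors the point that the application cannot tell the two kinds of messages apart.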