The goal of this POC is to test the RetryableTopics feature of spring-boot (provided by Spring for Apache Kafka).
It works like this:
- a new row is written to a product database
- Debezium emits the ID of the new object to Kafka
- a consumer picks up this ID
- the consumer makes a REST GET request to the product service
- if the request succeeds, it prints the full message to stdout
- if the request fails, it registers the message in the retryable topics for further processing
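The consumer side of this flow can be sketched in a few lines of Python. This is only an illustration of the control flow, not the Spring implementation used in the POC: `handle_event`, `fake_fetch` and the in-memory `retry_queue` are hypothetical stand-ins for the Kafka listener, the REST GET request and the retry topics.

```python
from typing import Callable, List, Tuple

# Hypothetical stand-in for the REST GET on the product service:
# takes a product ID and returns (HTTP status, response body).
FetchProduct = Callable[[int], Tuple[int, str]]


def handle_event(product_id: int, fetch: FetchProduct, retry_queue: List[int]) -> bool:
    """Process one ID emitted by Debezium; return True on success."""
    status, body = fetch(product_id)
    if status == 200:
        print(body)  # success: print the full message to stdout
        return True
    retry_queue.append(product_id)  # failure: keep the ID for a later retry
    return False


def fake_fetch(product_id: int) -> Tuple[int, str]:
    # Illustrative only: fail for even IDs, succeed for odd ones.
    if product_id % 2 == 0:
        return 500, ""
    return 200, f'{{"id": {product_id}}}'


retry_queue: List[int] = []  # stands in for the retry topics
results = [handle_event(i, fake_fetch, retry_queue) for i in (1, 2, 3, 4)]
```

In the real application the retry queue is a set of Kafka topics managed by the framework, so failed messages are re-consumed after a delay instead of being kept in memory.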
Learn more about spring-boot Kafka retryable topics here and here.
(deprecated) We also use the Confluent schema-registry for use with Avro, and ksqlDB as a QOL tool.
This POC is partially based on this ksqlDB tutorial.
- docker-compose up -d --build
  this launches the services
- set up Debezium replication
  curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d @debezium/products-dz-config.json
- (deprecated) log in to the ksqldb-cli container and create the Kafka stream (you must wait about one minute before doing this)
  docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
  and copy-paste the SQL script from ./scripts/create_stream.sql into the ksql shell
Simply open IntelliJ, import the pom.xml and run RetryApplication::main from there.
(deprecated) First generate the SerDes classes from the Avro schema: mvn generate-sources
Run ./scripts/create_events.py
The Python server returns a 500 error on every even-numbered event. This is very rough, but it is useful for seeing what happens with the retryable topics.
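The failure pattern described above can be reproduced with a small standard-library server. This is a sketch under assumptions, not the script shipped with the POC: the handler name, the `/products/1` path and the response body are made up, and "even event" is interpreted here as every second request received.

```python
import http.client
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer


class FlakyProductHandler(BaseHTTPRequestHandler):
    request_count = 0  # shared counter; fine for a single-threaded demo server

    def do_GET(self):
        cls = type(self)
        cls.request_count += 1
        if cls.request_count % 2 == 0:
            # Every second request fails with a 500.
            self.send_response(500)
            self.end_headers()
            return
        body = b'{"status": "ok"}'
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        pass  # keep the demo quiet


def get_status(port: int, path: str) -> int:
    """Issue one GET against the local server and return the HTTP status."""
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("GET", path)
        return conn.getresponse().status
    finally:
        conn.close()


server = HTTPServer(("127.0.0.1", 0), FlakyProductHandler)  # port 0: pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()
statuses = [get_status(server.server_port, "/products/1") for _ in range(4)]
server.shutdown()
server.server_close()
```

Alternating failures like this make each second message travel through the retry topics, which is enough to observe the retry behaviour without a real outage.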
To check what happens, you can use the kafka command line utilities to:
- list the topics
# example
kafka-topics.sh --bootstrap-server localhost:29092 --list
- check what happens in a topic
# example
kafka-console-consumer.sh --bootstrap-server localhost:29092 --topic products.public.product-retry-0 --from-beginning
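To know which topics to look for, it can help to spell out the names a retry chain might use. The helper below is a hypothetical sketch: it assumes index-based suffixes, as in the products.public.product-retry-0 topic above, plus a dead-letter topic with a -dlt suffix. The retry count of 3 is made up, and the actual names depend on how the retryable topics are configured in the application.

```python
from typing import List


def retry_topic_names(main_topic: str, retry_count: int) -> List[str]:
    """Return the assumed retry topic names followed by the dead-letter topic."""
    names = [f"{main_topic}-retry-{i}" for i in range(retry_count)]
    names.append(f"{main_topic}-dlt")  # assumed dead-letter suffix
    return names


topics = retry_topic_names("products.public.product", 3)
for name in topics:
    print(name)
```

Comparing this list with the output of kafka-topics.sh --list is a quick way to check whether the assumption matches the running setup.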
https://github.com/52North/postgis-kafka-cdc/blob/master/postgis-debezium/Dockerfile
https://debezium.io/blog/2020/09/15/debezium-auto-create-topics/
https://docs.confluent.io/platform/current/schema-registry/serdes-develop/index.html
https://www.logicbig.com/tutorials/misc/kafka/kafka-manual-commit-async-example.html
offsets commit timeout https://kafka.apache.org/documentation/#brokerconfigs_offsets.commit.timeout.ms