
A sample Kafka consumer in Micronaut with Avro and Schema Registry.

Micronaut Kafka Consumer with Avro and Schema Registry





Start Kafka and other components

docker-compose up -d

This uses the Docker Compose file to start Confluent Platform in the background, including the Kafka brokers, ZooKeeper, Connect, and Schema Registry.

Create a topic for Avro messages

docker-compose exec broker kafka-topics --create --topic user-topic-avro-new --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

This creates the topic user-topic-avro-new for user data, with one partition and a replication factor of 1.
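To confirm that the topic exists before starting the application, it can be described from the broker container. The following is a minimal sketch: the `describe_topic` helper and the `COMPOSE` variable are hypothetical names introduced here, not part of the project, while the service name and bootstrap address are taken from the create command.

```shell
# COMPOSE is an assumed override so the helper works with either
# "docker-compose" (v1) or "docker compose" (v2).
COMPOSE="${COMPOSE:-docker-compose}"

# Hypothetical helper: prints partition and replica details for one topic.
# Service name "broker" and address broker:9092 come from the create command.
describe_topic() {
  # $COMPOSE is left unquoted on purpose so "docker compose" splits into two words.
  $COMPOSE exec broker kafka-topics --describe \
    --topic "$1" --bootstrap-server broker:9092
}
```

Running `describe_topic user-topic-avro-new` after the create step should report a single partition if the topic was created as described.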

Start the Micronaut application

./gradlew run

Send Avro records

docker-compose exec schema-registry bash
kafka-avro-console-producer --topic user-topic-avro-new --bootstrap-server broker:29092 --property value.schema="$(< /opt/app/schema/new-user.avsc)"

Sample Avro records

{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 1, "NAME": "Sujit", "AGE": 22, "CITY": "PUNE"}
{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 2, "NAME": "Will", "AGE": 32, "CITY": "Mumbai"}
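Instead of pasting records into the interactive producer, the same records can be piped in from a file. This is a minimal sketch that assumes the compose services and schema path shown in the producer command; the file name `avro-records.json`, the `send_avro_records` helper, and the `COMPOSE` variable are hypothetical. The `-T` flag disables TTY allocation so that standard input can be redirected.

```shell
# COMPOSE is an assumed override for "docker-compose" (v1) or "docker compose" (v2).
COMPOSE="${COMPOSE:-docker-compose}"

# Write the two sample records to a local file, one JSON object per line.
# The quoted 'EOF' keeps the shell from expanding anything inside the records.
cat > avro-records.json <<'EOF'
{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 1, "NAME": "Sujit", "AGE": 22, "CITY": "PUNE"}
{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 2, "NAME": "Will", "AGE": 32, "CITY": "Mumbai"}
EOF

# Producer command, copied from the step above. It is single-quoted so that
# $(< ...) is expanded inside the container, where the schema file lives.
PRODUCER_CMD='kafka-avro-console-producer --topic user-topic-avro-new --bootstrap-server broker:29092 --property value.schema="$(< /opt/app/schema/new-user.avsc)"'

# Hypothetical helper: feeds a records file to the Avro console producer.
send_avro_records() {
  $COMPOSE exec -T schema-registry bash -c "$PRODUCER_CMD" < "$1"
}
```

Calling `send_avro_records avro-records.json` sends both records and returns once the file has been read, which makes the step repeatable in scripts.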

For the JSON consumer

Create a topic for JSON messages

docker-compose exec broker kafka-topics --create --topic user-topic-json --bootstrap-server broker:9092 --replication-factor 1 --partitions 1

Send JSON records

docker-compose exec broker kafka-console-producer --topic user-topic-json --bootstrap-server broker:9092

Sample JSON records

{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 1, "NAME": "Sujit", "AGE": 22, "CITY": "PUNE"}
{"table": "GG.MYUSER", "op_type": "I", "primary_keys": ["USER_ID"], "USER_ID": 2, "NAME": "Will", "AGE": 32, "CITY": "Mumbai"}
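
As a check that is independent of the Micronaut application, the records can be read back from the topic with the console consumer. This is a minimal sketch, assuming the broker container provides `kafka-console-consumer` alongside `kafka-console-producer`; the `read_topic` helper and the `COMPOSE` variable are hypothetical names.

```shell
# COMPOSE is an assumed override for "docker-compose" (v1) or "docker compose" (v2).
COMPOSE="${COMPOSE:-docker-compose}"

# Hypothetical helper: reads a fixed number of messages from the start of a
# topic and then exits, instead of waiting for new messages indefinitely.
#   $1 = topic name, $2 = number of messages to read
read_topic() {
  $COMPOSE exec broker kafka-console-consumer \
    --topic "$1" --bootstrap-server broker:9092 \
    --from-beginning --max-messages "$2"
}
```

After the two sample records have been sent, `read_topic user-topic-json 2` should print them and exit.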