kafka-vanilla-spring-boot-demo

The main motivation of this project is to showcase how to integrate the vanilla Kafka client in a Spring Boot application.

Some of the ideas you will find in this project:

  • Dynamic loading of Producer/Consumer properties
  • Producer/Consumer metrics collection via Micrometer/Prometheus
  • Support for multiple consumer threads
  • Deserialization & Processing Error Handling (with various strategies, including a dead letter queue)
  • Using Avro-generated classes

Start the environment

To start the environment, simply run the following command:

docker-compose up -d

This starts a local single-node Kafka cluster and a UI (Confluent Control Center).

Once the environment is up, you can run the application with:

mvn spring-boot:run

Once the application has started, you can access the Swagger UI to publish and consume messages.

You can also open the Confluent Control Center to explore the content of topics.

Stopping the environment

To stop the environment, simply run the following command:

docker-compose down -v

Inspecting the consumer group

To inspect the state of the consumer group, run the following command:

docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --group kafka-vanilla-spring-boot-demo --describe

This shows whether the application is running, the current consumer offset positions, and the consumer lag, if any.

Resetting offsets

WARNING: Before running this command, make sure the application is stopped.

If you need to reset the offsets to the beginning, run the following command:

docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --group kafka-vanilla-spring-boot-demo --reset-offsets --all-topics --to-earliest --execute
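
If you first want to preview the new offsets without applying them, the tool also supports a dry run; replace --execute with --dry-run:

docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --group kafka-vanilla-spring-boot-demo --reset-offsets --all-topics --to-earliest --dry-run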

Metrics collection

Metrics are collected via Micrometer. You can choose the backend; this project showcases the Prometheus backend.

Metrics are available at http://localhost:8080/actuator/prometheus
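
If you run your own Prometheus server, a scrape job pointing at this endpoint could look like the following sketch (the job name and scrape interval are arbitrary examples):

scrape_configs:
  - job_name: "kafka-vanilla-spring-boot-demo"
    metrics_path: "/actuator/prometheus"
    scrape_interval: 15s
    static_configs:
      - targets: ["localhost:8080"]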

We might add a sample Producer/Consumer dashboard in the future.

Configuration

Most of the configuration is done via the traditional application.yml file:

kafka:
  properties:
    bootstrap.servers: "localhost:29092"

    schema.registry.url: "http://localhost:8081"
    specific.avro.reader: "true"
  producer:
    key.serializer: "org.apache.kafka.common.serialization.StringSerializer"
    value.serializer: "io.confluent.kafka.serializers.KafkaAvroSerializer"
  consumer:
    key.deserializer: "org.apache.kafka.common.serialization.StringDeserializer"
    value.deserializer: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
    group.id: "kafka-vanilla-spring-boot-demo"
  exceptionHandler: "LogAndFail"
  nbConsumerThreads: 1

Basically there are global properties as well as producer- and consumer-specific properties. Every property configured as global (like the schema registry here) is injected into the configuration of all producers and consumers.

The application accepts dynamic properties, so you can use any valid Kafka producer or consumer property.

By default, the application is configured to use the Spring Boot application name as the client.id. This eases monitoring when multiple instances of the application are running. If needed, this can be overridden by specifying the client.id property in the producer or consumer configuration.
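
For example, assuming the same structure as the configuration above (the values below are arbitrary):

kafka:
  producer:
    client.id: "my-app-producer"
  consumer:
    client.id: "my-app-consumer"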

Error handling

The application provides a dead letter queue for deserialization and processing errors.

The code provides multiple implementations.

By default, the LogAndFail implementation is used. It encourages projects to think about error handling and to pick the strategy relevant to their context.

This behavior can be configured via the kafka.exceptionHandler attribute in your application.yml file:

kafka:
  exceptionHandler: "LogAndContinue"

The dead letter queue implementation sends both deserialization and processing errors to the same topic. Out of the box, the topic is called <spring.application.name>-dlq, but this can be configured in your application.yml file:

kafka:
  dlqName: "my-dlq"

This implementation preserves the original:

  • headers
  • key (as byte[], since we may not have succeeded in deserializing it)
  • value (as byte[], since we may not have succeeded in deserializing it)
  • timestamp

In addition, it adds some useful headers:

  • dlq.error.app.name containing your Spring Boot application name
  • dlq.error.timestamp containing the timestamp of the error
  • dlq.error.topic containing the source topic
  • dlq.error.partition containing the source partition
  • dlq.error.offset containing the source offset
  • dlq.error.exception.class.name containing the exception class name
  • dlq.error.exception.message containing the exception message
  • dlq.error.type containing the error type (either DESERIALIZATION_ERROR or PROCESSING_ERROR)
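
As an illustration, a consumer reading from the dead letter queue could extract these headers roughly as follows. This is a minimal sketch using the plain Kafka client; the class name and the assumption that header values are UTF-8 encoded strings are ours, not the project's:

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

public class DlqHeaderReader {

    // Returns the value of a DLQ header as a String, or null if the header is absent.
    static String headerAsString(ConsumerRecord<byte[], byte[]> record, String key) {
        Header header = record.headers().lastHeader(key);
        return header == null ? null : new String(header.value(), StandardCharsets.UTF_8);
    }

    static void logError(ConsumerRecord<byte[], byte[]> record) {
        System.out.printf("%s from topic %s, partition %s, offset %s: %s%n",
                headerAsString(record, "dlq.error.type"),
                headerAsString(record, "dlq.error.topic"),
                headerAsString(record, "dlq.error.partition"),
                headerAsString(record, "dlq.error.offset"),
                headerAsString(record, "dlq.error.exception.message"));
    }
}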

Configuring Multiple Consumer Threads

The number of consumer threads is controlled by the kafka.nbConsumerThreads attribute in your application.yml file:

kafka:
  nbConsumerThreads: 1

To support multiple threads, the class containing your code must:

Behind the scenes, ConsumerAsyncConfiguration creates an executor service with the provided number of threads. The executor service is configured with an uncaught exception handler that stops the application.
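
The following sketch illustrates the idea; it is not the project's actual class, and the thread naming and the System.exit call are placeholders for whatever shutdown mechanism the application really uses:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerExecutorSketch {

    // Builds an executor with one thread per consumer. Any uncaught exception
    // in a consumer thread stops the whole application.
    static ExecutorService consumerExecutor(int nbConsumerThreads) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "kafka-consumer-" + counter.incrementAndGet());
            thread.setUncaughtExceptionHandler((t, e) -> {
                e.printStackTrace();
                System.exit(1); // stand-in for a proper Spring context shutdown
            });
            return thread;
        };
        return Executors.newFixedThreadPool(nbConsumerThreads, factory);
    }
}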

Deep diving into the code

Some important pointers in the code:

How to contribute?

Have an idea to make this showcase better? Found a bug? Do not hesitate to report it via GitHub issues and/or to create a pull request.

Reference Documentation

For further reference, please consider the following sections: