Walking through the Spring Stack for Apache Kafka

This is the demo repository that goes along with the talk Walking through the Spring Stack for Apache Kafka.

In order to run these demo applications, you need Kafka running on localhost:9092. If Kafka is running elsewhere, update the bootstrap server property (spring.kafka.bootstrap-servers) accordingly.

For convenience, we provide a docker-compose script that runs a three-node Kafka cluster locally and makes one of the brokers available at localhost:9092.

Running a Dockerized Kafka cluster locally

  • Check out this repository

  • Go to the root of this repository in a terminal and run docker-compose up.

Demo applications

This repository contains several standalone Spring Boot applications demonstrating various features mentioned during the talk. See the README in each application for more details.

This application creates a Kafka topic named spring-kafka-app0-demo1 with a single partition. It then publishes random text data to this topic using KafkaTemplate and consumes it with a KafkaListener.

KafkaTemplate and all its necessary components, such as the ProducerFactory, are auto-configured through Spring Boot. KafkaListener relies on a ConcurrentKafkaListenerContainerFactory, also auto-configured through Boot, to create its message listener container.

The application uses the Java Faker library to generate 100 random Book recommendations each time it runs. The listener simply consumes the records and prints them on the console.
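
A minimal sketch of this wiring might look like the following (the class name, listener group id, and the exact Faker call are illustrative; only the topic name spring-kafka-app0-demo1 comes from the app):

@SpringBootApplication
public class App0Sketch {

    public static void main(String[] args) {
        SpringApplication.run(App0Sketch.class, args);
    }

    // Picked up by the auto-configured KafkaAdmin, which creates the topic on startup.
    @Bean
    public NewTopic demo1Topic() {
        return TopicBuilder.name("spring-kafka-app0-demo1").partitions(1).build();
    }

    // KafkaTemplate (and its ProducerFactory) come from Spring Boot's auto-configuration.
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Faker faker = new Faker();
            for (int i = 0; i < 100; i++) {
                template.send("spring-kafka-app0-demo1", faker.book().title());
            }
        };
    }

    // Backed by the Boot auto-configured ConcurrentKafkaListenerContainerFactory.
    @KafkaListener(id = "app0-demo-group", topics = "spring-kafka-app0-demo1")
    public void listen(String book) {
        System.out.println("Received: " + book);
    }
}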

This application also includes a basic JUnit 5 based test using EmbeddedKafkaBroker.
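
The test follows the usual EmbeddedKafkaBroker pattern, roughly like this (the class name and the exact assertions are illustrative):

@SpringBootTest(properties = "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}")
@EmbeddedKafka(partitions = 1, topics = "spring-kafka-app0-demo1")
class App0SketchTests {

    @Test
    void publishesAndConsumesAgainstEmbeddedBroker() {
        // The embedded broker replaces localhost:9092 for this test; the real test
        // asserts on the records the listener consumes.
    }
}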

In this application, we have two publishers, both using custom KafkaTemplate beans. One KafkaTemplate uses Long as the key type and the other uses UUID, while both use String as the value type. The publishers are triggered through REST endpoints.
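
A sketch of how two such templates might be declared (bean names and the reuse of Boot's KafkaProperties are illustrative, not the app's exact code):

@Bean
public KafkaTemplate<Long, String> longKeyTemplate(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}

@Bean
public KafkaTemplate<UUID, String> uuidKeyTemplate(KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}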

The application creates two Kafka topics - spring-kafka-app1-demo1 and spring-kafka-app1-demo2 - each with 10 partitions.

Two listeners are provided using KafkaListener. The first listener uses the TopicPartition and PartitionOffset annotations to select specific partitions and set initial offsets on them. The second listener is a variant of the first, demonstrating how wildcard patterns can be used for partitions and initial offsets. Take a look at the listeners to see how they override the deserializer properties, as opposed to what is auto-configured through Spring Boot.
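
The shape of the first listener is roughly as follows (the group id, partition numbers, and the overridden property are illustrative; see the actual listeners for the real values):

@KafkaListener(
        id = "app1-demo1-group",
        topicPartitions = @TopicPartition(
                topic = "spring-kafka-app1-demo1",
                partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")),
        properties = "key.deserializer=org.apache.kafka.common.serialization.LongDeserializer")
public void listen(ConsumerRecord<Long, String> record) {
    System.out.println(record.key() + " : " + record.value() + " from partition " + record.partition());
}

// The second listener can use a wildcard to cover all partitions with one initial offset:
// partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")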

In order to see the application in action, run it (e.g., from an IDE) and then issue the following two curl commands.

curl -X POST  http://localhost:8080/publish/demo1
curl -X POST  http://localhost:8080/publish/demo2

The first command invokes the publisher that publishes to the topic spring-kafka-app1-demo1, and the second command invokes the one that publishes to spring-kafka-app1-demo2. The listeners log the received data on the console along with the key, partition, and offset it was received from.

In this app, we use a POJO for publishing and consuming using the JsonSerializer and JsonDeserializer from Spring for Apache Kafka.

The application creates a Kafka topic - spring-kafka-app2-demo. It publishes a domain object Foo into the topic and then consumes it using a KafkaListener.

Take a look at the application.properties configuration file to see how the serializer/deserializer properties are configured.
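
A rough sketch of what the round trip looks like; the serde classes themselves are selected in application.properties (the property entries shown as comments and the Foo fields are illustrative, not the app's actual code):

// In application.properties, entries along these lines pick the JSON serde
// (see the app's config file for the exact values):
//   spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
//   spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

public class Foo {

    private String name;

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return "Foo [name=" + this.name + "]";
    }
}

@KafkaListener(id = "app2-demo-group", topics = "spring-kafka-app2-demo")
public void listen(Foo foo) {
    System.out.println("Received: " + foo);
}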

In this application, we have a publisher and a listener. The listener extends AbstractConsumerSeekAware, which implements ConsumerSeekAware. The application creates a topic named spring-kafka-app3-demo with a single partition.

Each time the listener starts, it seeks the partition to offset 20. The publisher sends 100 records each time the app starts.
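
A sketch of such a listener, assuming the seek is done when partitions are assigned (the class name and group id are illustrative):

@Component
public class SeekToTwentyListener extends AbstractConsumerSeekAware {

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        // Always start reading from offset 20 on every assigned partition.
        assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), 20L));
    }

    @KafkaListener(id = "app3-demo-group", topics = "spring-kafka-app3-demo")
    public void listen(String in) {
        System.out.println("Received: " + in);
    }
}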

This application demonstrates how container error handling works in Spring for Apache Kafka. The application has a publisher which is invoked through a REST endpoint. Each time the REST endpoint is called, the publisher sends 10 records to the topic spring-kafka-app4-demo.

On the consumer side, we have contrived logic that throws an exception whenever the offset is > 0 and divisible by 10. By default, when the exception occurs, the DefaultErrorHandler kicks in with a maximum of 10 attempts and no backoff. However, we provide a custom bean declaration for DefaultErrorHandler, which the Boot auto-configuration picks up and configures on the container factory. This custom handler retries a maximum of 3 times with a two-second backoff between attempts. The handler is also configured with a DeadLetterPublishingRecoverer, which sends the failed record to a default DLT topic. The next time the app runs, you will notice that the first such record (with offset % 10 == 0) is retried 3 times and then sent to the DLT. There is also a convenient KafkaListener method that logs the messages arriving on the DLT.
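
The custom error handler bean could look roughly like this (the retry and backoff values mirror the behaviour described above; the method name is illustrative):

@Bean
public DefaultErrorHandler errorHandler(KafkaOperations<Object, Object> template) {
    // After retries are exhausted, publish the failed record to the default "<topic>.DLT".
    DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
    // 3 retries with a 2-second backoff between attempts; Boot's auto-configuration
    // detects this bean and wires it into the listener container factory.
    return new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3L));
}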

In order to exercise the app, run it first and then issue the following curl command.

curl -X POST  http://localhost:8080/publish/demo

This application demonstrates the ErrorHandlingDeserializer in action. We have a publisher that sends data as String, but the consumer expects an Integer, which causes a deserialization error. Take a look at the application.properties file to see how the ErrorHandlingDeserializer and the actual delegate types are configured. The application creates a topic named spring-kafka-app5-demo. When the application starts, it sends a text to the topic; the consumer receives it and runs into a deserialization exception. Since deserialization exceptions are not retried by default, the DefaultErrorHandler does not retry the record but simply tries to recover. We configure the error handler with a DeadLetterPublishingRecoverer so that the failed records are sent to a dead-letter topic. The application provides a convenient KafkaListener to consume from the DLT topic (spring-kafka-app5-demo.DLT).
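
For reference, a sketch of how this is typically wired and consumed; the property entries shown as comments and the DLT listener's local deserializer override are illustrative, not the app's exact code:

// Typical application.properties entries for the ErrorHandlingDeserializer
// (check the app's config file for the real ones):
//   spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
//   spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.IntegerDeserializer

// The DLT listener overrides the deserializer locally so the raw failed payload can be logged.
@KafkaListener(id = "app5-dlt-group", topics = "spring-kafka-app5-demo.DLT",
        properties = "value.deserializer=org.apache.kafka.common.serialization.StringDeserializer")
public void dltListen(String failedPayload) {
    System.out.println("Received from DLT: " + failedPayload);
}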

In this application we see how request-reply semantics work in Spring for Apache Kafka using the ReplyingKafkaTemplate. There are two packages - app6.request and app6.reply. The applications rely on two Kafka topics - kRequests and kReplies. Using the ReplyingKafkaTemplate, the request application sends a message to the topic kRequests and waits for the reply by blocking on the reply future. The ReplyingKafkaTemplate is configured with a reply container that listens on the reply topic (kReplies).

On the replies side, another application is provided with a KafkaListener that listens on the topic where the requests are sent (kRequests). Once the listener receives data, it converts the text to uppercase and echoes it back to the reply topic. When doing so, it also propagates the CORRELATION_ID it received as part of the request, so that the reply container on the request side (in ReplyingKafkaTemplate) can correlate the reply.
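
On the request side, the ReplyingKafkaTemplate setup and usage look roughly like this (the group id, timeout, and method names are illustrative; the topic names kRequests and kReplies are the ones the apps use):

@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {
    // Reply container that waits on the kReplies topic; correlation headers are
    // managed by the template.
    ConcurrentMessageListenerContainer<String, String> replyContainer = factory.createContainer("kReplies");
    replyContainer.getContainerProperties().setGroupId("app6-request-replies");
    return new ReplyingKafkaTemplate<>(pf, replyContainer);
}

// For example, inside the controller handling the POST request
// (replyingTemplate is the bean above, injected as a field):
public String sendAndReceive(String request) throws Exception {
    RequestReplyFuture<String, String, String> future =
            replyingTemplate.sendAndReceive(new ProducerRecord<>("kRequests", request));
    ConsumerRecord<String, String> reply = future.get(10, TimeUnit.SECONDS);
    return reply.value();
}

// On the reply side, one common approach is a @KafkaListener on kRequests annotated
// with @SendTo("kReplies"); the framework then copies the correlation header onto the reply.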

In order to run the app, do the following.

  1. Start the reply app - SpringKafkaApp6Reply

  2. Then start the request app - SpringKafkaApp6Request

  3. Go to a terminal and issue the following command.

curl -X POST -H "Content-Type: text/plain" --data "hello request-reply in Spring for Apache Kafka" http://localhost:8080

On the console output on the request application, you should see the received text.

This application demonstrates the Spring Integration Kafka support. It is a basic application that shows the Kafka outbound channel adapter and the message-driven channel adapter. The outbound channel adapter is configured with a KafkaTemplate auto-configured through Spring Boot. The message-driven channel adapter uses a custom message listener container created in the app.

The application runner bean sends messages to a message channel called toKafka, which the outbound adapter listens on and forwards to a Kafka topic. The message-driven channel adapter consumes from the topic and puts the messages on another message channel (fromKafka). The application runner then receives from the fromKafka channel and prints the information on the console.
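
With the Java DSL, the two adapters can be sketched roughly like this (the topic name and the listener container bean are illustrative; in older Spring Integration versions the entry point is IntegrationFlows.from):

@Bean
public IntegrationFlow toKafkaFlow(KafkaTemplate<String, String> template) {
    // Messages arriving on the "toKafka" channel are written to the Kafka topic.
    return IntegrationFlow.from("toKafka")
            .handle(Kafka.outboundChannelAdapter(template).topic("si-demo-topic"))
            .get();
}

@Bean
public IntegrationFlow fromKafkaFlow(ConcurrentMessageListenerContainer<String, String> container) {
    // Records picked up by the message-driven adapter land on the "fromKafka" channel.
    return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(container))
            .channel("fromKafka")
            .get();
}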

This is a basic Spring Cloud Stream application in which we demonstrate a supplier, a function, and a consumer. The supplier produces the current time in milliseconds as a Long value. By default, a supplier in Spring Cloud Stream is polled every second, and we use that default. A function receives the supplied data and converts it to UTC-based time. A consumer then receives this UTC-based time and prints it on the console. Take a look at the application.properties to see how the functions are activated.
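
The three functions have roughly this shape (the bean names here are illustrative; the actual names and the spring.cloud.function.definition entry that activates them are in the app's application.properties):

@Bean
public Supplier<Long> timeSupplier() {
    // Polled once per second by default by Spring Cloud Stream.
    return System::currentTimeMillis;
}

@Bean
public Function<Long, String> toUtcTime() {
    return millis -> Instant.ofEpochMilli(millis).atZone(ZoneOffset.UTC).toString();
}

@Bean
public Consumer<String> logUtcTime() {
    return utc -> System.out.println("UTC time: " + utc);
}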

In order to run the application, simply run it, and you will see the data getting logged on the console as UTC time.

In this application, we show how non-functional style suppliers can be written for on-demand triggering using the StreamBridge API. The application has a REST endpoint which, when invoked, triggers the publishing of data through the StreamBridge API. StreamBridge creates all the necessary output bindings. The demo app also has a consumer that consumes from the Kafka topic that StreamBridge publishes to.
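
A sketch of the REST-triggered publisher (the controller class and the binding/topic name passed to StreamBridge are illustrative; the endpoint path matches the curl example below):

@RestController
public class PublishController {

    private final StreamBridge streamBridge;

    public PublishController(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    @PostMapping("/publish/demo")
    public void publish(@RequestBody String payload) {
        // StreamBridge creates (and caches) the output binding on first use.
        streamBridge.send("streamBridge-demo-topic", payload);
    }
}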

Run the application and invoke the REST endpoint as below.

curl -X POST -H "Content-Type: text/plain" --data "StreamBridge Demo" http://localhost:8080/publish/demo

The consumer will print the data that is published to the topic through StreamBridge.

In this application, we see how the Spring Cloud Stream Kafka Streams binder works.

The example is based on the canonical word count application. It is written using the Spring Cloud Stream binder for Kafka Streams, with a java.util.function.Function representing the processor. It uses a single input and a single output. In essence, the application receives text messages from an input topic, computes word occurrence counts over a configurable time window, and reports them to an output topic.

Run the application first.

The Kafka Streams processor is named countWords. The application also uses the regular Kafka binder to produce data every second - the same book recommendation data from the Java Faker library that we used in the other apps. This supplier function is called provideWords and produces to a topic called words, from which the Kafka Streams processor consumes. The countWords processor writes the count information to a topic called counts. We have another consumer function, using the regular Kafka binder, that simply listens on this counts topic and prints the word count information on the console.
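
A minimal sketch of the countWords processor's shape (the window size, serdes, and output value format are illustrative; the function name and the words/counts topics come from the app, and the bindings to those topics are configured in application.properties):

@Bean
public Function<KStream<String, String>, KStream<String, String>> countWords() {
    return input -> input
            // Split each incoming text message into individual words.
            .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            // Count occurrences within a time window (the real window size is configurable).
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
            .count()
            .toStream()
            .map((windowedWord, count) -> KeyValue.pair(windowedWord.key(), windowedWord.key() + " : " + count));
}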

Accessing binder health endpoint

curl localhost:8080/actuator/health | jq .

Accessing Kafka Streams metrics

curl localhost:8080/actuator/metrics | jq .

Something more specific

curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .

Visualize Kafka Streams topology

curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId

Popular UI tool for visualizing the topology: https://zz85.github.io/kafka-streams-viz/

Accessing all the bindings

curl localhost:8080/actuator/bindings | jq .

Stopping binding

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process-in-0

Starting binding

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process-in-0

This is an advanced sample of a Spring Cloud Stream processor using Kafka Streams support that shows both KStream and KTable bindings.

Following are the two applications in this sample.

  • Spring Cloud Stream based Kafka Streams processor

  • Spring Cloud Stream producer application to generate data for the processor

The Kafka Streams processor uses java.util.function.BiFunction to demonstrate two inputs and one output. The processor consumes user region data as a KTable and user clicks information as a KStream. It then produces the clicks-per-region information on the outbound. The same outbound information is also stored in a state store to demonstrate the interactive query capabilities of Kafka Streams, exposed as InteractiveQueryService in Spring Cloud Stream.
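
The processor's shape is roughly as follows (the state store name and serdes here are illustrative):

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
    return (userClicks, userRegions) -> userClicks
            // Join each click count (keyed by user) with the user's region from the KTable.
            .join(userRegions, (clicks, region) -> new KeyValue<>(region, clicks))
            // Re-key by region.
            .map((user, regionAndClicks) -> regionAndClicks)
            .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
            // Sum clicks per region and keep them in a queryable state store.
            .reduce(Long::sum, Materialized.as("clicks-per-region"))
            .toStream();
}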

The application also has a second processor that listens on the outbound topic and logs the information. In addition, the application exposes a REST endpoint through which the user clicks per region can be queried.
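
The REST endpoint backing the query can be sketched like this (the controller class and state store name are illustrative; the /updates/{region} path matches the curl example below):

@RestController
public class ClicksPerRegionController {

    private final InteractiveQueryService queryService;

    public ClicksPerRegionController(InteractiveQueryService queryService) {
        this.queryService = queryService;
    }

    @GetMapping("/updates/{region}")
    public Long clicksForRegion(@PathVariable String region) {
        // Look up the state store populated by the Kafka Streams processor.
        ReadOnlyKeyValueStore<String, Long> store =
                queryService.getQueryableStore("clicks-per-region", QueryableStoreTypes.<String, Long>keyValueStore());
        return store.get(region);
    }
}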

Run the app SpringCloudStreamApp4Application first, then the producer app - UserClicksRegionProducerApplication.

The producer app has two REST endpoints that allow you to publish user-region and user-click information to Kafka topics.

First enter some data for user region.

curl -X POST localhost:8090/user-region/alice/asia

At this point, Alice lives in Asia.

Now send some click impression data from Alice.

curl -X POST localhost:8090/user-clicks/alice/12

Watch the console of the SpringCloudStreamApp4Application and see that the clicks per region information is logged from the test processor.

Invoke the REST endpoint to extract this same information through an interactive query.

curl localhost:8080/updates/asia | jq .

Enter more POST data as above and verify that you see the correct output.

Accessing binder health endpoint

curl localhost:8080/actuator/health | jq .

Accessing Kafka Streams metrics

curl localhost:8080/actuator/metrics | jq .

Something more specific

curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .

Visualize Kafka Streams topology

curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId

Accessing all the bindings

curl localhost:8080/actuator/bindings | jq .

Stopping binding

curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/clicks-in-0

Note: All bindings corresponding to this Kafka Streams application id will be stopped.

Starting binding

curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/clicks-in-0