Walking through the Spring Stack for Apache Kafka
This demo repository goes along with the talk Walking through the Spring Stack for Apache Kafka.
To run these demo applications, you must have Kafka running at localhost:9092. If you have Kafka running elsewhere, please update the bootstrap server property (spring.kafka.bootstrap-servers) accordingly. For convenience, we provide a docker-compose script that allows you to run a three-node Kafka cluster locally and makes one of the brokers available at localhost:9092.
Running a Dockerized Kafka cluster locally
- Check out this repository.
- Go to the root of this repository on a command line and run: docker-compose up
Demo applications
This repository contains several standalone Spring Boot applications that demonstrate various features mentioned during the talk. Navigate to the README of each application for more details.
spring-kafka-app0 - Basic Application to Publish and Consume Text Based Records
This application creates a Kafka topic named spring-kafka-app0-demo1 with a single partition. It then publishes random text data to this topic using KafkaTemplate and consumes it using a KafkaListener. KafkaTemplate and all its necessary components, such as the ProducerFactory, are auto-configured through Spring Boot. KafkaListener relies on a ConcurrentKafkaListenerContainerFactory auto-configured through Boot to create its message listener container.
The application uses the Java Faker library to generate 100 random Book recommendations each time it runs. The listener consumes the records and prints them on the console.
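In outline, the publish/consume flow described above looks like the following sketch (class and bean names here are illustrative, not the exact demo code):

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class App0Sketch {

    public static void main(String[] args) {
        SpringApplication.run(App0Sketch.class, args);
    }

    // Boot's KafkaAdmin provisions NewTopic beans on startup.
    @Bean
    public NewTopic demoTopic() {
        return TopicBuilder.name("spring-kafka-app0-demo1").partitions(1).replicas(1).build();
    }

    // Publish text records on startup using the auto-configured KafkaTemplate
    // (the real app generates 100 book recommendations with Java Faker).
    @Bean
    public ApplicationRunner publisher(KafkaTemplate<String, String> template) {
        return args -> {
            for (int i = 0; i < 100; i++) {
                template.send("spring-kafka-app0-demo1", "book recommendation " + i);
            }
        };
    }

    // Listener container created from the Boot auto-configured
    // ConcurrentKafkaListenerContainerFactory.
    @KafkaListener(id = "app0Listener", topics = "spring-kafka-app0-demo1")
    public void listen(String value) {
        System.out.println("Received: " + value);
    }
}
```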
This application also includes a basic JUnit 5-based test using EmbeddedKafkaBroker.
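The test is roughly of this shape (illustrative; the real test class has the actual assertions):

```java
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.context.EmbeddedKafka;

// Spins up an in-memory broker via EmbeddedKafkaBroker and points the
// application's bootstrap servers at it.
@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = "spring-kafka-app0-demo1",
        bootstrapServersProperty = "spring.kafka.bootstrap-servers")
class App0SketchTests {

    @Autowired
    private KafkaTemplate<String, String> template;

    @Test
    void publishesAndConsumes() {
        // Send a record against the embedded broker; assertions on the listener
        // output are omitted in this sketch.
        template.send("spring-kafka-app0-demo1", "test record");
    }
}
```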
spring-kafka-app1 - Advanced application with two publishers and listeners with initial assignment
We have two publishers in this application, both using custom KafkaTemplate beans. One KafkaTemplate uses Long as the key type and the other uses UUID, while both use String as the value type.
The publishers are initiated using REST endpoints.
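The two custom templates could be declared along these lines (a sketch; the bootstrap address and bean names are assumptions, not the app's exact configuration):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.serialization.UUIDSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class TemplateConfigSketch {

    private Map<String, Object> baseProducerConfig() {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return config;
    }

    // Template with Long keys and String values.
    @Bean
    public KafkaTemplate<Long, String> longKeyTemplate() {
        Map<String, Object> config = baseProducerConfig();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<Long, String>(config));
    }

    // Template with UUID keys and String values.
    @Bean
    public KafkaTemplate<UUID, String> uuidKeyTemplate() {
        Map<String, Object> config = baseProducerConfig();
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, UUIDSerializer.class);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<UUID, String>(config));
    }
}
```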
The application creates two Kafka topics, spring-kafka-app1-demo1 and spring-kafka-app1-demo2, both with 10 partitions.
Two listeners are provided using KafkaListener.
The first listener uses the TopicPartition and PartitionOffset annotations to control selective partitions and initial assignments on them.
The second listener is a variant of the first, demonstrating how wildcard patterns can be used for partitions and initial assignments.
Look at the listeners to see how they override the deserializer properties instead of relying on what is auto-configured through Spring Boot.
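The first listener has roughly this shape (partition numbers, offsets, and the overridden deserializer are illustrative here; see the app for the real values):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
public class App1ListenerSketch {

    // Listen only on selected partitions of spring-kafka-app1-demo1, start them
    // from an explicit initial offset, and override the key deserializer instead
    // of relying on the Boot auto-configured consumer properties.
    @KafkaListener(
            id = "app1Listener1",
            topicPartitions = @TopicPartition(
                    topic = "spring-kafka-app1-demo1",
                    partitionOffsets = {
                            @PartitionOffset(partition = "0", initialOffset = "0"),
                            @PartitionOffset(partition = "1", initialOffset = "0")
                    }),
            properties = "key.deserializer=org.apache.kafka.common.serialization.LongDeserializer")
    public void listen(ConsumerRecord<Long, String> record) {
        System.out.printf("key=%s partition=%d offset=%d value=%s%n",
                record.key(), record.partition(), record.offset(), record.value());
    }
}
```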
To see the application in action, run it (e.g., in an IDE) and then issue the following two curl commands.
curl -X POST http://localhost:8080/publish/demo1
curl -X POST http://localhost:8080/publish/demo2
The first command invokes the publisher that publishes to the topic spring-kafka-app1-demo1, and the second publishes to the topic spring-kafka-app1-demo2. The listeners log the received data on the console along with the key, partition, and offset.
spring-kafka-app2 - JsonSerializer and JsonDeserializer in action
In this app, we publish and consume a POJO using the JsonSerializer and JsonDeserializer from Spring for Apache Kafka. The application creates a Kafka topic named spring-kafka-app2-demo.
It publishes a domain object Foo into the topic and then consumes it using a KafkaListener. Look at the application.properties configuration file to see how the serializer/deserializer properties are configured.
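The relevant entries typically look like this (the Foo package shown is an assumption):

```properties
# Producer: serialize values as JSON
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Consumer: deserialize JSON values into the Foo type
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*
spring.kafka.consumer.properties.spring.json.value.default.type=com.example.Foo
```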
spring-kafka-app3 - Seeking to specific offsets
In this application, we have a publisher and a listener.
The listener extends AbstractConsumerSeekAware, which implements ConsumerSeekAware. The application creates a topic named spring-kafka-app3-demo with a single partition. We always seek the partition to offset 20 each time the listener starts. The publisher sends 100 records each time the app starts.
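A minimal sketch of such a listener, assuming the seek-to-offset-20 behavior described above (names are illustrative):

```java
import java.util.Map;

import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.listener.AbstractConsumerSeekAware;
import org.springframework.stereotype.Component;

@Component
public class App3ListenerSketch extends AbstractConsumerSeekAware {

    // Called when partitions are assigned; seek every assigned partition to offset 20.
    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        super.onPartitionsAssigned(assignments, callback);
        assignments.keySet().forEach(tp -> callback.seek(tp.topic(), tp.partition(), 20L));
    }

    @KafkaListener(id = "app3Listener", topics = "spring-kafka-app3-demo")
    public void listen(String value) {
        System.out.println("Received: " + value);
    }
}
```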
spring-kafka-app4 - Error Handling
This application demonstrates how container error handling works in Spring for Apache Kafka.
The application has a publisher, which is invoked through a REST endpoint.
Each time the REST endpoint is called, the publisher sends ten records to the topic spring-kafka-app4-demo.
On the consumer side, we have contrived logic that throws an exception whenever the offset is greater than 0 and divisible by 10.
By default, when the exception happens, the DefaultErrorHandler kicks in with a maximum of ten tries and no backoff. However, we provide a custom bean declaration for DefaultErrorHandler, which the Boot auto-configuration picks up and configures on the container factory. This custom handler retries a maximum of three times with a two-second backoff between attempts. The handler is also configured with a DeadLetterPublishingRecoverer, which sends the failed record to a default DLT topic.
The next time the app runs, you will notice that the first matching record (with offset % 10 == 0) is retried three times and then sent to the DLT. There is also a convenient KafkaListener method for the DLT that logs the DLT messages.
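The custom error handler bean amounts to something like this sketch, following the retry, backoff, and DLT behavior described above:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.util.backoff.FixedBackOff;

@Configuration
public class ErrorHandlingConfigSketch {

    // Retry failed records 3 times with a 2-second backoff, then publish them
    // to the default DLT (<topic>.DLT) via the DeadLetterPublishingRecoverer.
    // Boot auto-configuration wires this bean into the listener container factory.
    @Bean
    public DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
        DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
        return new DefaultErrorHandler(recoverer, new FixedBackOff(2000L, 3L));
    }
}
```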
To exercise the app, run it first and then issue the following curl command.
curl -X POST http://localhost:8080/publish/demo
spring-kafka-app5 - ErrorHandlingDeserializer
This application demonstrates the ErrorHandlingDeserializer in action. We have a publisher that sends data as String, but the consumer expects an Integer, thus causing a deserialization error. Look at the application.properties file to see how the ErrorHandlingDeserializer and the actual delegate types are used.
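The configuration is typically along these lines (a sketch, not necessarily the app's exact entries):

```properties
# Wrap the real deserializer in the ErrorHandlingDeserializer so that
# deserialization failures are caught and handed to the error handler.
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring.kafka.consumer.properties.spring.deserializer.value.delegate.class=org.apache.kafka.common.serialization.IntegerDeserializer

# The producer keeps sending plain strings, which the Integer delegate cannot decode.
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```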
The application creates a topic named spring-kafka-app5-demo.
When the application starts, it sends a text record to the topic; the consumer receives it and runs into a deserialization exception. Since deserialization exceptions are not retried by default, the DefaultErrorHandler does not retry the record but tries to recover it. We configure the error handler with a DeadLetterPublishingRecoverer so that the failed records are sent to a dead letter topic.
The application provides a convenient KafkaListener to consume from the DLT topic (spring-kafka-app5-demo.DLT).
spring-kafka-app6 - Request-Reply using ReplyingKafkaTemplate
This application shows how the request-reply semantics work in Spring for Apache Kafka using the ReplyingKafkaTemplate.
There are two packages: app6.request and app6.reply. The applications rely on two Kafka topics, kRequests and kReplies.
Using the ReplyingKafkaTemplate, the request application sends a message to the topic kRequests and waits for the reply by blocking on the reply future. The ReplyingKafkaTemplate is configured with a reply container that specifies the reply topic (kReplies) to wait on.
On the reply side, another application provides a KafkaListener that listens on the topic where the requests are sent (kRequests). Once the listener receives data, it converts the text to uppercase and echoes it back to the reply topic. When doing so, it also propagates the CORRELATION_ID it received as part of the request so that the listener on the request side (inside the ReplyingKafkaTemplate) can correlate the reply.
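On the request side, the exchange is roughly as follows (a sketch; the ReplyingKafkaTemplate bean itself must be wired with a reply container listening on kReplies, which is omitted here):

```java
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
import org.springframework.kafka.requestreply.RequestReplyFuture;
import org.springframework.stereotype.Component;

@Component
public class RequestReplySketch {

    private final ReplyingKafkaTemplate<String, String, String> replyingTemplate;

    public RequestReplySketch(ReplyingKafkaTemplate<String, String, String> replyingTemplate) {
        this.replyingTemplate = replyingTemplate;
    }

    public String requestUppercase(String text) throws Exception {
        // Send to kRequests; the template adds the correlation-id and reply-topic headers.
        ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", text);
        RequestReplyFuture<String, String, String> future = replyingTemplate.sendAndReceive(record);
        // Block on the reply future until the listener on the reply side echoes back.
        return future.get(10, TimeUnit.SECONDS).value();
    }
}
```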
To run the app, do the following.
- Start the reply app - SpringKafkaApp6Reply
- Then start the request app - SpringKafkaApp6Request
- Go to a terminal and issue the following command.
curl -X POST -H "Content-Type: text/plain" --data "hello request-reply in Spring for Apache Kafka" http://localhost:8080/publish/demo
You should see the received text on the request application’s console output.
spring-kafka-app7 - Non-blocking Retries
In this application, we see how non-blocking retries work. Two records are sent, with values "one" and "two"; the listener throws an exception when it receives "one." This record then goes through the non-blocking retry chain until finally ending up in the dead letter topic. We can see that the "two" record is successfully processed before "one" is retried.
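Non-blocking retries are typically enabled with the @RetryableTopic annotation; a sketch matching the behavior described here could look like this (the topic name and attempt count are assumptions):

```java
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class App7ListenerSketch {

    // Each retry goes to a separate retry topic, so "two" is not blocked while
    // "one" waits for its next attempt. Backoff: 2s, then x1.5 per attempt.
    // The topic name here is a placeholder; see the app for the real one.
    @RetryableTopic(attempts = "5", backoff = @Backoff(delay = 2000, multiplier = 1.5))
    @KafkaListener(id = "app7Listener", topics = "spring-kafka-app7-demo")
    public void listen(String value) {
        if ("one".equals(value)) {
            throw new IllegalStateException("simulated failure for " + value);
        }
        System.out.println("Processed: " + value);
    }

    // Records that exhaust all attempts land here (the dead letter topic).
    @DltHandler
    public void dlt(String value) {
        System.out.println("Received in DLT: " + value);
    }
}
```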
To run the app, start SpringKafkaApp7. On the console, you should see the record delivery attempts, with "one" being retried after 2 seconds, then 3, then 4.5, then 6.75 seconds, and then sent to the DLT.
spring-integration-kafka-app - Spring Integration Kafka Outbound Channel Adapter and Message Driven Channel Adapter Demo
This application demonstrates the usage of Spring Integration's Kafka support.
This basic application shows the Kafka outbound channel adapter and message-driven channel adapter.
The outbound channel adapter is configured with a KafkaTemplate auto-configured through Spring Boot. The Kafka message-driven channel adapter uses a custom message listener container created in the app.
The application runner bean sends messages to a channel called toKafka, which the outbound adapter listens to and sends to a Kafka topic. The message-driven channel adapter consumes from the topic and puts the messages on another message channel (fromKafka). The application runner then receives from the fromKafka channel and prints the information on the console.
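Using the Spring Integration Java DSL, the two flows could be sketched as follows (the topic name is illustrative, and the real app wires its own custom listener container rather than the ConsumerFactory shortcut shown here):

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.kafka.dsl.Kafka;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class IntegrationFlowSketch {

    // Outbound: messages sent to the "toKafka" channel are published to the topic
    // by the outbound channel adapter, backed by the Boot-configured KafkaTemplate.
    @Bean
    public IntegrationFlow toKafkaFlow(KafkaTemplate<String, String> kafkaTemplate) {
        return IntegrationFlow.from("toKafka")
                .handle(Kafka.outboundChannelAdapter(kafkaTemplate)
                        .topic("si-kafka-demo-topic"))
                .get();
    }

    // Inbound: the message-driven channel adapter consumes from the topic and
    // places the messages on the "fromKafka" channel.
    @Bean
    public IntegrationFlow fromKafkaFlow(ConsumerFactory<String, String> consumerFactory) {
        return IntegrationFlow.from(Kafka.messageDrivenChannelAdapter(consumerFactory, "si-kafka-demo-topic"))
                .channel("fromKafka")
                .get();
    }
}
```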
spring-cloud-stream-app1 - Basic Spring Cloud Stream App with Function Composition
This is an introductory Spring Cloud Stream application in which we demonstrate the functioning of a supplier, a function, and a consumer. The supplier produces the current time in milliseconds as a Long value. By default, the supplier in Spring Cloud Stream runs every second, and we use that default. A function receives this supplied data and converts it to a UTC-based time. Then a consumer receives this UTC-based time and prints it on the console.
Look at the application properties to see how the functions are activated.
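The three functions amount to something like the sketch below (bean names and the exact formatting are illustrative); they are then activated through a property such as spring.cloud.function.definition - see the app's configuration for the real values.

```java
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FunctionsSketch {

    // Polled by the binder every second by default.
    @Bean
    public Supplier<Long> time() {
        return System::currentTimeMillis;
    }

    // Converts the epoch-millis value to a UTC timestamp string.
    @Bean
    public Function<Long, String> toUtc() {
        return millis -> DateTimeFormatter.ISO_INSTANT.format(Instant.ofEpochMilli(millis));
    }

    // Logs the UTC time on the console.
    @Bean
    public Consumer<String> logTime() {
        return utcTime -> System.out.println("Current time (UTC): " + utcTime);
    }
}
```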
Run the application, and you will see the time data logged on the console in UTC format.
spring-cloud-stream-app2 - Spring Cloud Stream/StreamBridge API demo
This application shows how non-functional-style suppliers can be written for on-demand triggering using the StreamBridge API. The application has a REST endpoint which, when invoked, triggers publishing the data through the StreamBridge API. StreamBridge creates all the necessary output bindings. This demo app also has a consumer that consumes from the Kafka topic to which StreamBridge is publishing.
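The endpoint-plus-StreamBridge combination looks roughly like this (the binding name is an assumption; the path matches the curl command below):

```java
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class StreamBridgeControllerSketch {

    private final StreamBridge streamBridge;

    public StreamBridgeControllerSketch(StreamBridge streamBridge) {
        this.streamBridge = streamBridge;
    }

    // On-demand publishing: StreamBridge creates (and caches) the output binding
    // the first time it is used.
    @PostMapping("/publish/demo")
    public ResponseEntity<Void> publish(@RequestBody String payload) {
        streamBridge.send("publishDemo-out-0", payload);
        return ResponseEntity.accepted().build();
    }
}
```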
Run the application and use the REST endpoint as below.
curl -X POST -H "Content-Type: text/plain" --data "StreamBridge Demo" http://localhost:8080/publish/demo
The consumer will print the data published to the topic through StreamBridge.
spring-cloud-stream-app3 - Spring Cloud Stream Kafka Streams Basic
This application shows how the Spring Cloud Stream Kafka Streams binder works.
The example is based on the canonical word count application. It is written with the Spring Cloud Stream binder for Kafka Streams, using java.util.function.Function to represent a processor. It uses a single input and a single output. In essence, the application receives text messages from an input topic, computes word occurrence counts in a configurable time window, and reports the counts to an output topic.
Run the application first.
The Kafka Streams processor is named countWords. The application also uses the regular Kafka binder for producing data every second - the same book recommendation data from the Java Faker library we used in the other apps. This supplier function is called provideWords; it publishes to a topic called words, from which the Kafka Streams processor consumes data. The countWords processor writes the count information to a topic called counts. We have another consumer function using the regular Kafka binder that listens on this counts topic and prints the word count information on the console.
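A condensed sketch of a word-count processor of this kind (window size, serdes, and store name here are illustrative, not the app's exact configuration):

```java
import java.time.Duration;
import java.util.Arrays;
import java.util.function.Function;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class WordCountSketch {

    // Typically bound to the "words" input topic and "counts" output topic via
    // spring.cloud.stream.bindings.countWords-in-0 / countWords-out-0 destinations.
    @Bean
    public Function<KStream<Object, String>, KStream<?, Long>> countWords() {
        return input -> input
                // Split each text message into individual words.
                .flatMapValues(text -> Arrays.asList(text.toLowerCase().split("\\W+")))
                // Re-key by word and count occurrences in a time window.
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofSeconds(30)))
                .count(Materialized.as("word-counts"))
                .toStream();
    }
}
```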
Accessing binder health endpoint
curl localhost:8080/actuator/health | jq .
Accessing Kafka Streams metrics
curl localhost:8080/actuator/metrics | jq .
Something more specific
curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .
Visualize Kafka Streams topology
curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId
Popular UI tool for visualizing the topology: https://zz85.github.io/kafka-streams-viz/
Accessing all the bindings
curl localhost:8080/actuator/bindings | jq .
Stopping binding
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process-in-0
Starting binding
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/process-in-0
spring-cloud-stream-app4 - Spring Cloud Stream Kafka Streams Advanced
This advanced sample of a Spring Cloud Stream processor using Kafka Streams support shows both KStream and KTable bindings.
The following are the two applications in this sample.
- Spring Cloud Stream-based Kafka Streams processor
- Spring Cloud Stream producer application to generate data for the processor
The Kafka Streams processor uses java.util.function.BiFunction to demonstrate two inputs and an output. The processor consumes user region data as a KTable and user click information as a KStream. It then produces the clicks per region information on the outbound. The same outbound information is stored in a state store to demonstrate the interactive query capabilities of Kafka Streams, exposed as InteractiveQueryService in Spring Cloud Stream.
The application also has a second processor that listens on the outbound topic and logs the information. In addition, the application exposes a REST endpoint through which the per-region user click data can be queried.
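The BiFunction processor has roughly the following shape (types follow the description above; serdes, the holder type, and the store name are illustrative):

```java
import java.util.function.BiFunction;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Joined;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ClicksPerRegionSketch {

    // Input 1: user clicks as KStream<user, clicks>; input 2: user regions as
    // KTable<user, region>. Output: total clicks per region, also materialized
    // in a state store so it can be served through InteractiveQueryService.
    @Bean
    public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicksPerRegion() {
        return (userClicks, userRegions) -> userClicks
                // Join each click event with the user's current region.
                .leftJoin(userRegions,
                        (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
                        Joined.with(Serdes.String(), Serdes.Long(), Serdes.String()))
                // Re-key by region and sum the clicks.
                .map((user, regionWithClicks) ->
                        KeyValue.pair(regionWithClicks.region(), regionWithClicks.clicks()))
                .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
                .reduce(Long::sum, Materialized.as("clicks-per-region-store"))
                .toStream();
    }

    // Simple holder for the join result.
    record RegionWithClicks(String region, long clicks) {}
}
```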
Run the SpringCloudStreamApp4Application app first, then the producer app - UserClicksRegionProducerApplication.
The producer app has two REST endpoints that allow you to publish user-region and user-click information to Kafka topics.
First, enter some data for the user region.
curl -X POST localhost:8090/user-region/alice/asia
At this point, Alice lives in Asia.
Now send some click impression data from Alice.
curl -X POST localhost:8090/user-clicks/alice/12
Watch the console of the SpringCloudStreamApp4Application and see that the clicks per region information is logged from the test processor.
Invoke the REST endpoint to extract this same information through an interactive query.
curl localhost:8080/updates/asia | jq .
Enter more POST data as above and verify that you see the correct output.
Accessing binder health endpoint
curl localhost:8080/actuator/health | jq .
Accessing Kafka Streams metrics
curl localhost:8080/actuator/metrics | jq .
Something more specific
curl localhost:8080/actuator/metrics/kafka.stream.thread.commit.total | jq .
Visualize Kafka Streams topology
curl localhost:8080/actuator/kafkastreamstopology | jq .
curl localhost:8080/actuator/kafkastreamstopology/clicks-applicationId
curl localhost:8080/actuator/kafkastreamstopology/updates-applicationId
Accessing all the bindings
curl localhost:8080/actuator/bindings | jq .
Stopping binding
curl -d '{"state":"STOPPED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/clicks-in-0
Note: All bindings corresponding to this Kafka Streams application id will be stopped.
Starting binding
curl -d '{"state":"STARTED"}' -H "Content-Type: application/json" -X POST http://localhost:8080/actuator/bindings/clicks-in-0