This repository contains an implementation of a Spring Cloud Stream binder with Kafka Streams support.
The goal of the project is to make the Spring Cloud Stream programming and configuration models (binding declarations, annotations, Spring Boot configuration options) available for use with the Kafka Streams API.
The Kafka Streams binder can be used like any regular binder, and can bind input and output `KStream`s added to a Spring Cloud Stream application.
First, build the project and install the artifacts in your local Maven repository. This step is necessary until the project is regularly deployed to the Spring snapshot repository.
./mvnw clean install
Once the binder artifact is available, add it to your project as a dependency:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kstream</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
To use the Kafka Streams binder, the application must declare a bound interface whose methods return `KStream` instances, for example:
public interface KStreamProcessor {

    @Input("input")
    KStream<?, ?> input();

    @Output("output")
    KStream<?, ?> output();
}
As with any Spring Cloud Stream application, you can declare an arbitrary number of input and output targets, which correspond to the logical inputs and outputs of the application.
Each declared input and output maps to a Kafka topic via the `spring.cloud.stream.bindings.<targetName>.destination` property.
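As an illustration, the two targets declared in `KStreamProcessor` could be mapped to topics with a configuration fragment along the following lines. This is a minimal sketch: the topic names `words` and `counts` are hypothetical and should be replaced with the topics your application actually uses.

```properties
# Hypothetical topic names, shown only to illustrate the property pattern
spring.cloud.stream.bindings.input.destination=words
spring.cloud.stream.bindings.output.destination=counts
```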
Bound interfaces are enabled with the standard Spring Cloud Stream `@EnableBinding` annotation:
// Other annotations omitted
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
    // main code omitted
}
Your application will typically consist of one or more `@StreamListener` methods that apply the Kafka Streams API to the declared inputs and outputs, as in the following example (a simplified version of the word-counting sample application):
@StreamListener("input")
@SendTo("output")
public KStream<?, String> splitStrings(KStream<?, String> input) {
    return input
        .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));
}
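To see what the lambda passed to `flatMapValues` does to a single record value, the following plain-Java sketch applies the same expression outside Kafka Streams. The class name `SplitStringsSketch`, the helper `splitWords`, and the sample sentence are hypothetical and exist only for illustration; they are not part of the binder or the sample application.

```java
import java.util.Arrays;
import java.util.List;

public class SplitStringsSketch {

    // Same expression as the flatMapValues lambda above: lower-case the value,
    // then split it on runs of non-word characters.
    public static List<String> splitWords(String value) {
        return Arrays.asList(value.toLowerCase().split("\\W+"));
    }

    public static void main(String[] args) {
        // prints [hello, kafka, hello, streams]
        System.out.println(splitWords("Hello Kafka, hello Streams"));
    }
}
```

In the stream, each element of the returned list becomes a separate output record that keeps the key of the original record. Note that a value beginning with punctuation or whitespace produces a leading empty string, which an application may want to filter out before counting.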
A Word Count example that can be run both inside and outside Spring Cloud Data Flow is provided as part of the project.