What is this?

This repository contains an implementation of a Spring Cloud Stream binder with Kafka Streams support.

The goal of the project is to allow the use of Spring Cloud Stream programming model and configuration model (binding declarations, annotations, Spring Boot configuration) options with the Kafka Streams API.

How does it work?

The Kafka Streams binder can be used like any regular binder, and can bind input and output `KStream`s added to a Spring Cloud Stream application.

Adding the binder to your application

First, you must build the project and install the artifacts in the local repository. This step will be necessary until the project will be regularly deployed in the Spring snapshot repository.

./mvnw clean install

Once the binder artifact is available it can be added to the project.

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kstream</artifactId>
    <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>

Bound interfaces

For using the Kafka Streams binder, the application must use a bound interface using KStream instances, e.g.

public interface KStreamProcessor {

	@Input("input")
	KStream<?, ?> input();

	@Output("output")
	KStream<?, ?> output();
}

As with any Spring Cloud Stream application, you can declare an arbitrary number of input and output targets, which correspond with the logical inputs and outputs of the application Each declared input and output maps to a Kafka topic via spring.cloud.stream.bindings.<targetName>.destination.

Enabling bindings

Bound interfaces can be enabled using the standard Spring Cloud Stream annotation to that effect.

// Other annotations omitted
@EnableBinding(KStreamProcessor.class)
public class WordCountProcessorApplication {
  // main code omitted
}

Writing your application

Your application will typically consist of one or more @StreamListener methods applying the Kafka Streams API to the declared inputs and, outputs, as in the following example (a simplified version of the word counting sample application).

@StreamListener("input")
@SendTo("output")
public KStream<?, String> splitStrings(KStream<?, String> input) {
  return input
      .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")));
}

Examples

A Word Count example that can be run both inside and outside Spring Cloud Data Flow is provided as part of the project.