Spring Integration Kafka Adapter

The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds). For more information on Kafka and its design goals, see the Kafka main page.

Starting from version 2.0, this project is a complete rewrite based on the new spring-kafka project, which uses the pure Java Producer and Consumer clients provided by Kafka 0.9.x.x and 0.10.x.x.

Quick Start

See the Spring Integration Kafka Sample for a simple Spring Boot application that sends and receives messages.

Checking out and building

To build the project:

./gradlew build

To install the artifact into your local Maven cache:

./gradlew install
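
You can then consume the locally installed artifact from another Gradle project; a minimal sketch (VERSION is a placeholder for whichever version you built):

dependencies {
    compile "org.springframework.integration:spring-integration-kafka:VERSION"
}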

The Spring Integration Kafka project currently supports the following components. Please keep in mind that the project is at an early stage of development and does not yet make use of all the features that Kafka provides.

  • Outbound Channel Adapter

  • Message Driven Channel Adapter

Outbound Channel Adapter

The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka. The channel is defined in the application context and then wired into the application that sends messages to Kafka. Sender applications can then publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter: the payload of the Spring Integration message is used to populate the payload of the Kafka message, and (by default) the kafka_messageKey header of the Spring Integration message is used to populate the key of the Kafka message.

The target topic and partition for publishing the message can be customized through the kafka_topic and kafka_partitionId headers, respectively.

Here is an example of sending a message with an arbitrary payload and the String "key" as the message key to the test topic.

    final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);

    channel.send(
            MessageBuilder.withPayload(payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                    .setHeader(KafkaHeaders.TOPIC, "test")
                    .build());
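
The message could also target a specific partition by setting the corresponding header; a minimal sketch (partition 0 is an arbitrary choice):

    channel.send(
            MessageBuilder.withPayload(payload)
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                    .setHeader(KafkaHeaders.TOPIC, "test")
                    .setHeader(KafkaHeaders.PARTITION_ID, 0)
                    .build());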

In addition, the <int-kafka:outbound-channel-adapter> provides the ability to extract the key, target topic, target partition, and Kafka record timestamp by applying SpEL expressions on the outbound message. To that end, it supports the mutually exclusive pairs of attributes topic/topic-expression, message-key/message-key-expression, and partition-id/partition-id-expression, to allow the specification of topic, message-key and partition-id, respectively, as static values on the adapter, or to dynamically evaluate their values at runtime against the request message. The Kafka record timestamp can be specified as a SpEL expression via the timestamp-expression attribute.

Important
The KafkaHeaders interface (provided by spring-kafka) contains constants used for interacting with headers. The messageKey and topic default headers now require a kafka_ prefix. When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the <int-kafka:outbound-channel-adapter>, or simply change the headers upstream to the new headers from KafkaHeaders using a <header-enricher> or MessageBuilder. Or, of course, configure them on the adapter using topic and message-key if you are using constant values.
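
For example, a minimal sketch of the MessageBuilder approach, assuming the upstream message still carries the old messageKey and topic headers:

    Message<?> migrated = MessageBuilder.fromMessage(original)
            .setHeader(KafkaHeaders.MESSAGE_KEY, original.getHeaders().get("messageKey"))
            .setHeader(KafkaHeaders.TOPIC, original.getHeaders().get("topic"))
            .removeHeaders("messageKey", "topic")
            .build();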

Note
If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as:

topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'".

The adapter requires a KafkaTemplate.

Here is an example of how the Kafka outbound channel adapter is configured with XML:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2"
                                    timestamp-expression="T(System).currentTimeMillis()">
</int-kafka:outbound-channel-adapter>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>
Note
When using Kafka log compaction, a deleted key is represented by a null value. Since messages can’t have null payloads, when you wish to send a null value to Kafka, send a message with a payload of type KafkaNull.
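
For example, a minimal sketch, reusing the inputToKafka channel from the earlier example:

    channel.send(
            MessageBuilder.withPayload(KafkaNull.INSTANCE) // becomes a tombstone (null-value) record
                    .setHeader(KafkaHeaders.MESSAGE_KEY, "key")
                    .setHeader(KafkaHeaders.TOPIC, "test")
                    .build());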

Here is an example of the Java DSL configuration:

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}

@Bean
public IntegrationFlow sendToKafkaFlow() {
    return f -> f
        .<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
        .publishSubscribeChannel(c -> c
            .subscribe(sf -> sf.handle(kafkaMessageHandler(producerFactory(), TEST_TOPIC),
                    e -> e.id("kafkaProducer")))
            .subscribe(sf -> sf.handle(kafkaMessageHandler(producerFactory(), TEST_TOPIC3),
                    e -> e.id("kafkaProducer3")))
        );
}

private KafkaProducerMessageHandlerSpec<Integer, String>
kafkaMessageHandler(ProducerFactory<Integer, String> producerFactory, String topic) {
    return Kafka
        .outboundChannelAdapter(producerFactory)
        .messageKey(m -> m
                .getHeaders()
                .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
        .partitionId(m -> 10)
        .topicExpression("headers['kafka_topic'] ?: '" + topic + "'")
        .timestampExpression("T(System).currentTimeMillis()");
}

Message Driven Channel Adapter

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) uses a spring-kafka KafkaMessageListenerContainer or ConcurrentMessageListenerContainer.

An example of the XML configuration variant is shown here:

<int-kafka:message-driven-channel-adapter
        id="kafkaListener"
        listener-container="container1"
        auto-startup="false"
        phase="100"
        send-timeout="5000"
        channel="nullChannel"
        error-channel="errorChannel" />

<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <constructor-arg name="topics" value="foo" />
</bean>
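
The same container can also be defined in Java; a minimal sketch (ContainerProperties is the spring-kafka class that carries the topics and other listener settings, and consumerFactory() is assumed to be a bean like the one shown below):

@Bean
public KafkaMessageListenerContainer<Integer, String> container1() {
    return new KafkaMessageListenerContainer<>(consumerFactory(),
            new ContainerProperties("foo"));
}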

Here is an example of the Java DSL configuration:

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
    Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public IntegrationFlow listeningFromKafkaFlow() {
    return IntegrationFlows
            .from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
                    KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC)
                    .configureListenerContainer(c ->
                            c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL))
                    .errorChannel("errorChannel")
                    .retryTemplate(new RetryTemplate())
                    .filterInRetry(true))
            .filter(Message.class, m ->
                           m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
                   f -> f.throwExceptionOnRejection(true))
            .<String, String>transform(String::toUpperCase)
            .channel(c -> c.queue("listeningFromKafkaResults"))
            .get();
}
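
Because the container above is configured with AckMode.MANUAL, each record arrives with an Acknowledgment header that must be acknowledged to commit the offset. A minimal sketch of a downstream consumer (the poller settings are illustrative):

@ServiceActivator(inputChannel = "listeningFromKafkaResults",
        poller = @Poller(fixedDelay = "100"))
public void process(String payload,
        @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) {
    // commit the offset for this record once processing succeeds
    acknowledgment.acknowledge();
}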

See the sample mentioned above for a complete Java @Configuration example.

Note
When using Kafka log compaction, a deleted key is represented by a null value. Since messages can’t have null payloads, when such a value is received, it is represented by a payload of type KafkaNull.
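
A minimal sketch of detecting such deletions downstream (the channel name fromKafka is hypothetical):

@ServiceActivator(inputChannel = "fromKafka")
public void handle(Message<?> message) {
    Object key = message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY);
    if (message.getPayload() instanceof KafkaNull) {
        System.out.println("Tombstone (deletion) for key " + key);
    }
    else {
        System.out.println("Record for key " + key + ": " + message.getPayload());
    }
}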

Contributing

Pull requests are welcome. Please see the contributor guidelines for details.