The Spring Integration Kafka extension project provides inbound and outbound channel adapters for Apache Kafka. Apache Kafka is a distributed publish-subscribe messaging system that is designed for high throughput (terabytes of data) and low latency (milliseconds). For more information on Kafka and its design goals, see the Kafka main page.
Starting from version 2.0 version this project is a complete rewrite based on the new
spring-kafka project which uses the pure java Producer
and
Consumer
clients provided by Kafka 0.9.x.x and 0.10.x.x..
See the Spring Integration kafka Sample for a simple Spring Boot application that sends and receives messages.
In order to build the project:
./gradlew build
In order to install this into your local maven cache:
./gradlew install
Spring Integration Kafka project currently supports the following components. Please keep in mind that this is very early stage in development and do not yet fully make use of all the features that Kafka provides.
-
Outbound Channel Adapter
-
Message Driven Channel Adapter
The Outbound channel adapter is used to publish messages from a Spring Integration channel to Kafka.
The channel is defined in the application context and then wired in the application that sends messages to Kafka.
After that, sender applications can publish to Kafka via Spring Integration messages, which are internally converted to Kafka messages by the outbound channel adapter, as follows: the payload of the Spring Integration message will be used to populate the payload of the Kafka message, and (by default) the kafka_messageKey
header of the Spring
Integration message will be used to populate the key of the Kafka message.
The target topic and partition for publishing the message can be customized through the kafka_topic
and kafka_partitionId
headers, respectively.
Here’s an example for sending a message with an arbitrary payload and the String "key"
as value on the test
topic.
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(
MessageBuilder.withPayload(payload)
.setHeader(KafkaHeaders.MESSAGE_KEY, "key")
.setHeader(KafkaHeaders.TOPIC, "test")
.build()
);
In addition, the <int-kafka:outbound-channel-adapter>
provides the ability to extract the key, target topic, target partition and Kafka record timestamp by applying SpEL expressions on the outbound message.
To that end, it supports the mutually exclusive pairs of attributes topic
/topic-expression
, message-key
/message-key-expression
, and partition-id
/partition-id-expression
, to allow the specification of topic
,message-key
and partition-id
respectively as static values on the adapter, or to dynamically evaluate their values at runtime against the request message.
The Kafka record timestamp can be specified as a SpEL expression with an attribute name of timestamp-expression
Important
|
The KafkaHeaders interface (provided by spring-kafka ) contains constants used for interacting with headers.
The messageKey and topic default headers now require a kafka_ prefix.
When migrating from an earlier version that used the old headers, you need to specify message-key-expression="headers.messageKey" and topic-expression="headers.topic" on the <int-kafka:outbound-channel-adapter> , or simply change the headers upstream to the new headers from KafkaHeaders using a <header-enricher> or MessageBuilder .
Or, of course, configure them on the adapter using topic and message-key if you are using constant values.
|
NOTE : If the adapter is configured with a topic or message key (either with a constant or expression), those are used and the corresponding header is ignored. If you wish the header to override the configuration, you need to configure it in an expression, such as:
topic-expression="headers['topic'] != null ? headers['topic'] : 'myTopic'"
.
The adapter requires a KafkaTemplate
.
Here is an example of how the Kafka outbound channel adapter is configured with XML:
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-template="template"
auto-startup="false"
channel="inputToKafka"
topic="foo"
message-key-expression="'bar'"
partition-id-expression="2"
timestamp-expression="T(System).currentTimeMillis()">
</int-kafka:outbound-channel-adapter>
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
</bean>
Note
|
When using Kafka log compaction, a deleted key is represented by a null value.
Since messages can’t have null payloads, when you wish to send a null value to kafka, send a message with a payload of type KafkaNull .
|
The Java DSL configuration sample is like:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(embeddedKafka));
}
@Bean
public IntegrationFlow sendToKafkaFlow() {
return f -> f
.<String>split(p -> Stream.generate(() -> p).limit(101).iterator(), null)
.publishSubscribeChannel(c -> c
.subscribe(sf -> sf.handle(kafkaMessageHandler(producerFactory(), TEST_TOPIC),
e -> e.id("kafkaProducer")))
.subscribe(sf -> sf.handle(kafkaMessageHandler(producerFactory(), TEST_TOPIC3),
e -> e.id("kafkaProducer3")))
);
}
private KafkaProducerMessageHandlerSpec<Integer, String>
kafkaMessageHandler(ProducerFactory<Integer, String> producerFactory, String topic) {
return Kafka
.outboundChannelAdapter(producerFactory)
.messageKey(m -> m
.getHeaders()
.get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
.partitionId(m -> 10)
.topicExpression("headers[kafka_topic] ?: '" + topic + "'")
.timestampExpression("T(System).currentTimeMillis()");
}
The KafkaMessageDrivenChannelAdapter
(<int-kafka:message-driven-channel-adapter>
) uses a spring-kafka
KafkaMessageListenerContainer
or ConcurrentListenerContainer
.
An example of xml configuration variant is shown here:
<int-kafka:message-driven-channel-adapter
id="kafkaListener"
listener-container="container1"
auto-startup="false"
phase="100"
send-timeout="5000"
channel="nullChannel"
error-channel="errorChannel" />
<bean id="container1" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
<constructor-arg>
<bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="localhost:9092" />
...
</map>
</constructor-arg>
</bean>
</constructor-arg>
<constructor-arg name="topics" value="foo" />
</bean>
The Java DSL configuration sample is like:
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public IntegrationFlow listeningFromKafkaFlow() {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(consumerFactory(),
KafkaMessageDrivenChannelAdapter.ListenerMode.record, TEST_TOPIC)
.configureListenerContainer(c ->
c.ackMode(AbstractMessageListenerContainer.AckMode.MANUAL))
.errorChannel("errorChannel")
.retryTemplate(new RetryTemplate())
.filterInRetry(true))
.filter(Message.class, m ->
m.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY, Integer.class) < 101,
f -> f.throwExceptionOnRejection(true))
.<String, String>transform(String::toUpperCase)
.channel(c -> c.queue("listeningFromKafkaResults"))
.get();
}
See the sample mentioned above for Java @Configuration
.
Note
|
When using Kafka log compaction, a deleted key is represented by a null value.
Since messages can’t have null payloads, when such a value is received, it is represented by a payload of type KafkaNull .
|
Pull requests are welcome. Please see the contributor guidelines for details.