Improve reading of Kafka events with receive action
novarx opened this issue · 0 comments
Citrus Version
4.3.2
Problem
High Level
In the citrus-kafka module of citrusframework, it's possible to consume (receive) events from a Kafka topic. The mechanism of this receive action, however, is as simple as: consume exactly one event at the current offset of the consumer group.
When testing an application that publishes events, it's likely that lots of events get published in a short period of time. That makes it almost impossible to find the wanted event with this mechanism.
As the example in the following diagram shows, reading from neither latest nor earliest is likely to suffice.
Technical
Testing whether an event has been published to a Kafka topic isn't as simple as it might seem.
It's best to choose a consumer group for testing that differs from the SUT's, so the test does not interfere with the offset of the SUT's consumer group. But usually, when a topic is read by a new consumer group, it starts from either the earliest or the latest published event. Given that a topic might already hold lots of published events, starting with earliest would be absurdly inefficient. Initializing the consumer group with latest, on the other hand, will normally result in no event being read at all, because of the chronological order of a test: an action is executed first, and only after that do we expect an event to be published to Kafka. By the time the receive action starts consuming, the wanted event typically already lies behind the latest offset.
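As a minimal sketch of the setup described above, the following builds consumer properties for a dedicated test consumer group. The keys `group.id` and `auto.offset.reset` are standard Kafka consumer settings; the class name, method name, and the group id `citrus-test-group` are hypothetical and only serve as illustration.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of Citrus or the Kafka client.
final class TestConsumerProps {

    /** Builds consumer properties for a test-only consumer group. */
    static Properties forTestGroup(String groupId, String offsetReset) {
        Properties props = new Properties();
        // A group id different from the SUT's keeps the SUT's committed offsets untouched.
        props.setProperty("group.id", groupId);
        // Applies only when the group has no committed offset yet: "earliest" or "latest".
        props.setProperty("auto.offset.reset", offsetReset);
        return props;
    }

    public static void main(String[] args) {
        Properties props = forTestGroup("citrus-test-group", "latest");
        System.out.println(props.getProperty("group.id"));
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

Whichever of the two reset values is chosen, the new group starts at one end of the topic, which is the limitation this issue is about.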
Solution
The receive action could start consuming a topic at an offset Ox = OT - n, where n is the maximum timespan in which the wanted event is expected to have been published, kept as small as possible. A new consumer group should then be initialized at offset Ox and consume until latest or until the wanted event has been found. Since the value of n might differ from use case to use case, it should be configurable (per test case or at least per project).
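The idea of deriving the start offset from a timespan can be sketched without a broker. The following is an in-memory illustration only: a list of record timestamps stands in for one partition, and the list index plays the role of the offset. The class `TimeWindowOffset` and its method are hypothetical names, not Citrus or Kafka APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical in-memory stand-in for one topic partition; not part of Citrus or Kafka.
final class TimeWindowOffset {

    /**
     * Returns the first offset whose record timestamp is at or after (now - window),
     * or the log-end offset (timestamps.size()) if no such record exists.
     * Timestamps must be in ascending order.
     */
    static int startOffset(List<Instant> timestamps, Instant now, Duration window) {
        Instant threshold = now.minus(window);
        for (int offset = 0; offset < timestamps.size(); offset++) {
            if (!timestamps.get(offset).isBefore(threshold)) {
                return offset;
            }
        }
        return timestamps.size();
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2024-01-01T12:00:00Z");
        List<Instant> timestamps = List.of(
                now.minusSeconds(3600), // offset 0
                now.minusSeconds(600),  // offset 1
                now.minusSeconds(120),  // offset 2
                now.minusSeconds(5));   // offset 3
        // With a five-minute window, consumption would start at offset 2.
        System.out.println(startOffset(timestamps, now, Duration.ofMinutes(5)));
    }
}
```

In this example, starting from earliest would read all four records and starting from latest would read none, while the five-minute window reads only the two most recent ones.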
Acceptance Criteria
A Kafka event that is neither the latest nor the earliest on a given topic can be verified in a Citrus test using a single Citrus action
The offset range is configurable (preferably by time)
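The first criterion amounts to scanning forward from a start offset until the wanted event is found or the end of the topic is reached. A minimal in-memory sketch of that scan, assuming the records are already available as a list (the class `EventScan`, its method, and the sample event names are hypothetical, not an existing Citrus API):

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper for illustration; not an existing Citrus API.
final class EventScan {

    /**
     * Scans records from startOffset to the end of the list and returns the
     * first record accepted by the predicate, or an empty Optional if none matches.
     */
    static <T> Optional<T> findFrom(List<T> records, int startOffset, Predicate<? super T> wanted) {
        for (int offset = Math.max(0, startOffset); offset < records.size(); offset++) {
            T record = records.get(offset);
            if (wanted.test(record)) {
                return Optional.of(record);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<String> records = List.of("order-created", "order-paid", "order-shipped", "order-closed");
        // The wanted event is neither the earliest nor the latest record.
        System.out.println(findFrom(records, 1, e -> e.equals("order-shipped")).isPresent());
    }
}
```

A real implementation would apply the same loop to every record of each polled batch rather than to a list held in memory.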
Further info
A proof of concept is already done, where the KafkaConsumer.receive method looks something like this:
@Override
public Message receive(TestContext context, long timeout) {
    String topic = context.replaceDynamicContentInString(Optional.ofNullable(endpointConfiguration.getTopic())
            .orElseThrow(() -> new CitrusRuntimeException("Missing Kafka topic to receive messages from - add topic to endpoint configuration")));

    // Assign all partitions of the topic manually so the position can be set explicitly.
    var partitions = consumer.partitionsFor(topic).stream()
            .map(partition -> new TopicPartition(topic, partition.partition()))
            .toList();
    consumer.assign(partitions);

    // Rewind every partition by five minutes (hard-coded in this proof of concept).
    offsetConsumerOnTopicBySeconds(partitions, 5 * 60);

    // Poll at most ten times and stop early once a poll returns no records.
    var messages = new ArrayList<Message>();
    for (int i = 0; i < 10; i++) {
        ConsumerRecords<Object, Object> records = consumer.poll(Duration.ofMillis(timeout));
        if (records.isEmpty()) {
            break;
        }
        // Note: only the first record of each polled batch is converted.
        Message received = endpointConfiguration.getMessageConverter().convertInbound(
                records.iterator().next(),
                endpointConfiguration,
                context
        );
        messages.add(received);
        context.onInboundMessage(received);
    }
    consumer.commitSync(Duration.ofMillis(endpointConfiguration.getTimeout()));
    if (messages.isEmpty()) {
        throw new MessageTimeoutException(timeout, topic);
    }
    // Only the first collected message is returned for now.
    return messages.get(0);
}
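
// now() and toMap are assumed to be static imports of Instant.now and Collectors.toMap.
private void offsetConsumerOnTopicBySeconds(List<TopicPartition> partitions, int secondsToSubtract) {
    Map<TopicPartition, Long> partitionWithTimestamp = partitions.stream().collect(toMap(
            Function.identity(),
            partition -> now().minusSeconds(secondsToSubtract).toEpochMilli()
    ));
    // Per partition: the earliest offset whose timestamp is >= the given timestamp, or null if there is none.
    var newOffsets = consumer.offsetsForTimes(partitionWithTimestamp);
    newOffsets.forEach((partition, partitionOffset) -> {
        if (partitionOffset != null) {
            // Step back one record, but never below offset 0 (seek rejects negative offsets).
            consumer.seek(partition, Math.max(0L, partitionOffset.offset() - 1));
        } else {
            // No record in the time window: start at the end of the partition.
            consumer.seekToEnd(List.of(partition));
        }
    });
}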