Why does the StreamListener consume only one record?
Danden1 opened this issue · 2 comments
I discovered that there is a batchSize option.
StreamMessageListenerContainerOptions<String, ObjectRecord<String, Long>> containerOptions =
        StreamMessageListenerContainerOptions.builder()
                .batchSize(3)
                .pollTimeout(Duration.ofMillis(100))
                .targetType(Long.class)
                .build();
So I expected that using this option would fetch all the data at once. However, the onMessage method cannot take a list as a parameter.
class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    @Override
    // here
    public void onMessage(MapRecord<String, String, String> message) {
        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}
The container does read records in batches of batchSize, but then uses a for loop to deliver them to the listener one by one.
class StreamPollTask<K, V extends Record<K, ?>> implements Task {

    ....

    private void deserializeAndEmitRecords(List<ByteRecord> records) {
        for (ByteRecord raw : records) {
            try {
                pollState.updateReadOffset(raw.getId().getValue());
                V record = convertRecord(raw);
                listener.onMessage(record);
            } catch (RuntimeException ex) {
                if (cancelSubscriptionOnError.test(ex)) {
                    cancel();
                    errorHandler.handleError(ex);
                    return;
                }
                errorHandler.handleError(ex);
            }
        }
    }
}
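In other words, batchSize only controls how many records each poll fetches from the stream; the listener callback is still invoked once per record. A minimal, self-contained simulation of that dispatch loop (plain Java, no Redis; the names dispatch and PerRecordDispatchDemo are mine, not Spring Data Redis API):

```java
import java.util.List;
import java.util.function.Consumer;

public class PerRecordDispatchDemo {

    // Hypothetical stand-in for deserializeAndEmitRecords: the container
    // loops over the polled batch and calls the listener once per record,
    // regardless of how many records the poll returned.
    static int dispatch(List<String> polledBatch, Consumer<String> onMessage) {
        int invocations = 0;
        for (String record : polledBatch) {
            onMessage.accept(record);
            invocations++;
        }
        return invocations;
    }

    public static void main(String[] args) {
        // Simulate a single poll that returned batchSize = 3 records.
        int calls = dispatch(List.of("r1", "r2", "r3"),
                r -> System.out.println("onMessage: " + r));
        System.out.println("onMessage invoked " + calls + " times");
    }
}
```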
I'm curious why it doesn't deliver the data as a list all at once, but instead delivers records individually like this.
(KafkaListener allows a list parameter: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners)
This is because we do not provide a batch listener abstraction; in the current approach, we handle each record sequentially regardless of whether batch reading is used. Feel free to submit a pull request that introduces a batch listener concept. StreamListener would need to be retrofitted in terms of its generics.
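For comparison, a hypothetical sketch of what such a batch listener abstraction could look like (BatchStreamListener, onMessages, and emitBatch are invented names for illustration, not part of Spring Data Redis): the container would hand the whole converted batch to the listener in a single callback instead of looping record by record.

```java
import java.util.List;

// Hypothetical batch-oriented counterpart to StreamListener.
interface BatchStreamListener<V> {
    void onMessages(List<V> messages);
}

public class BatchDispatchSketch {

    // Sketch of a batch-aware variant of deserializeAndEmitRecords:
    // emit the whole polled batch in one callback.
    static <V> void emitBatch(List<V> records, BatchStreamListener<V> listener) {
        if (!records.isEmpty()) {
            listener.onMessages(records);
        }
    }

    public static void main(String[] args) {
        // Simulate a single poll that returned batchSize = 3 records.
        emitBatch(List.of("r1", "r2", "r3"),
                batch -> System.out.println("Received batch of " + batch.size() + " records"));
    }
}
```

The real change in Spring Data Redis would be larger than this sketch, since StreamListener's generics and the container's error handling per record would also need to be retrofitted.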
Thank you for your response, I'll give it a try.