spring-projects/spring-data-redis

Why does the StreamListener consume only one record?

Danden1 opened this issue · 2 comments

I discovered that there is an option called batch size.

StreamMessageListenerContainerOptions<String, ObjectRecord<String, Long>> containerOptions =
        StreamMessageListenerContainerOptions.builder()
                .batchSize(3)
                .pollTimeout(Duration.ofMillis(100))
                .targetType(Long.class)
                .build();

So I expected that with this option the listener would receive the whole batch of records at once.

However, the onMessage method cannot take a list as a parameter.

class ExampleStreamListener implements StreamListener<String, MapRecord<String, String, String>> {

    // Here: onMessage receives a single record, not a List of records.
    @Override
    public void onMessage(MapRecord<String, String, String> message) {

        System.out.println("MessageId: " + message.getId());
        System.out.println("Stream: " + message.getStream());
        System.out.println("Body: " + message.getValue());
    }
}

The container reads up to the batch size in a single poll, but then uses a for loop to deliver the records to the listener one by one.

class StreamPollTask<K, V extends Record<K, ?>> implements Task {
....
    private void deserializeAndEmitRecords(List<ByteRecord> records) {

        for (ByteRecord raw : records) {

            try {

                pollState.updateReadOffset(raw.getId().getValue());
                V record = convertRecord(raw);
                listener.onMessage(record);
            } catch (RuntimeException ex) {

                if (cancelSubscriptionOnError.test(ex)) {

                    cancel();
                    errorHandler.handleError(ex);

                    return;
                }

                errorHandler.handleError(ex);
            }
        }
    }

}

I'm curious why it doesn't provide the data as a list at once, but instead delivers it this way.

(KafkaListener allows a list: https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#batch-listeners)

This is because we do not provide a batch listener abstraction. In the current approach, we handle each record sequentially regardless of whether batch reading is used. Feel free to submit a pull request that introduces a batch listener concept. StreamListener would need to be retrofitted in terms of its generics.
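
For illustration only, here is a rough sketch of what a batch listener concept might look like. Everything in it is hypothetical: BatchListener, onMessages, and emitBatch are made-up names, not part of Spring Data Redis, and plain Java types stand in for ByteRecord and the converted record type so the snippet runs without Redis. It also leaves out the offset tracking and error handling that the real poll task does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BatchListenerSketch {

    /** Hypothetical batch counterpart to StreamListener; not an existing Spring Data Redis type. */
    @FunctionalInterface
    public interface BatchListener<V> {
        void onMessages(List<V> records);
    }

    /**
     * Converts every raw record first, then hands the whole list to the
     * listener in a single call instead of invoking it once per record.
     */
    public static <R, V> void emitBatch(List<R> rawRecords,
                                        Function<R, V> converter,
                                        BatchListener<V> listener) {
        if (rawRecords.isEmpty()) {
            return; // an empty poll result produces no callback
        }
        List<V> converted = new ArrayList<>(rawRecords.size());
        for (R raw : rawRecords) {
            converted.add(converter.apply(raw));
        }
        // Pass an unmodifiable copy so the listener cannot alter the batch.
        listener.onMessages(List.copyOf(converted));
    }

    public static void main(String[] args) {
        // Stand-in for the records returned by one poll with batchSize(3).
        List<String> polled = List.of("1", "2", "3");
        Function<String, Long> toLong = Long::parseLong;
        BatchListener<Long> listener =
                batch -> System.out.println("Received " + batch.size() + " records: " + batch);

        emitBatch(polled, toLong, listener); // prints "Received 3 records: [1, 2, 3]"
    }
}
```

One design question such a change would have to answer is how errors are handled: with per-record delivery a failure affects a single record, whereas with a single batch callback the container would need to decide what a failure means for the read offset of the whole batch.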

Thank you for your response, I'll give it a try.