Kafka Listener committing whole batch on shutdown
PonzaMatteo opened this issue · 4 comments
Expected Behavior
Offsets for batches/records that have not been successfully processed should not be committed on application shutdown when using the offset strategy OffsetStrategy.SYNC or OffsetStrategy.SYNC_PER_RECORD.
Actual Behaviour
When the application shuts down while processing a batch of Kafka messages, the offset for the entire batch is committed.
This can cause data loss, since the unprocessed messages are skipped entirely, and I believe it is unexpected behavior when using the offset strategies OffsetStrategy.SYNC or OffsetStrategy.SYNC_PER_RECORD.
Steps To Reproduce
- create a Kafka topic
- add several messages to that topic
- start up the Kafka consumer application
- shut down the application before all the messages have been processed
- verify that the committed offset covers the whole batch and that some messages have been skipped
Environment Information
- micronaut kafka: v4.4.1
I believe the issue is with the logic:
} catch (WakeupException e) {
    try {
        if (!failed && consumerState.offsetStrategy != OffsetStrategy.DISABLED) {
            kafkaConsumer.commitSync();
        }
I'm not sure we should commit the offset in that branch, given that there is no guarantee that those messages have been processed.
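To illustrate the concern, here is a minimal, dependency-free sketch of what "commit only what was processed" could look like. It is not the micronaut-kafka implementation: the class ProcessedOffsets, the Rec type and the toCommit helper are hypothetical names made up for this example. It relies on two facts about Kafka: the committed offset is the next offset to read (last processed + 1), and KafkaConsumer.commitSync also has an overload taking an explicit Map<TopicPartition, OffsetAndMetadata>, whereas the no-argument call commits the positions reached by the last poll.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model: derive the offsets to commit from processed records only. */
final class ProcessedOffsets {

    /** Minimal stand-in for a consumed record (partition name and offset). */
    static final class Rec {
        final String partition;
        final long offset;

        Rec(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /**
     * Returns, per partition, the next offset to consume (last processed + 1),
     * looking only at the first processedCount records of the polled batch.
     * Partitions with no processed record are left out, so nothing is committed for them.
     */
    static Map<String, Long> toCommit(List<Rec> batch, int processedCount) {
        Map<String, Long> next = new LinkedHashMap<>();
        for (int i = 0; i < processedCount && i < batch.size(); i++) {
            Rec r = batch.get(i);
            // Keep the highest "next offset" seen for each partition.
            next.merge(r.partition, r.offset + 1, Long::max);
        }
        return next;
    }
}
```

With a real consumer, the resulting map would be translated to TopicPartition/OffsetAndMetadata entries and passed to the commitSync overload that takes a map, instead of calling the no-argument version on shutdown.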
Example Application
No response
Version
micronaut kafka: v4.4.1
Would you like to send a PR?
Might be resolved with #601
Would you like to send a PR?
I'll take a look in the coming days. My approach would probably be something like:
- initialize the failed flag to true, so that by default it won't commit the whole batch if it didn't finish processing.
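A rough sketch of that idea, as a plain-Java simulation rather than the actual listener code: the class ShutdownCommitSketch, the handleBatch method and the stopAfter parameter are hypothetical, and an IllegalStateException stands in for Kafka's WakeupException so the example runs without any dependency.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of a poll loop whose "failed" flag defaults to true. */
final class ShutdownCommitSketch {

    /** Every commit the loop would have issued (stand-in for a real consumer). */
    final List<String> commits = new ArrayList<>();

    /** Records that were actually processed. */
    final List<String> processed = new ArrayList<>();

    /**
     * Processes one batch. stopAfter simulates a shutdown arriving after that
     * many records have been processed; a negative value means no shutdown.
     */
    void handleBatch(List<String> batch, int stopAfter) {
        boolean failed = true; // pessimistic default: assume the batch is incomplete
        try {
            int done = 0;
            for (String record : batch) {
                if (done == stopAfter) {
                    // Stand-in for the WakeupException raised on shutdown.
                    throw new IllegalStateException("simulated wakeup");
                }
                processed.add(record);
                done++;
            }
            failed = false; // only reached when every record was processed
        } catch (IllegalStateException wakeup) {
            // Interrupted mid-batch: failed is still true, so nothing is committed below.
        }
        if (!failed) {
            commits.add("batch of " + batch.size());
        }
    }
}
```

With this default, an interrupted batch leaves the committed offset untouched, so its records are redelivered after restart. The trade-off is at-least-once delivery: records processed before the shutdown are seen again.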
I tried to address the issue, here's the PR
I see that it has been added to the new release.
Not sure if it is helpful, but here I added a sample app that can help to reproduce the issue outside the test environment: