micronaut-projects/micronaut-kafka

Kafka Listener committing whole batch on shutdown

PonzaMatteo opened this issue · 4 comments

Expected Behavior

Offsets for batches/records that have not been successfully processed should not be committed on application shutdown when using OffsetStrategy.SYNC or OffsetStrategy.SYNC_PER_RECORD.

Actual Behaviour

When the application shuts down while processing a batch of Kafka messages, the offset for the entire batch is committed.

This can cause data loss, since the messages that were not processed are skipped entirely. I believe this is unexpected behavior when using the offset strategies OffsetStrategy.SYNC/OffsetStrategy.SYNC_PER_RECORD.

Steps To Reproduce

  • create a Kafka topic
  • add several messages to that topic
  • start up the Kafka consumer application
  • shut down the application before all the messages have been processed
  • verify that the committed offset covers the whole batch and that some messages have been skipped

Environment Information

  • micronaut kafka: v4.4.1

I believe the issue is with the logic:

  } catch (WakeupException e) {
      try {
          if (!failed && consumerState.offsetStrategy != OffsetStrategy.DISABLED) {
              kafkaConsumer.commitSync();
          }

I'm not sure we should commit the offset at that point, given that there is no guarantee that those messages have been processed.
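To illustrate why that commit skips messages: a no-argument commitSync() commits the consumer's current position, which after a poll is one past the last record of the batch, regardless of how many records were actually handled. The sketch below is a plain-Java model of that arithmetic, not the library code; the class and method names are hypothetical and no Kafka client is involved.

```java
import java.util.List;

public class BatchCommitSketch {

    /** Models a no-argument commit: the stored offset is one past the last polled record. */
    static long offsetAfterWholeBatchCommit(List<Long> polledOffsets) {
        return polledOffsets.get(polledOffsets.size() - 1) + 1;
    }

    /**
     * Models committing only what was handled: one past the last processed record,
     * or the previously committed offset if nothing in the batch was processed.
     */
    static long offsetAfterProcessedOnlyCommit(List<Long> polledOffsets,
                                               int processedCount,
                                               long lastCommitted) {
        if (processedCount == 0) {
            return lastCommitted;
        }
        return polledOffsets.get(processedCount - 1) + 1;
    }

    public static void main(String[] args) {
        // A polled batch of five records; shutdown arrives after two were processed.
        List<Long> batch = List.of(10L, 11L, 12L, 13L, 14L);
        int processed = 2;

        // Whole-batch commit: offsets 12, 13 and 14 are never redelivered.
        System.out.println(offsetAfterWholeBatchCommit(batch));                    // prints 15
        // Processed-only commit: the consumer resumes at offset 12 after restart.
        System.out.println(offsetAfterProcessedOnlyCommit(batch, processed, 10L)); // prints 12
    }
}
```

In this model the gap between 15 and 12 is exactly the set of records that would be lost on restart.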

Example Application

No response

Version

micronaut kafka: v4.4.1

Would you like to send a PR?

Might be resolved with #601

Would you like to send a PR?

I'll take a look in the coming days. My approach would probably be something like:

  • initialize failed to true, so that by default it won't commit the whole batch if it hasn't finished processing.
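A rough model of that idea, assuming the flag is cleared only once the whole batch has been processed. This is a simplified stand-in for the consumer loop, not the actual micronaut-kafka code: the Wakeup class, the run method, and the -1 "no commit" marker are all hypothetical, and the batch is assumed to be non-empty.

```java
import java.util.List;

public class FailedFlagSketch {

    /** Hypothetical stand-in for Kafka's WakeupException, raised when shutdown interrupts the loop. */
    static class Wakeup extends RuntimeException { }

    /**
     * Processes one non-empty batch and returns the offset that would be committed,
     * or -1 if no commit happens. wakeAt is the record index at which shutdown
     * arrives (-1 means it never does).
     */
    static long run(List<Long> batch, int wakeAt, boolean failedInitially) {
        boolean failed = failedInitially;
        long endOfBatch = batch.get(batch.size() - 1) + 1;
        try {
            for (int i = 0; i < batch.size(); i++) {
                if (i == wakeAt) {
                    throw new Wakeup(); // shutdown interrupts processing mid-batch
                }
                // ... the record at batch.get(i) would be handed to the listener here ...
            }
            failed = false;    // cleared only after every record was handled
            return endOfBatch; // regular commit after a complete batch
        } catch (Wakeup e) {
            // Mirrors the guard quoted in the issue: commit on shutdown unless failed is set.
            return failed ? -1 : endOfBatch;
        }
    }

    public static void main(String[] args) {
        List<Long> batch = List.of(10L, 11L, 12L, 13L, 14L);
        System.out.println(run(batch, 2, false)); // prints 15: whole batch committed despite the interruption
        System.out.println(run(batch, 2, true));  // prints -1: nothing committed, records are redelivered
        System.out.println(run(batch, -1, true)); // prints 15: uninterrupted batch still commits
    }
}
```

With the flag starting at true, an interrupted batch commits nothing, so the unprocessed records are read again after restart; the trade-off is that records already handled in that batch are redelivered as well.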

I tried to address the issue, here's the PR

I see that it has been added to the new release.

Not sure if it is helpful, but here I added a sample app that can help reproduce the issue outside the test environment: