sevenmind/kafka_ex_gen_stage_consumer

Unacknowledged offsets can be committed


It seems that at https://github.com/sevenmind/kafka_ex_gen_stage_consumer/blob/master/lib/kafka_ex_gen_stage_consumer.ex#L398, an offset is marked to be committed whenever it is higher than the last offset marked to be committed.

However, when Kafka messages are processed in parallel, I think the following case could occur:

  • Last acked offset is 100
  • Process A receives Kafka message with offset 101
  • Process B receives Kafka message with offset 102
  • Process B processes the message and sends an ack (calling KafkaExGenStageConsumer.trigger_commit/2)
  • kafka_ex_gen_stage_consumer commits the offset 102
  • Process B crashes

The message with offset 101 has not been processed, but the committed offset is 102, so after a restart consumption would resume past 101 and that message would be skipped.

off_broadway_kafka uses an ETS table to keep track of acknowledged offsets: https://github.com/bbalser/off_broadway_kafka/blob/master/lib/off_broadway/kafka/acknowledger.ex
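To illustrate the general idea, here is a minimal sketch of tracking acknowledged offsets so that only the highest contiguous offset becomes eligible for commit. This is not how off_broadway_kafka or kafka_ex_gen_stage_consumer is implemented; `OffsetTracker` and its functions are hypothetical names, it keeps state in a plain struct rather than an ETS table, and it assumes the offsets delivered for a partition are consecutive integers.

```elixir
defmodule OffsetTracker do
  @moduledoc """
  Hypothetical helper (not part of any library): records acked offsets and
  exposes the highest offset for which every earlier offset was also acked.
  Assumes delivered offsets are consecutive integers.
  """

  defstruct committed: nil, pending: MapSet.new()

  # `last_committed` is the last offset known to be fully processed.
  def new(last_committed), do: %__MODULE__{committed: last_committed}

  # Record an ack, then advance `committed` across any contiguous run.
  def ack(%__MODULE__{committed: committed} = tracker, offset)
      when offset > committed do
    advance(%{tracker | pending: MapSet.put(tracker.pending, offset)})
  end

  # Acks at or below the committed offset are duplicates; ignore them.
  def ack(%__MODULE__{} = tracker, _offset), do: tracker

  defp advance(%__MODULE__{committed: committed, pending: pending} = tracker) do
    next = committed + 1

    if MapSet.member?(pending, next) do
      advance(%{tracker | committed: next, pending: MapSet.delete(pending, next)})
    else
      tracker
    end
  end
end

# The scenario above: 102 is acked while 101 is still in flight.
tracker = OffsetTracker.new(100) |> OffsetTracker.ack(102)
IO.inspect(tracker.committed) # 100, because 101 is still outstanding

# Once 101 is acked, both offsets become committable.
tracker = OffsetTracker.ack(tracker, 101)
IO.inspect(tracker.committed) # 102
```

With something along these lines, the ack for 102 would be remembered but not committed until 101 has been acknowledged as well.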

This is certainly a potential issue.