bsm/sarama-cluster

How to start a consumer group consuming from the latest committed offsets?

vdobler opened this issue · 2 comments

We have a simple setup: one topic with 4 partitions and one consumer group consuming it.
This works fine as long as at least one consumer is running.

If we stop the whole consumer group and new messages arrive before it is restarted, these messages are not consumed. This is presumably exactly what #263 and #106 are about.

If we set Config.Consumer.Offsets.Initial to sarama.OffsetNewest, consumption starts only once a new message arrives (as described in the README). For example, directly before the restart:
Partition 2: Start 0, End 580, Offset 550, Lag 30
Then one new message is produced and immediately consumed from partition 2, and the offsets jump to:
Partition 2: Start 0, End 581, Offset 581, Lag 0
So we "lose" messages 551-580.
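The size of the lost range is simply the lag at restart time. A minimal sketch, assuming Kafka's usual convention that End is the next offset to be written (the high-water mark) and Offset is the group's next offset to read; the PartitionState type and the skippedByNewest helper are hypothetical, written only to illustrate the arithmetic:

```go
package main

import "fmt"

// PartitionState is a hypothetical mirror of one monitoring line above.
// Assumption: End is the next offset to be written (high-water mark) and
// Offset is the group's next offset to read, so Lag = End - Offset.
type PartitionState struct {
	Start, End, Offset int64
}

// skippedByNewest counts the messages that are never delivered when a
// consumer starts at the high-water mark (OffsetNewest) instead of at the
// group's Offset. Every offset in [Offset, End) is jumped over, which is
// exactly the lag at the moment of the restart.
func skippedByNewest(p PartitionState) int64 {
	newestStart := p.End // where OffsetNewest positions a fresh consumer
	return newestStart - p.Offset
}

func main() {
	// Partition 2 directly before the restart.
	beforeRestart := PartitionState{Start: 0, End: 580, Offset: 550}
	fmt.Println("never delivered with OffsetNewest:", skippedByNewest(beforeRestart))
}
```

For the partition 2 numbers this yields 30, i.e. the entire "Lag 30" from the line above.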

If we set Config.Consumer.Offsets.Initial to sarama.OffsetOldest, consumption starts immediately (no new message needed), but from Start:
Partition 2: Start 0, End 580, Offset 550, Lag 30
After restarting the consumer group, consumption starts from 0 and we re-consume messages 0-550, which have already been consumed and had their offsets committed.
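The amount of repeated work is just as easy to count. A minimal sketch under the same assumption (Offset is the next offset the group would read); the reconsumedByOldest helper is hypothetical and only illustrates how many already-committed messages OffsetOldest delivers again when the stored offset is not used:

```go
package main

import "fmt"

// reconsumedByOldest is a hypothetical helper: it counts the messages that
// are delivered a second time when a consumer starts at the log start
// (OffsetOldest) although the group had already committed nextToRead.
// Offsets in [logStart, nextToRead) were consumed before the restart.
func reconsumedByOldest(logStart, nextToRead int64) int64 {
	if nextToRead < logStart {
		// The committed messages were already removed by retention,
		// so none of them can be delivered again.
		return 0
	}
	return nextToRead - logStart
}

func main() {
	// Partition 2 before the restart: Start 0, Offset 550.
	fmt.Println("re-consumed with OffsetOldest:", reconsumedByOldest(0, 550))
}
```

With Start 0 and Offset 550 this counts 550 messages that are processed twice.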

How can we consume exactly the uncommitted messages (and any new messages afterward) after starting a consumer group whose consumers are all "brand new" (having never committed anything)?

Please tell me that we have just overlooked something obvious and that we do not need to start from sarama.OffsetNewest and somehow check against the last committed offsets.

dim commented

@vdobler sorry, but sarama-cluster has been deprecated in favour of IBM/sarama#1099 (which was merged today). Nevertheless, Config.Consumer.Offsets.Initial is only used when there is no stored offset. If you have consumed messages up to offset 550, you need to mark them (e.g. via Consumer.MarkOffset) so that this offset is persisted. After a restart the stored offset will be used, and consumption should resume from 551, not from 581. You can see this working correctly in the tests and the examples.
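The rule described here (a stored offset always wins, and Initial is only a fallback) can be modelled in a few lines. This is a hypothetical sketch, not sarama-cluster's code: resolveStart is invented for illustration, the two constants reuse the numeric values of sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2), and the stored value is treated as the next offset to read, as Kafka's committed offsets are:

```go
package main

import (
	"errors"
	"fmt"
)

// Same numeric values as sarama.OffsetNewest and sarama.OffsetOldest.
const (
	OffsetNewest int64 = -1
	OffsetOldest int64 = -2
)

// resolveStart is a hypothetical model of where a partition consumer begins.
// A stored (committed) offset always wins; initial is consulted only when
// the group has nothing stored for the partition.
func resolveStart(stored int64, hasStored bool, initial, logStart, logEnd int64) (int64, error) {
	if hasStored {
		return stored, nil
	}
	switch initial {
	case OffsetNewest:
		return logEnd, nil
	case OffsetOldest:
		return logStart, nil
	default:
		return 0, errors.New("initial must be OffsetNewest or OffsetOldest")
	}
}

func main() {
	// Assumption: marking messages up to offset 550 stored 551 as the next
	// offset to read, matching the expected "resume from 551".
	start, _ := resolveStart(551, true, OffsetNewest, 0, 581)
	fmt.Println("with a stored offset:", start)

	// A group that never committed anything falls back to Initial.
	start, _ = resolveStart(0, false, OffsetNewest, 0, 581)
	fmt.Println("no stored offset, OffsetNewest:", start)
	start, _ = resolveStart(0, false, OffsetOldest, 0, 581)
	fmt.Println("no stored offset, OffsetOldest:", start)
}
```

The model deliberately ignores the case where a stored offset has fallen outside the retained range; it only shows why the choice of Initial stops mattering once offsets are actually marked and committed.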