rabbitmq/rabbitmq-stream-go-client

Consumer offset for stream with single message not stored

Darthmineboy opened this issue · 5 comments

Describe the bug

When a stream contains only 1 message and I create a single active consumer for it, calling consumerContext.Consumer.StoreOffset() while consuming that message does not store the offset. When I restart the single active consumer, it starts from the beginning because no offset was stored, so the first and only message is replayed.

If the stream contains 2 or more messages, the offset is stored (for the 2nd message) and the single active consumer resumes where it left off (offset + 1), which is the expected behavior.

I believe this issue is caused by the fact that the first message in the stream has offset 0 and Go initializes integer fields to 0. The client contains logic that only stores the offset when it has increased.

In updateLastStoredOffset() there is this check: if consumer.lastStoredOffset < consumer.currentOffset. I think lastStoredOffset is 0 by default and currentOffset is also 0 after the first message, so the comparison is false, updateLastStoredOffset() returns false, and internalStoreOffset() therefore does not store the offset on the RabbitMQ server.
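To illustrate the suspected cause, here is a minimal, self-contained sketch of the comparison. It is a simplified model, not the client's actual code: the consumer struct, storedAfterConsuming, and the -1 sentinel are assumptions made for illustration, and the sentinel is only one possible way to avoid the problem, not necessarily what the maintainers did.

```go
package main

import "fmt"

// consumer is a hypothetical, simplified model of the client-side state
// described in this report; it is not the library's real struct.
type consumer struct {
	currentOffset    int64
	lastStoredOffset int64 // Go zero value is 0, the same as the first message's offset
}

// updateLastStoredOffset mirrors the check quoted in the report: it reports
// that a store is needed only when currentOffset is strictly greater.
func (c *consumer) updateLastStoredOffset() bool {
	if c.lastStoredOffset < c.currentOffset {
		c.lastStoredOffset = c.currentOffset
		return true
	}
	return false
}

// storedAfterConsuming simulates consuming n messages (offsets 0..n-1) and
// running the store check once after the last one. initialStored is the
// starting value of lastStoredOffset.
func storedAfterConsuming(n, initialStored int64) bool {
	c := &consumer{lastStoredOffset: initialStored}
	for off := int64(0); off < n; off++ {
		c.currentOffset = off
	}
	return c.updateLastStoredOffset()
}

func main() {
	// Zero-initialized state: offset 0 is never considered "new".
	fmt.Println(storedAfterConsuming(1, 0)) // false
	fmt.Println(storedAfterConsuming(2, 0)) // true

	// Assumed alternative: start from a -1 sentinel so offset 0 counts as an increase.
	fmt.Println(storedAfterConsuming(1, -1)) // true
}
```

With a zero-initialized lastStoredOffset, the single-message case is the only one where the check fails, which matches the behavior described above.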

Reproduction steps

  1. Create stream
  2. Publish one message to stream
  3. Set up a single active consumer, consume the message, and store the offset
  4. Restart the consumer; it starts from offset 0 because the offset was not stored
    ...

Expected behavior

It should be possible to store the consumer offset even after processing only the first message in the stream.

Additional context

No response

@Darthmineboy Thank you for reporting the issue. Will have a look as soon as possible.

@Darthmineboy Can you try #307? Thank you.

@Gsantomaggio that fixes the issue! Thanks

closed via #307