A race condition in recovery causes unrecoverable gaps in acked offsets and consequently blocks committing
cristianmatache opened this issue · 0 comments
Checklist
- I have included information about relevant versions
- I have verified that the issue persists when using the `master` branch of Faust.
Steps to reproduce
I'll start with a high-level description of the issue. To illustrate the race condition, we first need to look at some key design aspects:
- Upon restart/rebalance Faust replays messages from the changelog topics (without consuming messages from the input topics)
- Messages are consumed from Kafka asynchronously, populating some in-memory queues. Agents consume from these queues via `async for ... in stream` to process the messages.
- Once the recovery from changelog topics is completed, Faust listens to the input topics (i.e., non-changelog topics), clearing the contents of all in-memory queues.
- After agents process a message, the message becomes "acked". Faust maintains the offsets of all acked messages and asynchronously commits these acked offsets back to Kafka.
- Faust has a consistency check that requires contiguous acked offsets to commit a new offset. That is, if the committed offset is N, and the acked offsets are N+1, N+2, N+3, Faust can safely commit offset N+3. However, if the committed offset is N, and the acked offsets are N+2, N+3, Faust will wait until N+1 is acked.
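The contiguity rule in the last point can be sketched as follows. This is only an illustration of the rule as described above, not Faust's actual code; the function name `next_committable_offset` and its signature are hypothetical.

```python
from typing import Iterable


def next_committable_offset(committed: int, acked: Iterable[int]) -> int:
    """Return the highest offset that can be committed without leaving a gap.

    Hypothetical helper: walks forward from the committed offset for as long
    as the next offset has been acked, and stops at the first missing one.
    """
    acked_set = set(acked)
    offset = committed
    while offset + 1 in acked_set:
        offset += 1
    return offset


# Committed offset N = 10, acked N+1, N+2, N+3: N+3 can be committed.
print(next_committable_offset(10, [11, 12, 13]))  # 13
# Committed offset N = 10, acked N+2, N+3: nothing new can be committed
# until N+1 is acked.
print(next_committable_offset(10, [12, 13]))  # 10
```

If offset N+1 is dropped and never acked, the second case never changes, no matter how many later offsets are acked.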
With the current setup, some messages may never reach the agents because the queues are cleared while those messages are in flight, which blocks committing.
- After the recovery is completed, the async flow of messages from non-changelog topics is resumed:
faust/faust/tables/recovery.py
Lines 635 to 637 in 6588a97
- The `Consumer` reads messages asynchronously and puts them in the `Conductor` queue via the callback:
faust/faust/transport/consumer.py
Line 1201 in 6588a97
- Then we seek the offsets. This is a network call that may take some time, during which the queues may be populated:
faust/faust/tables/recovery.py
Lines 640 to 642 in 6588a97
- Then we signal the resume:
faust/faust/tables/recovery.py
Line 645 in 6588a97
- This clears the in-memory queues. If the queues are not empty at that point, the messages in them are dropped and will never get acked: https://github.com/faust-streaming/mode/blob/c0aa58181432402dca30aa5978179383863a185a/mode/utils/queues.py#L78-L89
Messages that are never acked cause the committing task to hang, because the acked offsets no longer form a contiguous sequence:
faust/faust/transport/consumer.py
Line 976 in 6588a97
faust/faust/transport/consumer.py
Line 994 in 6588a97
- Computing the new offset to commit fails
faust/faust/transport/consumer.py
Lines 1120 to 1121 in 6588a97
If there are any messages in flight in the queue, they will be cleaned up before being acked.
I think simply moving
faust/faust/tables/recovery.py
Lines 635 to 637 in 6588a97
after
faust/faust/tables/recovery.py
Lines 640 to 642 in 6588a97
will solve the issue.
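The race and the proposed reordering can be reproduced in a toy `asyncio` model. This is a minimal sketch and does not use Faust: `fetcher`, `seek`, `clear_queue` and `simulate` are hypothetical stand-ins for the consumer callback, the seek network call, the queue clear triggered by the resume signal, and the recovery sequence, respectively.

```python
import asyncio


async def simulate(resume_before_seek: bool) -> list:
    """Return the offsets that survive the queue clear in a toy recovery."""
    queue = asyncio.Queue()
    flow_active = asyncio.Event()

    async def fetcher() -> None:
        # Stand-in for the consumer: once the flow is resumed, it pushes
        # offsets 0..4 into the in-memory queue, yielding between messages.
        await flow_active.wait()
        for offset in range(5):
            queue.put_nowait(offset)
            await asyncio.sleep(0)

    async def seek() -> None:
        # Stand-in for the seek network call: it yields control to the
        # event loop a few times, so other tasks can run meanwhile.
        for _ in range(3):
            await asyncio.sleep(0)

    def clear_queue() -> None:
        # Stand-in for the clear performed when the resume is signalled.
        while not queue.empty():
            queue.get_nowait()

    task = asyncio.create_task(fetcher())
    if resume_before_seek:
        flow_active.set()  # current order: resume flow, then seek
        await seek()
    else:
        await seek()  # proposed order: seek, then resume flow
        flow_active.set()
    clear_queue()
    await task

    # Whatever is left in the queue is what agents could process and ack.
    delivered = []
    while not queue.empty():
        delivered.append(queue.get_nowait())
    return delivered


if __name__ == "__main__":
    print(asyncio.run(simulate(resume_before_seek=True)))   # [3, 4]
    print(asyncio.run(simulate(resume_before_seek=False)))  # [0, 1, 2, 3, 4]
```

In the first ordering, offsets 0 to 2 are queued during the seek and then cleared, so only 3 and 4 can ever be acked and the gap blocks committing. In the second ordering the queue is still empty when it is cleared, so nothing is lost.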