faust-streaming/faust

A race condition in recovery causes unrecoverable gaps in acked offsets and consequently blocks committing

cristianmatache opened this issue · 0 comments

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

Let's start with a high-level description of the issue. To illustrate the race condition, we first need to look at some key design aspects of Faust:

  • Upon restart/rebalance, Faust replays messages from the changelog topics (without consuming messages from the input topics).
  • Messages are consumed from Kafka asynchronously, populating in-memory queues; agents consume from these queues via async for ... in stream to process the messages (a minimal sketch of this layout follows this list).
  • Once the recovery from changelog topics is completed, Faust listens to the input topics (i.e., non-changelog topics), clearing the contents of all in-memory queues.
  • After agents process a message, the message becomes "acked". Faust maintains the offsets of all acked messages and asynchronously commits these acked offsets back to Kafka.
    • Faust has a consistency check that requires contiguous acked offsets to commit a new offset. That is, if the committed offset is N, and the acked offsets are N+1, N+2, N+3, Faust can safely commit offset N+3. However, if the committed offset is N, and the acked offsets are N+2, N+3, Faust will wait until N+1 is acked.
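
For context, here is a minimal sketch of the moving parts described above: the app, a changelog-backed table, and an agent consuming via async for ... in stream. The topic and table names are hypothetical and not taken from the affected code.

import faust

app = faust.App("example-app", broker="kafka://localhost:9092")

# Input (non-changelog) topic consumed by the agent below.
orders_topic = app.topic("orders", value_type=str)

# Changelog-backed table: its contents are replayed from the changelog
# topic during recovery, before the input topics are resumed.
order_counts = app.Table("order_counts", default=int)

@app.agent(orders_topic)
async def count_orders(stream):
    # Messages are pulled from the in-memory queue here; each message is
    # acked only after this loop iteration finishes processing it.
    async for order in stream:
        order_counts[order] += 1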

With the current setup, any messages that are already sitting in the in-memory queues (i.e., in flight) when the queues are cleared are dropped before they can be processed and acked.

Because these messages are never acked, there is a permanent gap in the acked offsets, the contiguity check described above can never be satisfied, and the committing task hangs.
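
To make the contiguity rule concrete, here is a small self-contained illustration in plain Python (not Faust internals): with committed offset 100 and acks for 102 and 103 but not 101, the committable offset can never advance.

def committable_offset(committed: int, acked: set) -> int:
    # Advance only while the next offset has been acked; a single
    # never-acked offset blocks everything after it.
    offset = committed
    while offset + 1 in acked:
        offset += 1
    return offset

print(committable_offset(100, {101, 102, 103}))  # 103: contiguous acks, the commit advances
print(committable_offset(100, {102, 103, 104}))  # 100: offset 101 was lost in flight, the commit is stuck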

I think simply moving

consumer.resume_partitions(
    {tp for tp in assignment if not self._is_changelog_tp(tp)}
)

after
await self._wait(
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)

will solve the issue.
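
Concretely, the recovery flow would then resume the input-topic partitions only once seeking has completed. A rough sketch of the proposed ordering (surrounding code elided):

await self._wait(
    consumer.perform_seek(), timeout=self.app.conf.broker_request_timeout
)
# Resume the input (non-changelog) topics only after seeking has completed.
consumer.resume_partitions(
    {tp for tp in assignment if not self._is_changelog_tp(tp)}
)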