faust-streaming/faust

Question: Delayed retry queue implementation in Faust?

somnam opened this issue · 2 comments

Hi,

I'm trying to implement a delayed retry queue using Kafka and Faust.
The idea is that a message that fails processing gets forwarded to a retry queue (RQ) topic.
Messages consumed from this topic aren't reprocessed immediately; a delay is applied first.

A more detailed description of this approach can be found here and here.
I've managed to come up with a working aiokafka implementation but am struggling to implement this pattern in Faust (the library used in the app I need to add the queue to).

In aiokafka I use the consumer's pause / resume API. After consuming a message I pause the partitions assigned to the consumer.
To avoid exceeding max_poll_interval_ms, a separate asyncio task is created to handle the delay and the reprocessing.
This task sleeps for the required delay, during which control can return to the consumer loop.
The loop doesn't return any new messages because the partitions are paused, but it keeps polling Kafka, so max_poll_interval_ms isn't reached.
After reprocessing, the message is committed and the consumer's partitions are resumed:

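import asyncio
from asyncio import create_task

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import ConsumerRecord

# get_message_processing_delay() and run_processing() are application helpers (not shown)

async def consume(topic: str, bootstrap_servers: str, group_id: str) -> None: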
    consumer = AIOKafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        group_id=group_id,
        enable_auto_commit=False,  # will manually commit each message
        max_poll_interval_ms=1000,  # only 1s delay between poll required
    )
    await consumer.start()

    try:
        async for message in consumer:
            consumer.pause(*consumer.assignment())
            create_task(delayed_processing(consumer, message, 60))  # delay by 60s from message.timestamp
    finally:
        await consumer.stop()

async def delayed_processing(consumer: AIOKafkaConsumer, message: ConsumerRecord, delay: int) -> None:
    try:
        processing_delay = get_message_processing_delay(message, delay)
        await asyncio.sleep(processing_delay)
        run_processing(message)
        await consumer.commit()
    finally:
        consumer.resume(*consumer.assignment())
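
The `get_message_processing_delay` helper isn't shown above. A minimal sketch of the calculation, assuming the record timestamp is the Kafka timestamp in milliseconds since the epoch (as exposed on aiokafka's `ConsumerRecord.timestamp`); `remaining_delay` and its `now_s` parameter are hypothetical names used only for illustration:

```python
import time
from typing import Optional


def remaining_delay(
    timestamp_ms: int, delay_s: float, now_s: Optional[float] = None
) -> float:
    """Return how many seconds are left before a record may be reprocessed.

    timestamp_ms: record timestamp in milliseconds since the epoch (assumed).
    delay_s: required delay, counted from the record timestamp.
    now_s: current time in seconds; defaults to time.time() (hypothetical
           parameter, added here only to make the function easy to test).
    """
    if now_s is None:
        now_s = time.time()
    due_at_s = timestamp_ms / 1000 + delay_s
    # Never return a negative sleep time for records that are already overdue.
    return max(0.0, due_at_s - now_s)
```

With this, `get_message_processing_delay(message, delay)` could simply return `remaining_delay(message.timestamp, delay)`. Clamping at zero means records that are already overdue get reprocessed right away.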

I think that the Faust implementation should follow the same pattern (it uses aiokafka under the hood) but I can't manage to have the stream paused / resumed after consuming a message.
When I use the provided pause_partitions and resume_partitions API methods, max_poll_interval_ms is exceeded (they don't seem to touch the AIOKafkaConsumer in any way).
When I go through the underlying AIOKafkaConsumer thread to pause the consumer in the same manner as above, the event stream keeps giving me new messages anyway.

Do you know of a way to implement this pattern using Faust, or does Faust maybe require a different approach?

Checklist

  • I have included information about relevant versions
  • I have verified that the issue persists when using the master branch of Faust.

Versions

  • Python version: 3.10
  • Faust version: 0.10.4
  • Operating system
  • Kafka version
  • RocksDB version (if applicable)

This is a really interesting question. I haven't personally done such a thing, but I did some searching around (I asked ChatGPT as well; please bear with me, I was curious):

To implement the delayed retry queue pattern in Faust, you can use the @app.timer() decorator to schedule a task that will retry the failed messages after a certain delay.

Here's an example Faust application that implements the delayed retry queue pattern:

import faust
import asyncio
import time

app = faust.App('delayed-retry-queue', broker='kafka://localhost')

failed_topic = app.topic('failed-messages', value_type=str)
retry_topic = app.topic('retry-messages', value_type=str)

@app.agent(failed_topic)
async def retry_failed_messages(messages):
    async for message in messages:
        print(f'Failed message: {message.value()}, retrying in 10 seconds')
        await asyncio.sleep(10)
        await retry_topic.send(value=message.value())

@app.timer(interval=60)
async def process_retry_messages():
    async for message in retry_topic:
        print(f'Retrying message: {message.value()}')
        # Process the message here

In this example, the failed_topic represents the topic to which failed messages are forwarded. The retry_topic represents the topic to which messages are sent after a delay for retrying.

The retry_failed_messages agent consumes from the failed_topic and schedules a retry of the failed message after a 10-second delay by sending it to the retry_topic.

The process_retry_messages function is a timer that runs every 60 seconds and consumes from the retry_topic. When a message is consumed, it is processed and removed from the retry queue.

Note that this implementation uses the Faust timer API to schedule retries, rather than the Kafka pause/resume API used in your Aiokafka implementation. The Faust timer API allows you to schedule retries without pausing Kafka partitions or blocking the consumer loop, which can simplify the implementation.

I agree with the solution provided by the AI, which is why I'm sharing it. Using the pause/resume API can introduce a lot of complications.

I'll let you know if I think of something better; I just don't want to leave you in the dark.

Hi @wbarnha and thanks very much for the message! I've tested the presented solution and it doesn't seem to work as intended. When the @app.timer interval kicks in, there's some initial delay, after which the app's consumer group is rebalanced because the consumer now needs to consume from both failed_topic and retry_topic for the loop over retry_topic to work:

INFO [aiokafka.consumer.subscription_state._change_subscription:111] Updating subscribed topics to: frozenset({'retry-messages', 'failed-messages'})
INFO [aiokafka.consumer.group_coordinator._on_join_prepare:384] Revoking previously assigned partitions
INFO [aiokafka.consumer.group_coordinator._handle_metadata_update:56] Metadata for topic has changed from {'failed-messages': 4} to {'retry-messages': 4, 'failed-messages': 4}

After that, the loop starts consuming messages and never exits, since it's iterating over the retry-messages topic stream.
Please also note that the delay you've provided in the initial consumer:

        print(f'Failed message: {message.value()}, retrying in 10 seconds')
        await asyncio.sleep(10)

doesn't actually meet the requirement from my original post, as the sleep still needs to be shorter than max_poll_interval_ms for the consumer group not to rebalance.
That's why in the provided aiokafka solution the partitions get paused and the delay is moved to a separate task. Kafka is still polled for messages but thanks to the paused partitions no message is being consumed. At the same time the delayed task can asynchronously wait for the delay to end and doesn't interfere with max_poll_interval_ms.
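
To make the difference concrete, here's a small asyncio-only simulation of the pause-and-delay pattern. It's a sketch under assumptions: `FakeConsumer`, `poll`, and `run` are made-up stand-ins, not the aiokafka or Faust API, and the timings are arbitrary. It only shows that the poll loop keeps running while the delayed task sleeps:

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for a Kafka consumer (not the aiokafka API)."""

    def __init__(self, records):
        self._records = list(records)
        self.paused = False
        self.polls = 0

    async def poll(self):
        """Count every poll; hand out a record only while not paused."""
        self.polls += 1
        await asyncio.sleep(0.01)  # simulated broker round trip
        if self.paused or not self._records:
            return None
        return self._records.pop(0)


async def delayed_processing(consumer, record, delay, processed):
    """Sleep off the delay in a separate task, then process and resume."""
    try:
        await asyncio.sleep(delay)
        processed.append(record)  # stands in for the real reprocessing
    finally:
        consumer.paused = False


async def run(records, delay):
    """Poll continuously; pause after each record until it is processed."""
    consumer = FakeConsumer(records)
    processed = []
    tasks = []
    while len(processed) < len(records):
        record = await consumer.poll()
        if record is not None:
            consumer.paused = True
            tasks.append(
                asyncio.create_task(
                    delayed_processing(consumer, record, delay, processed)
                )
            )
    await asyncio.gather(*tasks)
    return processed, consumer.polls
```

Running `asyncio.run(run(["a", "b"], 0.05))` processes the records in order, one at a time, while the poll counter ends up higher than the number of records. That extra polling during the delay is what keeps max_poll_interval_ms from being hit in the real consumer.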
Too bad Faust doesn't seem to provide a pause / resume API that works the same way as the aiokafka one.