reddit/baseplate.py

Handling retries in KombuQueueConsumer

Closed this issue · 4 comments

aides commented

Background

I'm using KombuQueueConsumerFactory to set up a consumer for an AMQP queue like this:

    return KombuQueueConsumerFactory.new(
        baseplate=baseplate,
        exchange=exchange,
        connection=connection,
        queue_name=...,
        routing_keys=[...],
        handler_fn=handler_fn,
        health_check_fn=is_worker_healthy,
    )

If handler_fn doesn't throw an exception, the message is ack'ed; if it throws, the message is rejected with requeue=True.
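
For readers unfamiliar with this flow, here is a simplified model of the behavior described above. It is not baseplate's actual implementation: `handle_message` is a hypothetical name, and the only calls assumed on the message are kombu's `Message.ack()` and `Message.requeue()` (the latter rejects the message and puts it back on the queue).

```python
from typing import Any, Callable


def handle_message(
    handler_fn: Callable[[Any, Any, Any], None],
    context: Any,
    body: Any,
    message: Any,
) -> None:
    """Simplified model: ack on success, requeue on any exception."""
    try:
        handler_fn(context, body, message)
    except Exception:
        # Equivalent to reject with requeue=True: the broker redelivers the
        # message, and nothing here bounds how many times that can happen.
        message.requeue()
    else:
        message.ack()
```

Because the message goes back to the queue unchanged, a handler that always fails on a given message will see that message again indefinitely.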

Problem

I want to limit the number of times a message is requeued to some configurable value. Right now there is no way to do that.

Workaround

My workaround is to write handler_fn so that it handles this itself:

from typing import Any

import kombu
from baseplate import RequestContext

def worker_handler_wrapper(context: RequestContext, body: Any, message: kombu.Message) -> None:
    try:
        ...
    except Exception:
        # message.headers should always be defined, as per Message.__init__
        max_retries = message.headers.get(WORKER_MAX_RETRIES_HEADER, 0)

        if max_retries > 0:
            max_retries -= 1
            # publish again with WORKER_MAX_RETRIES_HEADER=max_retries
            ...
        else:
            # logs / metrics that the message was dropped
            ...

And yes, in this case I have to publish messages like this:

message_body = {...}
headers = {WORKER_MAX_RETRIES_HEADER: max_retries}
context.amqp.publish(message_body, routing_key=routing_key, headers=headers)

I don't have a specific solution to propose yet. I first wanted to understand whether the framework should handle this, or whether it should at least allow more control over what happens when message handling throws an exception (e.g. rejecting without requeue so the message goes to the DLX). I assume I can achieve that today by implementing my own subclasses of KombuQueueConsumerFactory and KombuMessageHandler.

So I'm looking to better understand the recommended approach to handling retries in KombuQueueConsumer.

aides commented

Found PR #413, which relates to the suggestions here.

pacejackson commented

Yes, error_handler_fn should give you what you need. You do have to write the code to publish + ack rather than requeue.

There are some things to be careful with that, which is why we don't do it by default in baseplate:

  1. You have a higher chance of either losing messages or duplicating them (depending on the order in which you publish and ack), so you'll want to account for that.
  2. Be careful not to modify the message during processing: because you re-serialize it when you publish, you may end up publishing a different message from the one you originally received from the queue.
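
The publish + ack approach, combined with the retry header from the workaround earlier in the thread, could be sketched roughly as follows. Several things here are assumptions rather than anything confirmed above: the handler signature `(context, body, message, exc)` for error_handler_fn, the header value `"x-worker-max-retries"`, and the names `make_error_handler`, `publish`, and `on_drop`, which are hypothetical. `publish` stands in for whatever re-publishes to the exchange, for example a small wrapper around `context.amqp.publish(...)`.

```python
from typing import Any, Callable, Dict, Optional

# Hypothetical header value; the thread only names the constant.
WORKER_MAX_RETRIES_HEADER = "x-worker-max-retries"

# publish(context, body, headers): stand-in for the real publishing call.
Publish = Callable[[Any, Any, Dict[str, Any]], None]
OnDrop = Callable[[Any, Exception], None]


def make_error_handler(
    publish: Publish, on_drop: Optional[OnDrop] = None
) -> Callable[[Any, Any, Any, Exception], None]:
    """Build a handler with the (context, body, message, exc) shape
    assumed here for error_handler_fn."""

    def error_handler(context: Any, body: Any, message: Any, exc: Exception) -> None:
        headers = dict(message.headers or {})
        remaining = int(headers.get(WORKER_MAX_RETRIES_HEADER, 0))
        if remaining > 0:
            headers[WORKER_MAX_RETRIES_HEADER] = remaining - 1
            # Publish first, ack second: a crash between the two steps
            # duplicates the message instead of losing it.
            publish(context, body, headers)
        elif on_drop is not None:
            # Retries exhausted: hook for logs / metrics.
            on_drop(body, exc)
        # Ack the original delivery in both cases so it is never requeued.
        message.ack()

    return error_handler
```

This sketch picks the "duplicate rather than lose" side of point 1 by publishing before acking; swapping the two steps gives the opposite trade-off. Regarding point 2, `body` is re-serialized on publish, so any in-place changes the handler made before failing would be carried into the retried message.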

error_handler_fn was added in baseplate 1.3 so you should be able to use it if you update to the latest version.

aides commented

@pacejackson thank you for the additional details. Yeah, I agree, I think we can resolve this one.