reddit/baseplate.py

KombuMessageHandler doesn't handle ServerTimeout


aides commented

Repro steps

  1. Set up a consumer via KombuQueueConsumerFactory.new(...), as in the docs:
    https://baseplate.readthedocs.io/en/stable/api/baseplate/frameworks/queue_consumer/kombu.html

  2. Emulate a long-running operation in the handle_fn, e.g.

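from time import sleep
from typing import Any
import kombu
from baseplate import RequestContext

def process_links(
    context: RequestContext,
    body: Any,
    message: kombu.Message,
) -> None:
    sleep(11)  # the default server_timeout is 10 seconds, so this just exceeds it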
  3. Send a message to the queue.
  4. Observe that a ServerTimeout error is logged by the consumer.
  5. Send another message to the queue.

Actual

The second message is never consumed.

Expected

The second message is consumed, and the ServerTimeout error is logged again.

Details

What I found is that ServerTimeout inherits from BaseException (https://github.com/reddit/baseplate.py/blob/master/baseplate/observers/timeout.py#L18), but KombuMessageHandler only catches exceptions derived from Exception:
https://github.com/reddit/baseplate.py/blob/master/baseplate/frameworks/queue_consumer/kombu.py#L113

So the ServerTimeout exception propagates all the way up and kills the thread the QueueConsumer is running in: https://github.com/reddit/baseplate.py/blob/master/baseplate/server/queue_consumer.py#L217
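The failure mode can be reproduced without baseplate or a broker. In this minimal sketch, `FakeServerTimeout` is a hypothetical stand-in that, like ServerTimeout, derives from BaseException, and `handle` is a hypothetical stand-in for a handler that only guards against Exception subclasses.

```python
import threading


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for baseplate's ServerTimeout (a BaseException subclass)."""


def handle(fn, results):
    # Mirrors a handler whose error handling only covers Exception subclasses.
    try:
        fn()
        results.append("ok")
    except Exception:
        results.append("handled")


def timing_out():
    raise FakeServerTimeout("handler exceeded server_timeout")


def worker_loop(jobs, results):
    # Processes jobs in order; an escaping BaseException ends the loop and the thread.
    for job in jobs:
        handle(job, results)


results = []
jobs = [lambda: None, timing_out, lambda: None]
t = threading.Thread(target=worker_loop, args=(jobs, results))
t.start()
t.join()
print(results)  # ['ok']: the third job never runs
```

Python's default `threading.excepthook` prints the traceback to stderr, but nothing restarts the loop, which matches the "logged once, then nothing is consumed" symptom.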

Proposal

I propose handling ServerTimeout in KombuMessageHandler the same way as any other exception:

from baseplate.observers.timeout import ServerTimeout

try:
    ...
except (Exception, ServerTimeout) as exc:
    ...
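A minimal, self-contained sketch of what the proposed `except` tuple does, not baseplate's actual handler. `FakeServerTimeout` and `FakeMessage` are hypothetical; `FakeMessage` only records whether it was acknowledged or requeued, mirroring the `ack()` and `requeue()` methods of `kombu.Message`. Requeueing on failure is an assumption made for illustration.

```python
import logging

logger = logging.getLogger(__name__)


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for baseplate's ServerTimeout."""


class FakeMessage:
    """Hypothetical message that records what the handler did with it."""

    def __init__(self):
        self.state = "pending"

    def ack(self):
        self.state = "acked"

    def requeue(self):
        self.state = "requeued"


def handle(handler_fn, message):
    try:
        handler_fn(message)
    except (Exception, FakeServerTimeout):
        # Treat a timeout like any other handler error: log it and requeue.
        logger.exception("Error while handling message")
        message.requeue()
    else:
        message.ack()


def slow_handler(message):
    raise FakeServerTimeout("handler exceeded server_timeout")


def fast_handler(message):
    pass


m1, m2 = FakeMessage(), FakeMessage()
handle(slow_handler, m1)  # does not raise; the timeout is logged instead
handle(fast_handler, m2)
print(m1.state, m2.state)  # requeued acked
```

Because the timeout no longer escapes `handle`, the caller keeps running and the next message is processed normally.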

One thing to note: I think that even with your proposal the message still wouldn't be consumed, because an error requeues the message, and in your test code the handler is always going to time out.
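The point about requeueing can be illustrated with a small simulation. This is a hypothetical model, not kombu or baseplate code: a `collections.deque` plays the queue, a failed message goes back on the end, and the loop is capped at `max_deliveries` so it terminates.

```python
from collections import deque


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for baseplate's ServerTimeout."""


def always_slow(body):
    # Like the repro's sleep(11): every attempt exceeds the timeout.
    raise FakeServerTimeout(body)


def run(queue, handler, max_deliveries):
    acked, deliveries = [], 0
    while queue and deliveries < max_deliveries:
        body = queue.popleft()
        deliveries += 1
        try:
            handler(body)
        except (Exception, FakeServerTimeout):
            queue.append(body)  # requeue on failure
        else:
            acked.append(body)
    return acked, deliveries


queue = deque(["msg-1"])
acked, deliveries = run(queue, always_slow, max_deliveries=5)
print(acked, deliveries, list(queue))  # [] 5 ['msg-1']
```

Without the cap, the message would be redelivered indefinitely and never acknowledged, even though the consumer itself stays alive.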

I think your proposal makes sense. In reality, a worker thread crashing should cause the entire queue consumer server to shut down (which should trigger a restart)... but I think that logic might have the same problem. 😅

# Excerpt from the server class: `self` and `logger` come from the enclosing scope.
def watcher(fn: Callable) -> Callable:
    """Terminates the server (gracefully) if `fn` raises an Exception.

    Used to monitor the pump and handler threads for Exceptions so we can
    shut down the server if one of them exits unexpectedly.
    """

    def _run_and_terminate(*a: Any, **kw: Any) -> Any:
        try:
            return fn(*a, **kw)
        except Exception:  # a BaseException such as ServerTimeout is not caught here
            logger.exception("Unhandled error in pump or handler thread, terminating.")
            self._terminate()

    return _run_and_terminate

Maybe the thing to do is to also make the watcher closure catch BaseException rather than Exception?
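One way to read that suggestion, as a standalone sketch: `watcher` here takes a hypothetical `terminate` callback in place of `self._terminate`, and `FakeServerTimeout` stands in for ServerTimeout. With `except BaseException`, the callback fires even for exceptions outside the Exception hierarchy.

```python
import logging
import threading
from typing import Any, Callable

logger = logging.getLogger(__name__)


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for baseplate's ServerTimeout."""


def watcher(fn: Callable, terminate: Callable[[], None]) -> Callable:
    """Call `terminate` if `fn` raises anything, including non-Exception errors."""

    def _run_and_terminate(*a: Any, **kw: Any) -> Any:
        try:
            return fn(*a, **kw)
        except BaseException:  # broader than `except Exception`
            logger.exception("Unhandled error in pump or handler thread, terminating.")
            terminate()

    return _run_and_terminate


terminated = threading.Event()


def handler_thread() -> None:
    raise FakeServerTimeout("handler exceeded server_timeout")


t = threading.Thread(target=watcher(handler_thread, terminated.set))
t.start()
t.join()
print(terminated.is_set())  # True
```

A trade-off to keep in mind: BaseException also covers SystemExit and KeyboardInterrupt, so a deliberate exit raised inside a worker thread would trigger shutdown too; catching `(Exception, ServerTimeout)` explicitly is the narrower alternative.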