KombuMessageHandler doesn't handle ServerTimeout
Repro steps
- Set up a consumer via KombuQueueConsumerFactory.new(...) as in the docs: https://baseplate.readthedocs.io/en/stable/api/baseplate/frameworks/queue_consumer/kombu.html
- Emulate a long-running operation in the handle_fn, e.g.
      def process_links(
          context: RequestContext,
          body: Any,
          message: kombu.Message,
      ):
          sleep(11)  # default server_timeout is 10 seconds, so this just exceeds it
- Send the message to the queue.
- See that the ServerTimeout error is logged in the consumer.
- Send another message to the queue.
Actual
The second message is not consumed.
Expected
The second message is consumed and the ServerTimeout error is logged once again.
Details
What I found is that ServerTimeout inherits from BaseException (https://github.com/reddit/baseplate.py/blob/master/baseplate/observers/timeout.py#L18), but MessageHandler only catches exceptions that derive from Exception: https://github.com/reddit/baseplate.py/blob/master/baseplate/frameworks/queue_consumer/kombu.py#L113
So the ServerTimeout exception propagates all the way up and crashes the thread the QueueConsumer is running in: https://github.com/reddit/baseplate.py/blob/master/baseplate/server/queue_consumer.py#L217
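The failure mode can be reproduced without baseplate. The sketch below is only an illustration under assumptions: `FakeServerTimeout`, `worker`, and `run_worker` are hypothetical names, and the loop is a simplified stand-in for the consumer thread, not the library's actual code. Like the handler, it catches only `Exception`.

```python
import queue
import threading


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout; it also derives from BaseException."""


def timing_out_job():
    raise FakeServerTimeout()


def quick_job():
    pass


def worker(jobs, handled):
    # Simplified stand-in for the consumer loop: only Exception is caught.
    while True:
        job = jobs.get()
        if job is None:  # sentinel: stop the loop
            return
        try:
            job()
            handled.append("ok")
        except Exception:
            handled.append("error")


def run_worker(job_list):
    jobs = queue.Queue()
    handled = []
    for job in job_list:
        jobs.put(job)
    jobs.put(None)
    thread = threading.Thread(target=worker, args=(jobs, handled))
    thread.start()
    thread.join(timeout=5)
    # Report what was handled and how many items were left unread.
    return handled, jobs.qsize()


# The first job raises past `except Exception` and ends the thread,
# so the second job (and the sentinel) stay in the queue.
handled, left_in_queue = run_worker([timing_out_job, quick_job])
```

Python prints the traceback for the dead thread to stderr, which matches the single ServerTimeout log line seen in the repro before consumption stops.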
Proposal
I propose handling ServerTimeout in MessageHandler like any other exception:
    try:
        ...
    except (Exception, ServerTimeout) as exc:
        ...
One thing to note: I think that with your proposal the message still wouldn't be consumed, because an error requeues the message and, in your test code, it's always going to time out.
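A rough sketch of the proposed `except` clause together with this requeue caveat, under stated assumptions: `FakeServerTimeout` and `FakeMessage` are hypothetical stand-ins (not baseplate's or kombu's classes), and `handle` and `deliver` are made-up helpers that assume the handler requeues on error and acks on success.

```python
class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout."""


class FakeMessage:
    """Hypothetical stand-in for a broker message; records what happened to it."""

    def __init__(self):
        self.acked = False
        self.requeue_count = 0

    def ack(self):
        self.acked = True

    def requeue(self):
        self.requeue_count += 1


def handle(message, handle_fn):
    try:
        handle_fn(message)
    except (Exception, FakeServerTimeout):
        # Assumption: any error, now including the timeout, requeues the message.
        message.requeue()
    else:
        message.ack()


def always_times_out(message):
    raise FakeServerTimeout()


def succeeds(message):
    pass


def deliver(message, handle_fn, max_deliveries):
    # Simulate the broker redelivering a requeued message a bounded number of times.
    for _ in range(max_deliveries):
        handle(message, handle_fn)
        if message.acked:
            break
    return message


stuck = deliver(FakeMessage(), always_times_out, max_deliveries=3)
done = deliver(FakeMessage(), succeeds, max_deliveries=3)
```

With the broader `except`, the loop survives the timeout, but a handler that always times out leaves the message requeued on every delivery and never acked.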
I think your proposal makes sense. In reality, a worker thread crashing should cause the entire queue consumer server to shut down (which should trigger it to restart)... but I think that logic might have the same problem. 😅
baseplate.py/baseplate/server/queue_consumer.py, lines 200 to 214 in e557576
Maybe the thing to do is also make the watcher closure catch BaseException rather than Exception?
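A minimal sketch of that idea, not the actual code in queue_consumer.py: `watched`, `run_and_report`, and the `stop_event` used to signal shutdown are all hypothetical, and `FakeServerTimeout` stands in for ServerTimeout. It compares a watcher that catches `Exception` with one that catches `BaseException`.

```python
import threading


class FakeServerTimeout(BaseException):
    """Hypothetical stand-in for ServerTimeout."""


def timing_out_job():
    raise FakeServerTimeout()


def quick_job():
    pass


def watched(fn, stop_event, catch):
    """Wrap fn so that a failure matching `catch` requests a server shutdown."""

    def watcher():
        try:
            fn()
        except catch:
            # Assumption: setting the event is how the server learns it must stop.
            stop_event.set()
            raise

    return watcher


def run_and_report(fn, catch):
    stop_event = threading.Event()
    thread = threading.Thread(target=watched(fn, stop_event, catch))
    thread.start()
    thread.join(timeout=5)
    # True means the watcher noticed the crash and asked for a shutdown.
    return stop_event.is_set()


missed = run_and_report(timing_out_job, Exception)
noticed = run_and_report(timing_out_job, BaseException)
```

One trade-off to keep in mind: `BaseException` also covers `SystemExit` and `KeyboardInterrupt`, so a watcher written this way treats those as crashes too.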