reddit/baseplate.py

Upgrades to `queue_processor` library

Closed · 1 comment

I think there are some upgrades that we should make to the `queue_processor` library in Baseplate:

  1. Allow us to process multiple messages concurrently. We can spawn multiple worker threads (backed by gevent under the hood due to monkey patching) that pull work off an internal work queue, which is "pumped" by the KombuConsumer thread. The number of workers should be configurable, so consumers that need strict, in-order processing can use 1 while others tune the number to whatever makes sense for their service.
  2. Handle termination requests gracefully. We can either implement queue consumers as a "server" and let Baseplate handle signals the same way it does for HTTP/Thrift services, or assume they will be run as a "script" and build the signal handling into the "run" function. Either way, we should listen for the termination signals and:
  • Shut down the pump thread
  • Signal to the worker threads that we are stopping, so they stop and do not pick up additional messages from the work queue. (Side note: when implementing the worker threads, it's a good idea to set a timeout on queue.get() so they can periodically check for this shutdown signal while waiting for work.)
  • Wait for all the worker threads to return
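
To make the proposal concrete, here is a rough sketch of the pump/worker/shutdown flow using only the standard library. Everything in it is hypothetical: `pump`, `worker`, `start`, `shutdown`, `install_signal_handlers`, and the `num_workers` parameter are illustrative names, not Baseplate's API, and the pump simply iterates over any iterable instead of consuming from Kombu. With gevent monkey patching applied, the same `threading` and `queue` calls would presumably run on greenlets.

```python
import queue
import signal
import threading


def pump(source, work_queue, stop_event):
    """Stand-in for the KombuConsumer thread: feed messages into the work queue."""
    for message in source:
        if stop_event.is_set():
            break
        work_queue.put(message)


def worker(work_queue, stop_event, handler):
    """Handle messages from the work queue until the stop event is set."""
    while not stop_event.is_set():
        try:
            # The timeout lets the loop re-check stop_event while idle.
            message = work_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        handler(message)


def start(source, handler, num_workers=1):
    """Start one pump thread and num_workers worker threads."""
    stop_event = threading.Event()
    work_queue = queue.Queue()
    threads = [
        threading.Thread(target=pump, args=(source, work_queue, stop_event))
    ]
    threads += [
        threading.Thread(target=worker, args=(work_queue, stop_event, handler))
        for _ in range(num_workers)
    ]
    for thread in threads:
        thread.start()
    return stop_event, threads


def shutdown(stop_event, threads):
    """Ask the pump and workers to stop, then wait for all of them to return."""
    stop_event.set()
    for thread in threads:
        thread.join()


def install_signal_handlers(stop_event):
    """Set the stop event on SIGTERM or SIGINT (call from the main thread)."""
    for signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signum, lambda _signum, _frame: stop_event.set())
```

In the "script" variant, the "run" function would call `start(...)`, then `install_signal_handlers(stop_event)`, wait on `stop_event`, and finish with `shutdown(stop_event, threads)`. Note that this sketch leaves any messages still in the work queue unprocessed at shutdown, and that a real pump blocked on the broker would also need a timeout to notice the stop event.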

All taken care of in f7e8824...5b8054e