Gracefully stopping consumers?
I have a consumer that takes about 20 minutes to process a received message. During this period the consumer should not be stopped. If I need to deploy a new version of the consumer, for example, things become challenging.
I'd like a way to signal fedmsg to stop consuming new messages; wait for consumers to finish processing current messages; then finally stop fedmsg. Ideally a configurable max wait timeout would control this.
Before I delve into the code base, does this approach make sense?
The way we've handled this in fedora-messaging is to register signal handlers. I'd recommend that approach. Handle the signal by setting a "stop" flag, and if that flag is already set (e.g. the signal got sent twice) stop immediately.
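A minimal sketch of that stop-flag pattern, using only the standard library. The names `StopFlag` and `run` are made up for illustration and are not part of fedmsg or fedora-messaging; the `hard_exit` parameter exists only so the "second signal" path can be swapped out.

```python
import signal
import sys


class StopFlag:
    """Tracks whether a shutdown has been requested (hypothetical helper)."""

    def __init__(self, hard_exit=sys.exit):
        self.requested = False
        self._hard_exit = hard_exit

    def handle(self, signum, frame):
        if self.requested:
            # Second signal: stop waiting and exit right away.
            self._hard_exit(1)
            return
        # First signal: only record the request; the worker loop checks it.
        self.requested = True

    def install(self, signums=(signal.SIGTERM, signal.SIGINT)):
        # Must be called from the main thread, as signal.signal() requires.
        for signum in signums:
            signal.signal(signum, self.handle)


def run(fetch, process, stop):
    """Process items until a stop is requested or fetch() returns None."""
    done = 0
    while not stop.requested:
        item = fetch()
        if item is None:
            break
        process(item)  # a message already being processed is never interrupted
        done += 1
    return done
```

The handler itself does almost no work, so it is safe to run at any point; the loop checks the flag only between messages.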
@jeremycline, ah that's clever because I can do that on my consumer directly (I think) without having to mess with fedmsg/moksha. Thanks, I'll look into that.
I think I'm getting closer to this. I've added a signal handler like this in my consumer:
import logging
import signal
import time

import fedmsg.consumers

log = logging.getLogger(__name__)  # assumed module-level logger


class MyConsumer(fedmsg.consumers.FedmsgConsumer):
    def __init__(self, hub):
        ...
        signal.signal(signal.SIGTERM, self._on_sigterm)

    def _on_sigterm(self, signum, frame):
        log.info('Received signal %d', signum)
        if signum != signal.SIGTERM:
            log.warning('Signal is unexpected, doing nothing')
            return
        # Do not consume more messages from UMB
        self.hub.unsubscribe(self._consume)
        # Finish up what's already in queue
        while not self.incoming.empty():
            # TODO: Break if configured timeout expires
            time.sleep(1)
        # TODO: Ensure that there are no active queue items being processed - not sure what to check
        # Disconnect from hub and do some cleanup (Yikes! this is ugly)
        self.stop()
        self.hub.stop()
        from moksha.hub.reactor import reactor
        reactor.stop()
After calling reactor.stop(), it seems that the process waits for any currently running threads to finish executing before quitting, which is pretty close to what I want.
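For the "break if configured timeout expires" TODO, one option is a deadline-based polling loop. This is only a sketch: `wait_until_empty` is a made-up helper, and it assumes the consumer's incoming queue behaves like a standard `queue.Queue`.

```python
import queue
import time


def wait_until_empty(q, timeout, poll_interval=0.1):
    """Poll q until it is empty; return False if timeout seconds pass first."""
    # time.monotonic() is unaffected by system clock adjustments.
    deadline = time.monotonic() + timeout
    while not q.empty():
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True
```

An empty queue only means that nothing is waiting. A message that has already been taken off the queue may still be in the middle of processing.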
The only missing piece is checking for running threads before killing the hub and reactor, because I'd like to publish a message at the end of my consume method. Once the hub and reactor are killed, no messages will be published. I'll keep digging to see if there's a way.
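One possible way to know whether anything is still being processed is to count in-flight messages yourself. The sketch below is not a fedmsg or moksha API; `InFlight` is a hypothetical helper that the consume method would wrap its work in (`with self.in_flight: ...`, publishing included), assuming processing happens in worker threads.

```python
import threading


class InFlight:
    """Counts messages currently being processed (hypothetical helper)."""

    def __init__(self):
        self._count = 0
        self._cond = threading.Condition()

    def __enter__(self):
        with self._cond:
            self._count += 1
        return self

    def __exit__(self, exc_type, exc, tb):
        with self._cond:
            self._count -= 1
            self._cond.notify_all()  # wake anyone blocked in wait_idle()
        return False  # do not swallow exceptions from the wrapped block

    def wait_idle(self, timeout=None):
        """Block until nothing is in flight; return False on timeout."""
        with self._cond:
            return self._cond.wait_for(lambda: self._count == 0, timeout)
```

The shutdown path could then call `wait_idle(timeout=...)` before stopping the hub and reactor, so a final publish inside the `with` block would have a chance to complete first. Doing that wait from a separate thread rather than inside the signal handler is probably safer, since the handler runs in the main thread.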