Library doesn't use the worker pool.
victorct-pronto commented
Hi, I recently realized that the library doesn't use the worker pool; see https://github.com/depop/celery-message-consumer/blob/master/event_consumer/handlers.py#L344 and #18 (comment). This can lead to bugs in code that relies on worker signals such as worker_process_init or worker_process_shutdown. This issue is only meant to help people replace those signals so that their code keeps working with libraries such as mongoengine (which creates a default singleton connection).
from celery import Celery
from event_consumer.handlers import AMQPRetryConsumerStep

# connect_database() and disconnect_database() are your own helpers.


class CustomConsumerStep(AMQPRetryConsumerStep):
    """
    Celery normally runs tasks in pool worker processes, but this consumer
    library seems to run in the main process and not in the workers.
    Because of that, a handler connected with worker_process_init.connect is
    executed, but nothing it creates is shared with the main process.
    """

    def start(self, c):
        super().start(c)
        # Replaces the worker_process_init handler
        connect_database()

    def stop(self, c):
        super().stop(c)
        # Replaces the worker_process_shutdown handler
        disconnect_database()

    def shutdown(self, c):
        super().shutdown(c)
        # Replaces the worker_process_shutdown handler
        disconnect_database()


consumer_app = Celery("consumer_amqp_tasks", broker="amqps:.....")
consumer_app.steps["consumer"].add(CustomConsumerStep)
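
The snippet above does not define connect_database() or disconnect_database(). Since disconnect_database() is called from both stop() and shutdown(), it may run more than once, so it helps if both helpers are idempotent. Here is a minimal sketch of that idea. FakeConnection is a hypothetical stand-in for whatever client you actually use (for example the default mongoengine connection), and the module-level _connection and _lock names are assumptions for illustration only, not part of this library.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for a real database client."""

    def __init__(self):
        self.open = True

    def close(self):
        self.open = False


# Module-level singleton, guarded by a lock (illustrative names).
_connection = None
_lock = threading.Lock()


def connect_database():
    """Create the shared connection once; later calls reuse it."""
    global _connection
    with _lock:
        if _connection is None:
            _connection = FakeConnection()
        return _connection


def disconnect_database():
    """Close the shared connection if one exists; safe to call repeatedly."""
    global _connection
    with _lock:
        if _connection is not None:
            _connection.close()
            _connection = None
```

With helpers shaped like this, it should not matter whether the worker goes through stop(), shutdown(), or both: the second disconnect_database() call is simply a no-op.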