depoplabs/celery-message-consumer

Library doesn't use the workers pool.

Opened this issue · 0 comments

Hi I recently realized that the library doesn't use the worker pool on the code see:

https://github.com/depop/celery-message-consumer/blob/master/event_consumer/handlers.py#L344 and #18 (comment) which could lead to bugs on code relying on the worker_signals such as worker_process_init or worker_process_shutdown, this issue is only to help people replace those signals, so their code can still work, with libraries such as mongoengine(which creates a default singleton connection)


class CustomConsumerStep(AMQPRetryConsumerStep):
    """ 
        Celery works with workers, but it seems to be this consumer library works on the main thread and not on the workers.
        And because of that the signal: worker_process_init.connect is executed, but everything you create in that function is not shared with the main process.

  """

    def start(self, c):
        super().start(c)
        #Overrides worker_process_init

        connect_database()

    def stop(self, c):
        super().stop(c)
        #Overrides worker_process_shutdown
        disconnect_database()

    def shutdown(self, c):
        super().shutdown(c)
        #Overrides worker_process_shutdown
        disconnect_database()

consumer_app = Celery("consumer_amqp_tasks", broker="amqps:.....")
consumer_app.steps["consumer"].add(CustomConsumerStep)