pma/amqp

Supervising multiple consumers

Closed this issue · 6 comments

Hi,

Our application has many consumers - one for each queue. We don't want to reopen a connection in each one (as per the example in the README) and think it'd be better to create a connection in one GenServer, then pass that in to many consumer GenServers and have them each open their own channel.

At the same time, we want to recover from failures of both the channel and the connection. Since we can't pass a conn around a supervision tree we'll have to do this manually, but in #31 you suggest not using a Supervisor at all, because restarts would retry the connection too rapidly. I think a channel failure should only require the consumer to be restarted, while a connection failure should result in a new connection and a whole new set of consumers. Is there a good pattern for this problem?

pma commented

@desmondmonster I think a channel failure will only happen if the underlying connection is killed/closed.

One solution that comes to mind is:

Have a GenServer hold the single connection (let's call this the ConnectionManager) like you describe. This can implement the suggested connection retry mechanism that does not trigger a supervisor restart.

Define your consumers as separate GenServers that, on start, simply call the ConnectionManager to request a new channel. The ConnectionManager does not return the channel immediately, because it could still be trying to connect. Instead it keeps the consumer pid and does a GenServer.cast(consumer_pid, {:setup_channel, chan}). This cast can be sent immediately if the connection is available/established, or the consumer_pid can be saved in a list (in the ConnectionManager state) and served when the connection is finally reestablished.

In the handle_cast({:setup_channel, chan}, state) of the consumer you can do the normal exchange/queue declares, set up the bindings and start consuming.

Your consumers can sit under a Supervisor, with each one monitoring the provided channel pid. If the channel dies, the consumer handles the :DOWN message and requests a new channel from the ConnectionManager.

Edit: One issue with the above is that if your consumer GenServer crashes, the channel would remain open. An option is to have the consumer link to (instead of monitor) the channel. But that would just cause every consumer, channel and the connection to die if just one consumer crashes...

Another option is to have the ConnectionManager monitor the consumers (it knows about them when they request a channel) and keep a mapping between consumer and channel. If the consumer dies, the ConnectionManager can cleanup and close the channel.
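A minimal sketch of the ConnectionManager described above, assuming the Elixir `amqp` library. The module and function names (`ConnectionManager`, `request_channel/1`) and the 5-second retry interval are illustrative, and the consumer-monitoring cleanup from the last paragraph is omitted for brevity:

```elixir
defmodule ConnectionManager do
  use GenServer

  # Holds the single AMQP connection and hands out channels.
  # Consumers that ask while we are disconnected are parked in
  # state.waiting and served once the connection is (re)established.

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Asynchronous: the channel arrives later as a
  # {:setup_channel, chan} cast to the calling consumer.
  def request_channel(consumer_pid) do
    GenServer.cast(__MODULE__, {:request_channel, consumer_pid})
  end

  def init(_opts) do
    send(self(), :connect)
    {:ok, %{conn: nil, waiting: []}}
  end

  def handle_info(:connect, state) do
    case AMQP.Connection.open() do
      {:ok, conn} ->
        Process.monitor(conn.pid)
        # Serve every consumer that asked while we were down.
        Enum.each(state.waiting, &open_and_send(conn, &1))
        {:noreply, %{state | conn: conn, waiting: []}}

      {:error, _reason} ->
        # Retry without triggering a supervisor restart.
        Process.send_after(self(), :connect, 5_000)
        {:noreply, state}
    end
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # The connection died; clear it and start the retry loop again.
    send(self(), :connect)
    {:noreply, %{state | conn: nil}}
  end

  def handle_cast({:request_channel, pid}, %{conn: nil} = state) do
    {:noreply, %{state | waiting: [pid | state.waiting]}}
  end

  def handle_cast({:request_channel, pid}, %{conn: conn} = state) do
    open_and_send(conn, pid)
    {:noreply, state}
  end

  defp open_and_send(conn, pid) do
    {:ok, chan} = AMQP.Channel.open(conn)
    GenServer.cast(pid, {:setup_channel, chan})
  end
end
```

Because `request_channel/1` is a cast, a consumer never blocks on a connection that is still being established; it simply stays idle until the `{:setup_channel, chan}` message arrives.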

@desmondmonster was that solved? Can you close the issue mate?

pma commented

@desmondmonster @rafaeljesus In this implementation of a gen_stage based interface (https://github.com/pma/wabbit/), a single connection can be reused by multiple consumers. It also handles reconnects. Can you give it a try?

@pma Yes, of course. My application is going to be deployed next week; we have 6 consumers. Once it's running in production I can add it to the ones with less traffic for testing.

@pma Are you using that in any production system of yours?

@pma @rafaeljesus We ended up with the following config:

A ConnectionManager GenServer opens, maintains, and links to a connection. It starts and supervises the consumers, which are also GenServers and request a channel from the ConnectionManager when they start. The ConnectionManager maintains a mapping of opened channels to consumers, so that if a consumer dies and restarts it is given the same channel its predecessor used. If a connection is unavailable during a channel request (e.g. it's still connecting to RabbitMQ), the request is saved and processed once a connection becomes available. Consumers request channels asynchronously; they sit idle until they receive a channel.

The benefit of this supervision tree is that individual consumers can come and go without connection or channel churn; reconnection logic lives only in the ConnectionManager. Channels are reused where possible and share a single connection. If either the ConnectionManager or its stored connection dies, the ConnectionManager and all consumers are killed and restarted.
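The consumer side of this setup could be sketched as follows, again assuming the Elixir `amqp` library and a hypothetical `ConnectionManager.request_channel/1` like the one discussed earlier in the thread. The queue options and the ack-on-deliver flow are illustrative, not prescribed by the library:

```elixir
defmodule Consumer do
  use GenServer

  # Asks the ConnectionManager for a channel on start, then sits
  # idle until the channel arrives via a {:setup_channel, chan} cast.

  def start_link(queue) do
    GenServer.start_link(__MODULE__, queue)
  end

  def init(queue) do
    ConnectionManager.request_channel(self())
    {:ok, %{queue: queue, chan: nil}}
  end

  def handle_cast({:setup_channel, chan}, state) do
    # Monitor (don't link) so a channel death doesn't kill the
    # consumer; we handle it in :DOWN below instead.
    Process.monitor(chan.pid)
    {:ok, _info} = AMQP.Queue.declare(chan, state.queue, durable: true)
    {:ok, _tag} = AMQP.Basic.consume(chan, state.queue)
    {:noreply, %{state | chan: chan}}
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    # The channel died: go idle and ask for a replacement.
    ConnectionManager.request_channel(self())
    {:noreply, %{state | chan: nil}}
  end

  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, state) do
    # ... process payload ...
    AMQP.Basic.ack(state.chan, tag)
    {:noreply, state}
  end

  # Confirmation that the broker registered us as a consumer.
  def handle_info({:basic_consume_ok, _meta}, state), do: {:noreply, state}
end
```

Restart the consumers under a Supervisor as usual; because a restarted consumer re-requests a channel in `init/1`, the ConnectionManager's channel-to-consumer mapping can hand it back its predecessor's channel.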

NB: we found that a channel closing will not also bring down the connection.

We've been running this in production for a couple of months and it's worked pretty well.