dashbitco/broadway_rabbitmq

Add API to setup RabbitMQ topology on producer connect

whatyouhide opened this issue · 4 comments

At work, we often need to define complex RabbitMQ topologies of exchanges and queues. RabbitMQ is pretty clear on the pattern here: since declaring is idempotent, declare everything you need every time you connect. We also have a hard requirement to try connecting to multiple RabbitMQ URLs until one connection succeeds.

The code we currently use to, for example, declare an exchange and bind it to another exchange before declaring the consumer queue and starting the producer looks like this:

with {:ok, conn} <- AMQP.Connection.open(...),
     {:ok, chan} <- AMQP.Channel.open(conn),
     :ok <- declare_exchange(chan),
     :ok <- bind_exchange(chan) do
  Broadway.start_link(...)
end

This has a big disadvantage: it makes starting our Broadway pipeline synchronous. That is, the pipeline won't start unless RabbitMQ is available right away. This goes against the good practice of starting the process immediately and connecting in the background, potentially with backoff (see this great article).
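To make the "start first, connect later" idea concrete, here is a minimal sketch of a GenServer that returns from init/1 immediately and retries the connection with exponential backoff. The module name BackgroundConnector, the injected connect function, and the backoff constants are all hypothetical; this is only an illustration of the pattern, not how the broadway_rabbitmq producer is implemented.

```elixir
defmodule BackgroundConnector do
  use GenServer

  # Hypothetical sketch: `connect` is an injected 0-arity function returning
  # {:ok, conn} or {:error, reason}. In a real pipeline it could open the
  # AMQP connection and declare the topology.
  def start_link(connect), do: GenServer.start_link(__MODULE__, connect)

  def connected?(pid), do: GenServer.call(pid, :connected?)

  @impl true
  def init(connect) do
    # Return right away so the supervision tree keeps booting even if
    # RabbitMQ is down; the first attempt happens in handle_continue/2.
    {:ok, %{connect: connect, conn: nil, attempt: 0}, {:continue, :connect}}
  end

  @impl true
  def handle_continue(:connect, state), do: try_connect(state)

  @impl true
  def handle_info(:retry, state), do: try_connect(state)

  @impl true
  def handle_call(:connected?, _from, state), do: {:reply, state.conn != nil, state}

  defp try_connect(state) do
    case state.connect.() do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn, attempt: 0}}

      {:error, _reason} ->
        # Schedule another attempt instead of crashing or blocking.
        Process.send_after(self(), :retry, backoff(state.attempt))
        {:noreply, %{state | attempt: state.attempt + 1}}
    end
  end

  # Exponential backoff: 100ms, 200ms, 400ms, ... capped at 10s.
  def backoff(attempt), do: min(100 * Integer.pow(2, attempt), 10_000)
end
```

The process starts successfully whether or not the broker is reachable; only the connected/disconnected state changes over time.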

What we would like to do instead is to declare the necessary RabbitMQ topology every time the producer connects to RabbitMQ. This removes the synchronization point at startup and generally makes things more coherent.
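A rough sketch of what "declare the topology every time the producer connects" could look like, including the requirement of trying several URLs until one works. The TopologyConnect module and its injected open_channel and setup functions are hypothetical, chosen so the logic runs without a broker; in a real pipeline they would wrap AMQP.Connection.open/1, AMQP.Channel.open/1 and the topology declarations.

```elixir
defmodule TopologyConnect do
  # Hypothetical: try each URL in order; on the first one that yields a
  # channel, run the topology setup. Returns {:ok, chan} or {:error, reason}
  # so the caller can back off and retry in the background.
  # (A real implementation should also close the connection if setup fails.)
  def connect(urls, open_channel, setup) when is_list(urls) do
    with {:ok, chan} <- first_channel(urls, open_channel, :no_urls),
         :ok <- setup.(chan) do
      {:ok, chan}
    end
  end

  # Walk the URL list, remembering the last error in case every URL fails.
  defp first_channel([], _open_channel, last_error), do: {:error, last_error}

  defp first_channel([url | rest], open_channel, _last_error) do
    case open_channel.(url) do
      {:ok, chan} -> {:ok, chan}
      {:error, reason} -> first_channel(rest, open_channel, reason)
    end
  end
end
```

Because setup runs inside connect/3, a reconnect after a broker restart redeclares the topology automatically.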

To achieve this, we can go one of two ways: either we provide a way to pass in a generic piece of code that takes an AMQP channel, or we come up with a schema for additional options that lets us declare arbitrary topologies.

Note that we're only interested in topologies that are strictly related to the pipeline here. This means that we don't care about being able to declare additional queues, for example, since the pipeline can only consume from a single queue.

Option 1: custom code

The first option is to pass a generic piece of code that takes the AMQP channel. The API I propose is a new option :rabbitmq_setup_fun that takes an anonymous function or an MFA tuple.

rabbitmq_setup_fun: (AMQP.Channel.t() -> :ok | {:error, reason :: term()}) | {module(), atom(), [term()]}
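A minimal sketch of how a producer might invoke such an option, assuming the channel is prepended to the MFA arguments (which matches the declare_exchange(chan, routing_key) example below). The SetupFun module and its strict return-value check are hypothetical.

```elixir
defmodule SetupFun do
  # Hypothetical helper for the proposed :rabbitmq_setup_fun option.

  # Anonymous function: call it with the channel.
  def run(fun, chan) when is_function(fun, 1), do: check(fun.(chan))

  # MFA tuple: prepend the channel to the extra arguments.
  def run({mod, fun, args}, chan) when is_atom(mod) and is_atom(fun) and is_list(args),
    do: check(apply(mod, fun, [chan | args]))

  # Anything other than :ok or {:error, reason} raises, so a buggy setup
  # function fails loudly instead of being treated as success.
  defp check(:ok), do: :ok
  defp check({:error, _reason} = error), do: error

  defp check(other),
    do: raise(ArgumentError, "expected :ok or {:error, reason}, got: #{inspect(other)}")
end
```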

An example of its usage:

rabbitmq_setup_fun: fn chan ->
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: "#"),
       do: :ok
end

or with MFA:

rabbitmq_setup_fun: {__MODULE__, :declare_exchange, [_routing_key = "#"]}

# In the module:
def declare_exchange(chan, routing_key) do
  with :ok <- AMQP.Exchange.declare(chan, "my-exchange", :direct),
       :ok <- AMQP.Exchange.bind(chan, "other-exchange", "my-exchange", routing_key: routing_key),
       do: :ok
end

Option 2: new options to declare exchanges and do exchange-to-exchange bindings

The alternative to custom code is to provide more options that let us declare arbitrary topologies. The nice thing is that in RabbitMQ you can only have bindings that look like this:

exchange1 -> exchange2 -> ... -> exchangeN -> queue

That is, you can only bind to one queue at the end of the "exchanges pipeline". This means we only need to support two things:

  1. Declaring an arbitrary number of exchanges
  2. Declaring an arbitrary number of exchange-to-exchange bindings

We don't need to support exchange-to-queue bindings since we already support that through the :bindings option.
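To illustrate why a linear chain only needs exchange declarations plus exchange-to-exchange bindings, here is a small hypothetical helper that splits a chain of exchange names into the two kinds of bindings. The output shapes follow the {source, dest, options} tuples proposed below and the existing {exchange_name, options} shape of :bindings; ExchangeChain is an illustrative name only.

```elixir
defmodule ExchangeChain do
  # Hypothetical: given exchange names for ex1 -> ex2 -> ... -> exN -> queue,
  # return {exchange_bindings, queue_bindings}.
  def split([_ | _] = exchanges) do
    # Each consecutive pair becomes one exchange-to-exchange binding.
    exchange_bindings =
      exchanges
      |> Enum.chunk_every(2, 1, :discard)
      |> Enum.map(fn [source, dest] -> {source, dest, []} end)

    # Only the last exchange binds to the queue; that hop is what the
    # existing :bindings option already covers.
    queue_bindings = [{List.last(exchanges), []}]

    {exchange_bindings, queue_bindings}
  end
end
```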

Declaring exchanges

What I propose is to have a new :declare_exchanges option to declare exchanges:

declare_exchanges: [{name :: String.t(), type :: :direct | :topic | :headers | :fanout, options :: keyword()}]

For example:

declare_exchanges: [
  {"my-exchange", :topic, durable: true},
  {"my-other-exchange", :headers, durable: true, internal: true}
]
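One way the library could process :declare_exchanges on every connect is to validate each entry and declare the exchanges in order, stopping at the first failure. This is a hypothetical sketch; the declare function is injected so it runs without a broker, and in a real pipeline it could be &AMQP.Exchange.declare/4 (channel, name, type, options).

```elixir
defmodule DeclareExchanges do
  @types [:direct, :topic, :headers, :fanout]

  # Hypothetical: call declare_fun.(chan, name, type, opts) for each entry,
  # halting on the first invalid entry or the first {:error, reason}.
  def run(chan, entries, declare_fun) when is_list(entries) and is_function(declare_fun, 4) do
    Enum.reduce_while(entries, :ok, fn
      {name, type, opts}, :ok when is_binary(name) and type in @types and is_list(opts) ->
        case declare_fun.(chan, name, type, opts) do
          :ok -> {:cont, :ok}
          {:error, _reason} = error -> {:halt, error}
        end

      other, :ok ->
        {:halt, {:error, {:invalid_exchange, other}}}
    end)
  end
end
```

Handling failures here, rather than in user code, is the main appeal of the declarative approach.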

Binding exchanges to other exchanges

I have two different proposals here.

Option 1: add a new :exchange_bindings option (mirroring the name of the existing :bindings option).

exchange_bindings: [{source :: String.t(), dest :: String.t(), options :: keyword()}]
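A sketch of applying :exchange_bindings entries, under the assumption that each {source, dest, options} tuple maps to one AMQP.Exchange.bind/4 call. One detail worth noting: AMQP.Exchange.bind/4 takes the destination before the source, so the tuple has to be reordered. The ExchangeBindings module and the injected bind function are hypothetical.

```elixir
defmodule ExchangeBindings do
  # Hypothetical: bind_fun has the same argument order as
  # AMQP.Exchange.bind/4, i.e. (channel, destination, source, options).
  def run(chan, bindings, bind_fun) when is_list(bindings) and is_function(bind_fun, 4) do
    Enum.reduce_while(bindings, :ok, fn {source, dest, opts}, :ok ->
      # Note the swap: destination first, then source.
      case bind_fun.(chan, dest, source, opts) do
        :ok -> {:cont, :ok}
        {:error, _reason} = error -> {:halt, error}
      end
    end)
  end
end
```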

Option 2: modify the current :bindings option to support exchange-to-exchange bindings as well. Right now this option supports a list of {exchange_name :: String.t(), options :: keyword()}. What I propose is to switch to three-element tuples like the one above: {source :: String.t(), dest :: String.t(), options :: keyword()}. In addition, dest could take the special value :queue (an atom), which represents the queue given in the :queue option.
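A hypothetical sketch of how the extended :bindings option could be normalized. It assumes legacy two-element tuples keep working by being treated as {exchange, :queue, opts}; that backward-compatibility rule is an assumption, not part of the proposal. The planned tuples mirror the argument order of AMQP.Queue.bind/4 (channel, queue, exchange, options) and AMQP.Exchange.bind/4 (channel, destination, source, options).

```elixir
defmodule UnifiedBindings do
  # Hypothetical: turn :bindings entries into a list of planned bind calls.
  def plan(bindings, queue) when is_list(bindings) and is_binary(queue) do
    Enum.map(bindings, fn
      # Legacy shape: bind the pipeline's queue to the exchange.
      {source, opts} -> plan_one({source, :queue, opts}, queue)
      {source, dest, opts} -> plan_one({source, dest, opts}, queue)
    end)
  end

  # :queue means "the queue from the :queue option" -> queue binding.
  defp plan_one({source, :queue, opts}, queue), do: {:queue_bind, queue, source, opts}

  # Any string dest -> exchange-to-exchange binding.
  defp plan_one({source, dest, opts}, _queue) when is_binary(dest),
    do: {:exchange_bind, dest, source, opts}
end
```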

All thoughts are welcome, excited to have this discussion!

cc @josevalim @msaraiva @wojtekmach

I would honestly prefer Option #1. Maybe call it after_connect to mirror DBConnection?

If we go with #1, let's absolutely call it after_connect, awesome suggestion.

Personally, though, I'm more inclined to go with #2, even if I didn't express a preference (to avoid pushing the needle 😄). #2 is more declarative, plus you don't have to deal with failures manually since we can handle them when processing the options. I'd go with :declare_exchanges and with "Option 2" for the exchange-to-exchange bindings, though.

FWIW, I see the point of being generic with after_connect, but I don't think you'd do anything other than declaring exchanges or binding exchanges together. Can you think of other use cases for after_connect that I might be missing?

I am not sure if #1 would be used for other cases, but at the same time I am not sure if #2 would cover all cases. What if the exchange name comes from an environment variable? Now we have to make it an MFA, and so on. What if there are other topologies? I think at some point we need to say "it is in your hands now". Where to draw that line is arbitrary, but starting with the base case seems like the safer bet here. :)

@josevalim you have a point, and what brings me over the line anyway is that we can always add support for the declarative approach in the future if we want to. I'll make a PR for this.