An AMQP adapter for Conduit.
This package can be installed as:
- Add
conduit_amqp
to your list of dependencies inmix.exs
:
```elixir
def deps do
[{:conduit_amqp, "~> 0.6.1"}]
end
```
- Ensure
conduit_amqp
is started before your application:
```elixir
def application do
[applications: [:conduit_amqp]]
end
```
# config/config.exs
config :my_app, MyApp.Broker,
adapter: ConduitAMQP,
url: "amqp://my_app:secret@my-rabbit-host.com"
# Stop lager redirecting :error_logger messages
config :lager, :error_logger_redirect, false
# Stop lager removing Logger's :error_logger handler
config :lager, :error_logger_whitelist, [Logger.ErrorHandler]
For the full set of options, see ConduitAQMP.
You can define exchanges with the exchange
macro in the
configure
block of your Broker. The exchange
macro accepts
the name of the exchange and options for the exchange.
:type
- Either:topic
,:fanout
,:direct
, or:headers
. Defaults to:topic
.:durable
- If set, keeps the Exchange between restarts of the broker. Defaults tofalse
.:auto_delete
- If set, deletes the Exchange once all queues unbind from it. Defaults tofalse
.:passive
- If set, returns an error if the Exchange does not already exist. Defaults tofalse
.:internal
- If set, the exchange may not be used directly by publishers. Defaults tofalse
.
See exchange.declare for more details.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
configure do
exchange "my.topic", type: :topic, durable: true
end
end
You can define queues with the queue
macro in the
configure
block of your Broker. The queue
macro accepts
the name of the queue and options for the exchange.
:durable
- If set, keeps the Queue between restarts of the broker. Defaults tofalse
.:auto_delete
- If set, deletes the Queue once all subscribers disconnect. Defaults tofalse
.:exclusive
- If set, only one subscriber can consume from the Queue. Defaults tofalse
.:passive
- If set, raises an error unless the queue already exists. Defaults tofalse
.:from
- A list of routing keys to bind the queue to.:exchange
- Name of the exchange used to bind the queue to the routing keys.
See queue.declare for more details.
defmodule MyApp.Broker do
use Conduit.Broker, otp_app: :my_app
configure do
queue "my.queue", from: ["#.created.user"], exchange: "amq.topic", durable: true
end
end
Inside an incoming
block for a broker, you can define subscriptions to queues. Conduit will route messages on those
queues to your subscribers.
defmodule MyApp.Broker do
incoming MyApp do
subscribe :my_subscriber, MySubscriber, from: "my.queue"
subscribe :my_other_subscriber, MyOtherSubscriber,
from: "my.other.queue",
prefetch_size: 20
end
end
:from
- Accepts a string or function that resolves to the queue to consume from. Defaults to the name of the route if not specified.:prefetch_size
- Size of prefetch buffer in octets. Defaults to0
, which means no specific limit. This can also be configured globally by passing this same option when configuring your Broker.:prefetch_count
- Number of messages to prefetch. Defaults to0
, which means no specific limit. This can also be configured globally by passing this same option when configuring your Broker.
Note: It's highly recommended to set :prefetch_size
or :prefetch_count
to a non-zero value to limit the memory consumed when a queue is backed up.
See basic.qos for more details on options.
Inside an outgoing
block for a broker, you can define publications to exchanges. Conduit will deliver messages using the
options specified. You can override these options, by passing different options to your broker's publish/3
.
defmodule MyApp.Broker do
outgoing do
publish :something,
to: "my.routing_key",
exchange: "amq.topic"
publish :something_else,
to: "my.other.routing_key",
exchange: "amq.topic"
end
end
:to
- The routing key for the message. If the message already has it's destination set, this option will be ignored.:exchange
- The exchange to publish to. This option is required.
See basic.publish for more details.
When ConduitAMQP is used as an adapter for Conduit, it starts ConduitAMQP as a child supervisor. ConduitAMQP starts:
- ConduitAQMP.ConnPool - Creates and supervises a pool of AMQP connections.
- ConduitAMQP.PubSub - Creates and supervises ConduitAMQP.PubPool and ConduitAMQP.SubPool.
- ConduitAMQP.Subscribers - A supervisor for subscribers that process messages.