Consumer server for RabbitMQ with message publishing functionality.
Add Coney as a dependency in your mix.exs
def deps do
[{:coney, "~> 2.2"}]
After you are done, run mix deps.get
in your shell to fetch and compile Coney.
# config/config.exs
config :coney,
adapter: Coney.RabbitConnection,
pool_size: 1,
settings: %{
url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
timeout: 1000
workers: [
# also you can define mapping like this and skip it in consumer module:
workers: [
connection: %{
prefetch_count: 10,
exchange: {:direct, "my_exchange", durable: true},
queue: {"my_queue", durable: true},
binding: [routing_key: "routing_key"]
worker: MyApplication.MyConsumer
# config/test.exs
config :coney, adapter: Coney.FakeConnection, settings: %{}
# web/consumers/my_consumer.ex
defmodule MyApplication.MyConsumer do
@behaviour Coney.Consumer
def connection do
prefetch_count: 10,
exchange: {:direct, "my_exchange", durable: true},
queue: {"my_queue", durable: true},
binding: [routing_key: "routnig_key"]
def parse(payload, _meta) do
def process(number, _meta) do
if number <= 10 do
# Be careful here, if call of `error_happened` will raise an exception,
# message will be not handled properly and may be left unacked in a queue
def error_happened(exception, payload, _meta) do
IO.puts "Exception raised with #{ payload }"
If exception was happened during calls of parse
or process
functions, by default Coney will reject this message. If you want to add additional functionality in order to handle exception in a special manner, you can implement one of error_happened/3
or error_happened/4
callbacks. But be careful, if call of error_happened
will raise an exception, message will be not handled properly and may be left unacked in a queue.
This callback receives exception
, original payload
and meta
as parameters. Response format is the same as in process callback.
This callback receives exception
, stacktrace
, original payload
and meta
as parameters. Response format is the same as in process callback.
- ack message.:reject
- reject message.:redeliver
- return message to the queue.{:reply, binary}
- response will be published to reply exchange.
Reply description
To use {:reply, binary}
you should add response exchange in connection
# web/consumers/my_consumer.ex
def connection do
# ...
respond_to: "response_exchange"
Response will be published to "response_exchange"
To use the default exchange you should set
to :default
# web/consumers/my_consumer.ex
def connection do
# ...
exchange: :default
The following format is also acceptable:
def connection do
# ...
exchange: {:direct, ""}
Coney.publish("exchange", "message")
# or
Coney.publish("exchange", "routing_key", "message")
You can useConey.status/0
if you need to get information about RabbitMQ connections:
iex> Coney.status()
[{#PID<0.972.0>, :connected}]
Result is a list of tuples, where first element in tuple is a pid of running connection server and second element describes connection status.
Connection status can be:
- when coney just started:connected
- when RabbitMQ connection has been established and all consumers have been started:disconnected
- when coney lost connection to RabbitMQ
Bug reports and pull requests are welcome on GitHub at
The library is available as open source under the terms of the MIT License.