
Consumer server for RabbitMQ with message publishing functionality.

Primary LanguageElixirMIT LicenseMIT


Hex Version Build Status

Consumer server for RabbitMQ with message publishing functionality.

Table of Contents


Add Coney as a dependency in your mix.exs file.

def deps do
  [{:coney, "~> 2.2"}]

After you are done, run mix deps.get in your shell to fetch and compile Coney.

Setup a consumer server

# config/config.exs
config :coney,
  adapter: Coney.RabbitConnection,
  pool_size: 1,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  workers: [
# also you can define mapping like this and skip it in consumer module:
  workers: [
      connection: %{
        prefetch_count: 10,
        exchange:       {:direct, "my_exchange", durable: true},
        queue:          {"my_queue", durable: true},
        binding:        [routing_key: "routing_key"]
      worker: MyApplication.MyConsumer
# config/test.exs

config :coney, adapter: Coney.FakeConnection, settings: %{}
# web/consumers/my_consumer.ex

defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
      prefetch_count: 10,
      exchange:       {:direct, "my_exchange", durable: true},
      queue:          {"my_queue", durable: true},
      binding:        [routing_key: "routnig_key"]

  def parse(payload, _meta) do

  def process(number, _meta) do
    if number <= 10 do

  # Be careful here, if call of `error_happened` will raise an exception, 
  # message will be not handled properly and may be left unacked in a queue
  def error_happened(exception, payload, _meta) do
    IO.puts "Exception raised with #{ payload }"

Rescuing exceptions

If exception was happened during calls of parse or process functions, by default Coney will reject this message. If you want to add additional functionality in order to handle exception in a special manner, you can implement one of error_happened/3 or error_happened/4 callbacks. But be careful, if call of error_happened will raise an exception, message will be not handled properly and may be left unacked in a queue.


This callback receives exception, original payload and meta as parameters. Response format is the same as in process callback.


This callback receives exception, stacktrace, original payload and meta as parameters. Response format is the same as in process callback.

.process/2 and .error_happened return format

  1. :ok - ack message.
  2. :reject - reject message.
  3. :redeliver - return message to the queue.
  4. {:reply, binary} - response will be published to reply exchange.

Reply description

To use {:reply, binary} you should add response exchange in connection:

# web/consumers/my_consumer.ex

def connection do
    # ...
    respond_to: "response_exchange"

Response will be published to "response_exchange" exchange.

The default exchange

To use the default exchange you should set connection.exchange to :default parameter:

# web/consumers/my_consumer.ex

def connection do
    # ...
    exchange: :default

The following format is also acceptable:

def connection do
    # ...
    exchange: {:direct, ""}

Publish message

Coney.publish("exchange", "message")

# or

Coney.publish("exchange", "routing_key", "message")

Checking connections

You can useConey.status/0 if you need to get information about RabbitMQ connections:

iex> Coney.status()
[{#PID<0.972.0>, :connected}]

Result is a list of tuples, where first element in tuple is a pid of running connection server and second element describes connection status.

Connection status can be:

  • :pending - when coney just started
  • :connected - when RabbitMQ connection has been established and all consumers have been started
  • :disconnected - when coney lost connection to RabbitMQ


Bug reports and pull requests are welcome on GitHub at https://github.com/llxff/coney.


The library is available as open source under the terms of the MIT License.