Coney

Consumer server for RabbitMQ with message publishing functionality.

Installation

Add Coney as a dependency in your mix.exs file.

def deps do
  [{:coney, "~> 2.2"}]
end

After you are done, run mix deps.get in your shell to fetch and compile Coney.

Set up a consumer server

# config/config.exs
config :coney,
  adapter: Coney.RabbitConnection,
  pool_size: 1,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  },
  workers: [
    MyApplication.MyConsumer
  ]

# Alternatively, define the connection mapping in the config and omit it from the consumer module:
  workers: [
    %{
      connection: %{
        prefetch_count: 10,
        exchange:       {:direct, "my_exchange", durable: true},
        queue:          {"my_queue", durable: true},
        binding:        [routing_key: "routing_key"]
      },
      worker: MyApplication.MyConsumer
    }
  ]

# config/test.exs

config :coney, adapter: Coney.FakeConnection, settings: %{}

# web/consumers/my_consumer.ex

defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      exchange:       {:direct, "my_exchange", durable: true},
      queue:          {"my_queue", durable: true},
      binding:        [routing_key: "routing_key"]
    }
  end

  def parse(payload, _meta) do
    String.to_integer(payload)
  end

  def process(number, _meta) do
    if number <= 10 do
      :ok
    else
      :reject
    end
  end

  # Be careful: if `error_happened` itself raises an exception, the message
  # will not be handled properly and may be left unacked in the queue.
  def error_happened(_exception, payload, _meta) do
    IO.puts("Exception raised with #{payload}")
    :redeliver
  end
end

Rescuing exceptions

If an exception is raised while parse or process is running, Coney rejects the message by default. To handle exceptions differently, implement either the error_happened/3 or the error_happened/4 callback. Be careful: if error_happened itself raises an exception, the message will not be handled properly and may be left unacked in the queue.
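The flow described above can be pictured with a small sketch. This is not Coney's actual implementation: the DispatchSketch module and its handle/3 function are hypothetical, and the choice to prefer error_happened/4 over error_happened/3 when both exist is an assumption made for illustration only.

```elixir
defmodule DispatchSketch do
  # Illustrative only: NOT Coney's real code. It mimics the documented rule
  # "reject by default, unless the consumer implements error_happened".
  def handle(consumer, payload, meta) do
    payload
    |> consumer.parse(meta)
    |> consumer.process(meta)
  rescue
    exception ->
      # Make sure the module is loaded so function_exported?/3 is reliable.
      Code.ensure_loaded(consumer)

      cond do
        # Assumption: the 4-arity callback wins when both are defined.
        function_exported?(consumer, :error_happened, 4) ->
          consumer.error_happened(exception, __STACKTRACE__, payload, meta)

        function_exported?(consumer, :error_happened, 3) ->
          consumer.error_happened(exception, payload, meta)

        # No callback implemented: fall back to rejecting the message.
        true ->
          :reject
      end
  end
end
```

Note that nothing in the sketch protects against an exception raised inside error_happened itself, which is why such a failure would leave the message without a final ack or reject.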

error_happened/3

This callback receives the exception, the original payload, and meta as parameters. The response format is the same as for the process callback.

error_happened/4

This callback receives the exception, the stacktrace, the original payload, and meta as parameters. The response format is the same as for the process callback.
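As an illustration, a consumer might use the stacktrace to log a fully formatted error before rejecting the message. The module name MyApplication.LoggingConsumer is hypothetical, and the choice to log and reject is only one possible policy; Exception.format/3 and Logger come from Elixir's standard library.

```elixir
defmodule MyApplication.LoggingConsumer do
  # Hypothetical consumer used only to illustrate error_happened/4.
  @behaviour Coney.Consumer

  require Logger

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"]
    }
  end

  def parse(payload, _meta), do: String.to_integer(payload)

  def process(_number, _meta), do: :ok

  # Receives the exception, the stacktrace, the original payload and meta.
  # Keep this function simple so that it cannot raise itself.
  def error_happened(exception, stacktrace, payload, _meta) do
    formatted = Exception.format(:error, exception, stacktrace)
    Logger.error("Failed to handle #{inspect(payload)}:\n" <> formatted)
    :reject
  end
end
```

Using inspect/1 on the payload keeps the log call safe even if the payload is not valid UTF-8.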

process/2 and error_happened return format

  1. :ok - ack message.
  2. :reject - reject message.
  3. :redeliver - return message to the queue.
  4. {:reply, binary} - the response will be published to the reply exchange.
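The four return values above can be seen side by side in a small sketch. The module name and the shape of the parsed message (a map with an "action" key) are assumptions made for illustration; only the return values themselves come from the list above.

```elixir
defmodule MyApplication.ReturnValuesConsumer do
  # Hypothetical processing logic: the parsed message is assumed to be a map.

  # Ack the message.
  def process(%{"action" => "store"}, _meta), do: :ok

  # Return the message to the queue.
  def process(%{"action" => "retry_later"}, _meta), do: :redeliver

  # Publish a binary response to the reply exchange.
  def process(%{"action" => "echo", "body" => body}, _meta) when is_binary(body) do
    {:reply, body}
  end

  # Anything else is rejected.
  def process(_unknown, _meta), do: :reject
end
```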

Reply description

To use {:reply, binary}, add a response exchange to the connection:

# web/consumers/my_consumer.ex

def connection do
  %{
    # ...
    respond_to: "response_exchange"
  }
end

The response will be published to the "response_exchange" exchange.
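Putting the pieces together, a complete replying consumer could look like the sketch below. The module name MyApplication.DoublingConsumer and the doubling logic are hypothetical; the connection keys and the {:reply, binary} return value are the ones described above.

```elixir
defmodule MyApplication.DoublingConsumer do
  # Hypothetical consumer: replies with the received number multiplied by two.
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"],
      respond_to: "response_exchange"
    }
  end

  def parse(payload, _meta), do: String.to_integer(payload)

  # The reply must be a binary, so the integer is converted back to a string.
  def process(number, _meta), do: {:reply, Integer.to_string(number * 2)}
end
```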

The default exchange

To use the default exchange, set connection.exchange to :default:

# web/consumers/my_consumer.ex

def connection do
  %{
    # ...
    exchange: :default
  }
end

The following format is also acceptable:

def connection do
  %{
    # ...
    exchange: {:direct, ""}
  }
end

Publish message

Coney.publish("exchange", "message")

# or

Coney.publish("exchange", "routing_key", "message")

Checking connections

You can use Coney.status/0 if you need information about RabbitMQ connections:

iex> Coney.status()
[{#PID<0.972.0>, :connected}]

The result is a list of tuples, where the first element of each tuple is the pid of a running connection server and the second element describes the connection status.

Connection status can be:

  • :pending - when Coney has just started
  • :connected - when the RabbitMQ connection has been established and all consumers have been started
  • :disconnected - when Coney has lost its connection to RabbitMQ
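The status list lends itself to a simple health check. The helper below is a sketch, not part of Coney: the module name MyApplication.RabbitHealth and its functions are hypothetical, and treating an empty list as unhealthy is an assumption. It takes the list as an argument, so it can be exercised without a running broker.

```elixir
defmodule MyApplication.RabbitHealth do
  # Hypothetical helper operating on the list returned by Coney.status/0.

  # Assumption: no connection servers at all counts as unhealthy.
  def healthy?([]), do: false

  # Healthy only when every connection server reports :connected.
  def healthy?(statuses) when is_list(statuses) do
    Enum.all?(statuses, fn {_pid, status} -> status == :connected end)
  end

  # Counts connection servers per status, e.g. for a metrics endpoint.
  def summary(statuses) when is_list(statuses) do
    Enum.frequencies_by(statuses, fn {_pid, status} -> status end)
  end
end
```

In an application this could be called as MyApplication.RabbitHealth.healthy?(Coney.status()).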

Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/llxff/coney.

License

The library is available as open source under the terms of the MIT License.