Consumer server for RabbitMQ with message publishing functionality.
Add Coney as a dependency in your mix.exs file.
def deps do
  [{:coney, "~> 2.2"}]
end
After you are done, run mix deps.get in your shell to fetch and compile Coney.
# config/config.exs
config :coney,
  adapter: Coney.RabbitConnection,
  pool_size: 1,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  },
  workers: [
    MyApplication.MyConsumer
  ]

# Alternatively, define the connection mapping here and omit connection/0 in the consumer module:
workers: [
  %{
    connection: %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"]
    },
    worker: MyApplication.MyConsumer
  }
]
# config/test.exs
config :coney, adapter: Coney.FakeConnection, settings: %{}
# web/consumers/my_consumer.ex
defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"]
    }
  end

  def parse(payload, _meta) do
    String.to_integer(payload)
  end

  def process(number, _meta) do
    if number <= 10 do
      :ok
    else
      :reject
    end
  end

  # Be careful here: if `error_happened` itself raises an exception,
  # the message will not be handled properly and may be left unacked in the queue.
  def error_happened(_exception, payload, _meta) do
    IO.puts("Exception raised with #{payload}")
    :redeliver
  end
end
If an exception is raised while parse or process is running, Coney rejects the message by default. To handle exceptions differently, implement either the error_happened/3 or the error_happened/4 callback. Be careful: if error_happened itself raises an exception, the message will not be handled properly and may be left unacked in the queue.
The error_happened/3 callback receives the exception, the original payload and meta as parameters. The response format is the same as for the process callback.
The error_happened/4 callback receives the exception, the stacktrace, the original payload and meta as parameters. The response format is the same as for the process callback.
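As an illustration of the four-argument callback, here is a minimal sketch that prints the formatted exception together with its stacktrace and then rejects the message. The module name `MyApplication.LoggingConsumer` is hypothetical, and the argument order is assumed to match the description above (exception, stacktrace, payload, meta). In a real project the module would also declare `@behaviour Coney.Consumer` and define `connection/0`.

```elixir
defmodule MyApplication.LoggingConsumer do
  # Hypothetical consumer used only to illustrate error_happened/4.

  def parse(payload, _meta), do: String.to_integer(payload)

  def process(number, _meta) when number <= 10, do: :ok
  def process(_number, _meta), do: :reject

  # Assumed argument order: exception, stacktrace, original payload, meta.
  def error_happened(exception, stacktrace, payload, _meta) do
    IO.puts(
      :stderr,
      "Failed to handle #{inspect(payload)}: " <>
        Exception.format(:error, exception, stacktrace)
    )

    # Rejecting avoids an endless redelivery loop for a payload
    # that can never be parsed.
    :reject
  end
end
```

Returning `:reject` rather than `:redeliver` is a design choice for this sketch: a payload that fails in `parse` will most likely fail again on the next delivery.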
:ok - ack message.
:reject - reject message.
:redeliver - return message to the queue.
{:reply, binary} - response will be published to reply exchange.
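The four return values listed above can be sketched in a single `process/2` function. Everything in this example is hypothetical: the module name, the tuple shapes that `parse/2` is assumed to produce, and the `store/1` stub that stands in for a real backend.

```elixir
defmodule MyApplication.ReturnValuesSketch do
  # process/2 receives whatever parse/2 returned; this sketch assumes
  # parse/2 produced a tagged tuple.

  # Answer a ping: the binary is published to the reply exchange.
  def process({:ping, _}, _meta), do: {:reply, "pong"}

  # Store a value: ack on success, requeue when the backend is busy.
  def process({:store, value}, _meta) do
    case store(value) do
      :ok -> :ok
      {:error, :busy} -> :redeliver
    end
  end

  # Anything else is rejected.
  def process(_unknown, _meta), do: :reject

  # Hypothetical storage stub: pretends the backend is busy for values over 100.
  defp store(value) when value > 100, do: {:error, :busy}
  defp store(_value), do: :ok
end
```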
Reply description
To use {:reply, binary} you should add a response exchange to connection:
# web/consumers/my_consumer.ex
def connection do
  %{
    # ...
    respond_to: "response_exchange"
  }
end
The response will be published to the "response_exchange" exchange.
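Putting the pieces together, a consumer that replies to every message might look like the sketch below. `MyApplication.EchoConsumer` is a hypothetical module, and the exchange and queue names are copied from the earlier examples. With Coney installed the module would also declare `@behaviour Coney.Consumer`.

```elixir
defmodule MyApplication.EchoConsumer do
  # Hypothetical consumer: upcases each payload and sends it back.

  def connection do
    %{
      prefetch_count: 10,
      exchange: {:direct, "my_exchange", durable: true},
      queue: {"my_queue", durable: true},
      binding: [routing_key: "routing_key"],
      # Replies returned from process/2 are published to this exchange.
      respond_to: "response_exchange"
    }
  end

  def parse(payload, _meta), do: String.upcase(payload)

  # The second tuple element must be a binary.
  def process(text, _meta), do: {:reply, text}
end
```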
To use the default exchange, set connection.exchange to :default:
# web/consumers/my_consumer.ex
def connection do
  %{
    # ...
    exchange: :default
  }
end
The following format is also acceptable:
def connection do
  %{
    # ...
    exchange: {:direct, ""}
  }
end
Coney.publish("exchange", "message")
# or
Coney.publish("exchange", "routing_key", "message")
You can use Coney.status/0 if you need to get information about RabbitMQ connections:
iex> Coney.status()
[{#PID<0.972.0>, :connected}]
The result is a list of tuples, where the first element of each tuple is the pid of a running connection server and the second element describes the connection status.
Connection status can be:
:pending - when Coney has just started
:connected - when the RabbitMQ connection has been established and all consumers have been started
:disconnected - when Coney has lost the connection to RabbitMQ
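The status list lends itself to a simple health check. The helper below is a sketch, not part of Coney: `MyApplication.ConeyHealth` and its functions are hypothetical, and they only assume the `[{pid, status}]` shape described above.

```elixir
defmodule MyApplication.ConeyHealth do
  # Hypothetical helper operating on the list returned by Coney.status/0.

  # True only when there is at least one connection and all are :connected.
  def healthy?(statuses) when is_list(statuses) do
    statuses != [] and
      Enum.all?(statuses, fn {_pid, status} -> status == :connected end)
  end

  # Counts connections per status, e.g. for a metrics endpoint.
  def summary(statuses) when is_list(statuses) do
    Enum.frequencies_by(statuses, fn {_pid, status} -> status end)
  end
end
```

In an application with Coney running, it could be called as `MyApplication.ConeyHealth.healthy?(Coney.status())`.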
Bug reports and pull requests are welcome on GitHub at https://github.com/llxff/coney.
The library is available as open source under the terms of the MIT License.