Detecting Connection and Channel Down
mvbaffa opened this issue · 8 comments
Hi,
I would like to know how I can detect errors when the connection or channel goes down.
I have a supervisor for my module, and I can open a connection and read messages from a queue.
If I go to the RabbitMQ Management application and kill my connection, I receive the message:
[warn] Connection (#PID<0.158.0>) closing: received hard error {:"connection.close", 320, "CONNECTION_FORCED - Closed via management plugin", 0, 0} from server
But the Supervisor does not start a new process. My handle_info matches basic_cancel, basic_cancel_ok, and a catch-all clause. By the way, the same Supervisor works when my GenEvent handler crashes.
Why was none of the handle_info clauses selected?
@mvbaffa Hi,
AMQP.Connection.open will not link your caller process to the connection pid. One way to detect disconnects and implement reconnects is described in https://github.com/pma/amqp#stable-rabbitmq-connection. If you force close the connection on the server side, the connection pid will die and if you monitor it you can react and start a new connection.
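A minimal sketch of the monitor-and-reconnect idea, written for a recent Elixir version and using only the standard library so it runs without a broker. `fake_open/0` is a hypothetical stand-in for `AMQP.Connection.open/1`: it spawns a process and returns a map with a `:pid` key, like `conn.pid` in this thread. The backoff values are arbitrary assumptions.

```elixir
defmodule ReconnectSketch do
  use GenServer
  require Logger

  # Assumed retry delays in milliseconds; tune for your setup.
  @initial_backoff 100
  @max_backoff 5_000

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Returns the pid of the current (fake) connection, or nil while disconnected.
  def conn_pid(server), do: GenServer.call(server, :conn_pid)

  @impl true
  def init(_opts) do
    # Connect from handle_info/2 so init/1 returns {:ok, state} immediately.
    send(self(), :connect)
    {:ok, %{conn: nil, backoff: @initial_backoff}}
  end

  @impl true
  def handle_info(:connect, state) do
    case fake_open() do
      {:ok, conn} ->
        # Monitor (not link): a {:DOWN, ...} message arrives when conn.pid dies.
        Process.monitor(conn.pid)
        {:noreply, %{state | conn: conn, backoff: @initial_backoff}}

      {:error, _reason} ->
        schedule_reconnect(state)
    end
  end

  def handle_info({:DOWN, _ref, :process, pid, reason}, %{conn: %{pid: pid}} = state) do
    Logger.warning("connection down (#{inspect(reason)}), retrying in #{state.backoff} ms")
    schedule_reconnect(%{state | conn: nil})
  end

  # Ignore stale :DOWN messages and anything else in this sketch.
  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def handle_call(:conn_pid, _from, state) do
    {:reply, state.conn && state.conn.pid, state}
  end

  # Retry later instead of crashing, doubling the delay up to @max_backoff.
  defp schedule_reconnect(state) do
    Process.send_after(self(), :connect, state.backoff)
    {:noreply, %{state | backoff: min(state.backoff * 2, @max_backoff)}}
  end

  # Hypothetical stand-in for AMQP.Connection.open/1.
  defp fake_open do
    {:ok, %{pid: spawn(fn -> Process.sleep(:infinity) end)}}
  end
end
```

Killing the fake connection with `Process.exit(pid, :kill)` produces a `:DOWN` message, and after the backoff delay `conn_pid/1` returns a new pid. The server itself never crashes, so the supervisor's restart budget is not consumed.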
Handling reconnects directly through the Supervisor restart mechanism is usually not the best approach. The restarts will happen so fast that your supervisor's maximum restart intensity (max_restarts within max_seconds) will be exceeded. This will cause the supervisor to crash and cascade the failure up the tree, bringing the entire app down (see http://ferd.ca/it-s-about-the-guarantees.html).
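To see the restart-intensity problem in isolation, here is a small sketch (recent Elixir) with a hypothetical `FlakyWorker` that stops right after starting, roughly what a consumer does if its connection keeps dropping. With `max_restarts: 3` and `max_seconds: 5` (the Elixir defaults, spelled out here), the fourth crash exceeds the limit and the supervisor itself exits with reason `:shutdown`.

```elixir
defmodule FlakyWorker do
  use GenServer

  def start_link(_arg), do: GenServer.start_link(__MODULE__, :ok)

  @impl true
  def init(:ok) do
    # Simulate a connection that drops right after the worker starts.
    send(self(), :connection_lost)
    {:ok, nil}
  end

  @impl true
  def handle_info(:connection_lost, state), do: {:stop, :connection_lost, state}
end

defmodule RestartIntensityDemo do
  # Starts a supervisor for FlakyWorker and returns the supervisor's exit reason.
  def run do
    parent = self()

    spawn(fn ->
      # Trap exits in this helper only, so the caller is not affected.
      Process.flag(:trap_exit, true)

      {:ok, sup} =
        Supervisor.start_link([FlakyWorker],
          strategy: :one_for_one,
          max_restarts: 3,
          max_seconds: 5
        )

      receive do
        {:EXIT, ^sup, reason} -> send(parent, {:supervisor_exit, reason})
      after
        5_000 -> send(parent, {:supervisor_exit, :still_running})
      end
    end)

    receive do
      {:supervisor_exit, reason} -> reason
    after
      10_000 -> :timeout
    end
  end
end
```

In a real application that `:shutdown` would be seen by the parent supervisor, which is how the failure cascades.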
Hi,
I am using exactly the same code you suggested. The only difference is that I do not declare an exchange; after Basic.qos I declare my queue. Look:
```elixir
defp rabbitmq_connect(em_pid) do
  Logger.info "Opening Connection ..."
  case Connection.open(@conn) do
    {:ok, conn} ->
      # Get notifications when the connection goes down
      Process.monitor(conn.pid)
      {:ok, chan} = Channel.open(conn)
      Basic.qos(chan, prefetch_count: @prefetch)
      Queue.declare(chan, @queue, durable: true, arguments: [])
      {:ok, _consumer_tag} = Basic.consume(chan, @queue, nil, no_ack: true)
      {:ok, chan, conn}
    {:error, _} ->
      Logger.info "Reopening Connection ..."
      # Reconnection loop
      :timer.sleep(@retryConn)
      spawn fn -> startProcessing(em_pid) end
  end
  wait_for_messages(0, em_pid)
end
```
And
```elixir
def handle_info({:DOWN, _, :process, _pid, _reason}, state) do
  IO.puts " [**] Server Down ..."
  {:ok, chan} = startProcessing(state)
  {:noreply, chan}
end
```
startProcessing calls rabbitmq_connect. But the :DOWN message never arrives. I have a catch-all handle_info too, and nothing reaches it either.
Thanks
@mvbaffa You are probably missing a call to Process.flag(:trap_exit, true) to receive the exit signals as messages. You can include it in the init of your GenServer.
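A small standard-library sketch of the two mechanisms in play: `trap_exit` turns exit signals from *linked* processes into `{:EXIT, pid, reason}` messages, while a monitor delivers `{:DOWN, ref, :process, pid, reason}` whether or not the receiver traps exits. Since `AMQP.Connection.open` does not link the caller to the connection (as noted earlier in the thread), `Process.monitor` alone should be enough to get a `:DOWN` message. The `:connection_forced` reason below is made up for the demo.

```elixir
defmodule ExitSignalsDemo do
  # A monitor delivers {:DOWN, ...} without any trap_exit flag.
  def monitor_demo do
    pid = spawn(fn -> Process.sleep(:infinity) end)
    ref = Process.monitor(pid)
    Process.exit(pid, :connection_forced)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> {:down, reason}
    after
      1_000 -> :timeout
    end
  end

  # trap_exit only affects *linked* processes: the exit signal becomes an
  # {:EXIT, pid, reason} message instead of killing the receiver.
  # Runs in a helper process so the caller's trap_exit flag is untouched.
  def link_demo do
    parent = self()

    spawn(fn ->
      Process.flag(:trap_exit, true)
      pid = spawn_link(fn -> Process.sleep(:infinity) end)
      Process.exit(pid, :connection_forced)

      receive do
        {:EXIT, ^pid, reason} -> send(parent, {:exit_msg, reason})
      after
        1_000 -> send(parent, {:exit_msg, :timeout})
      end
    end)

    receive do
      {:exit_msg, reason} -> {:exit_msg, reason}
    after
      2_000 -> :timeout
    end
  end
end
```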
No, it did not work. Same thing.
@mvbaffa If you can share a git repo with a minimal example that replicates the issue, I can take a look at it and try to figure out why it's not working.
Our repo is private, so I will send you the files, if you do not mind.
wait_for_messages is a piece of code taken from the RabbitMQ site, and rabbitmq_connect is from your site.
I can receive messages normally but there are two problems:
- I cannot restart the MM.Queue.TransactionAvaiable server when I force the RabbitMQ connection to close, even with a supervisor (MM.Queues.Supervisor). As I told you, the GenEvent handler MM.Rabbitmq.TransactionAvaiableHandler is restarted normally by the supervisor when it crashes; I can simulate crashes by sending an invalid JSON message.
- I would like to be notified (subscribe to the queue) when messages are available. I could not make the "Setup a Consumer" example from your site work, so I took wait_for_messages from the RabbitMQ site and used it in the MM.Queue.TransactionAvaiable server.
I have included the trap_exit flag in the init of MM.Queue.TransactionAvaiable, but the problem persists.
Sorry, maybe the solution is trivial, but I am new to Elixir. This is probably a newbie issue.
Thank you very much
```elixir
defmodule MM.Queues.Supervisor do
  @moduledoc """
  Queues Supervisor
  """
  require Logger
  use Supervisor

  @server __MODULE__

  @doc """
  Starts the Queue Supervisor.
  """
  def start_link do
    Supervisor.start_link(@server, {:ok}, [name: @server])
  end

  @doc """
  inits the Supervisor by starting the handlers and the queue servers
  """
  def init({:ok}) do
    {:ok, transaction_avaiable_manager} = GenEvent.start_link

    children = [
      worker(MM.Watcher.CommandHandler, [transaction_avaiable_manager, MM.Rabbitmq.TransactionAvaiableHandler]),
      worker(MM.Queue.TransactionAvaiable, [transaction_avaiable_manager])
    ]

    supervise(children, strategy: :one_for_one)
  end
end
```
```elixir
defmodule MM.Watcher.CommandHandler do
  @moduledoc false
  use GenServer

  @server __MODULE__

  @doc """
  Starts the CommandHandler server, receiving manager pid and handler atom
  """
  def start_link(event_manager_pid, event_handler_atom) do
    GenServer.start_link(@server, {event_manager_pid, event_handler_atom}, [name: @server])
  end

  @doc """
  inits CommandHandler server
  """
  def init({event_manager_pid, event_handler_atom}) do
    Process.monitor(event_manager_pid)
    start_handler(event_manager_pid, event_handler_atom)
  end

  @doc """
  Stops this watcher in a case of Event Manager goes down.
  """
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, _from) do
    {:stop, "CommandHandlerWatcher down.", []}
  end

  @doc """
  handles DOWN messages from the GenEvent when it terminates normally handler and close the queue connection
  """
  def handle_info({:gen_event_EXIT, _handler, reason}, state) when reason in [:normal, :shutdown] do
    {:stop, reason, state}
  end

  @doc """
  Handles EXIT messages from the GenEvent handler and restarts it.
  """
  def handle_info({:gen_event_EXIT, handler, _reason}, event_manager_pid) do
    {:ok, _} = start_handler(event_manager_pid, handler)
    {:noreply, event_manager_pid}
  end

  defp start_handler(event_manager_pid, event_handler_atom) do
    case GenEvent.add_mon_handler(event_manager_pid, event_handler_atom, {event_manager_pid, event_handler_atom}) do
      :ok -> {:ok, event_manager_pid}
      {:error, reason} -> {:stop, reason}
    end
  end
end
```
```elixir
defmodule MM.Rabbitmq.TransactionAvaiableHandler do
  @moduledoc false
  require Logger
  use GenEvent

  # Callbacks

  @doc """
  inits Handler
  """
  def init(state) do
    Logger.info "Starting TransactionAvaiable Handler ..."
    {:ok, state}
  end

  @doc """
  Handle Transaction Avaiable Event
  """
  def handle_event({:transaction_avaiable, {num_msg, payload}}, state) do
    decoded = :jsx.decode(payload)
    IO.puts " [->] Dec(#{num_msg+1}): #{inspect decoded}"
    {:ok, state}
  end

  @doc """
  Handle any other message
  """
  def handle_event(_msg, state) do
    {:ok, state}
  end

  def handle_call(_msg, state) do
    {:ok, :ok, state}
  end

  def handle_info(_msg, state) do
    {:ok, state}
  end
end
```
```elixir
defmodule MM.Queue.TransactionAvaiable do
  @moduledoc false
  require Logger
  use GenServer
  use AMQP

  @server __MODULE__
  @conn "amqp://user:password@10.999.99.999"
  @queue "queue"

  @doc """
  Starts the Queue server.
  """
  def start_link(event_manager_pid) do
    Logger.info "Starting TransactionAvaiable Queue Server ..."
    GenServer.start_link(@server, {:ok, 0, event_manager_pid}, [name: @server])
  end

  def stop(server) do
    Logger.info "Stopping TransactionAvaiable Queue Server ..."
    GenServer.call(server, :stop)
  end

  @doc """
  inits the GenServer by opening the queue connection
  """
  def init({:ok, num_msg, event_manager_pid}) do
    Logger.info "Initializes TransactionAvaiable Queue Server ..."
    rabbitmq_connect(num_msg, event_manager_pid)
  end

  defp rabbitmq_connect(num_msg, event_manager_pid) do
    case Connection.open(@conn) do
      {:ok, conn} ->
        # Get notifications when the connection goes down
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Basic.qos(chan, prefetch_count: 10)
        Queue.declare(chan, @queue, durable: true, arguments: [])
        {:ok, _consumer_tag} = Basic.consume(chan, @queue, nil, no_ack: true)
        {:ok, chan, conn}
      {:error, _} ->
        # Reconnection loop
        :timer.sleep(10000)
        rabbitmq_connect(num_msg, event_manager_pid)
    end
    # IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"
    wait_for_messages(num_msg, event_manager_pid)
  end

  @doc """
  Waits for messages to be consumed
  """
  def wait_for_messages(num_msg, event_manager_pid) do
    receive do
      {:basic_deliver, payload, _meta} ->
        GenEvent.notify(event_manager_pid, {:transaction_avaiable, {num_msg, payload}})
        wait_for_messages(num_msg+1, event_manager_pid)
    end
  end

  @doc """
  handles EXIT messages from the GenEvent handler and restarts it
  """
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, {_num_msg, conn}) do
    AMQP.Connection.close(conn)
    {:stop, "Transaction Avaiable Queue down.", []}
  end

  @doc """
  handles other messages
  """
  def handle_info(msg, state) do
    Logger.warn("Unknown msg received #{inspect(msg)}")
    {:noreply, state}
  end

  @doc """
  handles server down
  """
  def terminate(_reason, {_, conn}) do
    AMQP.Connection.close(conn)
    :ok
  end
end
```
@mvbaffa Your MM.Queue.TransactionAvaiable module is misusing the GenServer behaviour. The init callback in a GenServer is supposed to return {:ok, initial_state}, but by calling rabbitmq_connect and entering a receive loop in wait_for_messages, your init never returns, so the GenServer doesn't even start properly.
When using the GenServer behaviour, the receive loop is implemented in the "generic" module. Your "specific" module should only implement the callback functions. You shouldn't write a receive loop of your own if you are implementing a GenServer behaviour.
To make it work as intended, make sure that after successfully connecting, your init actually returns {:ok, state}, where state is whatever you need to maintain and pass around the handler functions (the conn, chan, etc.).
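To make the init point concrete, here is a sketch with two hypothetical modules: `BlockingInit` mimics the code above by entering its own receive loop from `init/1`, while `FixedInit` returns `{:ok, state}` right away. `GenServer.start_link/3` waits for `init/1` to return (by default with no timeout), so passing the `:timeout` option makes the blocking version come back with `{:error, :timeout}` instead of hanging.

```elixir
defmodule BlockingInit do
  use GenServer

  # Like the init/1 above: it never returns {:ok, state}, it just loops.
  @impl true
  def init(:ok), do: wait_for_messages()

  defp wait_for_messages do
    receive do
      {:basic_deliver, _payload, _meta} -> wait_for_messages()
    end
  end
end

defmodule FixedInit do
  use GenServer

  # Returns the initial state at once; deliveries would go to handle_info/2.
  @impl true
  def init(:ok), do: {:ok, %{num_msg: 0}}
end
```

Under a supervisor the same blocking `init/1` means the supervisor's own start never completes, so no restart logic ever runs.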
Handling the :basic_deliver message should be done by defining a handle_info callback function:
```elixir
def handle_info({:basic_deliver, payload, meta}, your_state) do
  # Process message
  {:noreply, your_state}
end
```
And if after successfully connecting you want to receive a signal when the connection dies, you must enable the trap_exit flag in your init:
```elixir
def init(_args) do
  Process.flag(:trap_exit, true)
  # Initialize
  {:ok, initial_state}
end
```
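Putting the init and handle_info advice together, a minimal consumer might look like the sketch below (recent Elixir). It assumes the AMQP setup (Connection.open, Channel.open, Basic.consume) would go in `init/1`; that part is left out so the example runs without a broker, and deliveries are simulated by sending `{:basic_deliver, payload, meta}` tuples by hand. Forwarding each payload to a plain `notify` pid is a hypothetical substitute for the GenEvent manager used in the thread.

```elixir
defmodule ConsumerSketch do
  use GenServer

  # `notify` is the pid that receives each payload (stand-in for the GenEvent manager).
  def start_link(notify), do: GenServer.start_link(__MODULE__, notify)

  # Number of deliveries handled so far.
  def count(server), do: GenServer.call(server, :count)

  @impl true
  def init(notify) do
    # Real code would open the connection/channel and call Basic.consume here,
    # then return the state: no receive loop of our own.
    {:ok, %{notify: notify, num_msg: 0}}
  end

  @impl true
  def handle_info({:basic_deliver, payload, _meta}, state) do
    send(state.notify, {:transaction_avaiable, {state.num_msg, payload}})
    {:noreply, %{state | num_msg: state.num_msg + 1}}
  end

  # Ignore any other message in this sketch.
  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def handle_call(:count, _from, state), do: {:reply, state.num_msg, state}
end
```

Because GenServer processes its mailbox in order, two deliveries sent before a `count/1` call are both counted when the call is answered.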
This post (http://beatscodeandlife.ghost.io/understanding-genserver-with-elixir/) tries to explain how the GenServer behaviour works under the hood. I think it could help clarify some of my notes above.
I will change the code and as soon as it works I will let you know.
Thank you