pma/amqp

Detecting Connection and Channel Down

mvbaffa opened this issue · 8 comments

Hi,

I would like to know how I can detect when the connection or the channel goes down.

I have a supervisor for my module, and I can open a connection and read messages from a queue.
If I go to the RabbitMQ Management application and force-close my connection, I get this message:
[warn] Connection (#PID<0.158.0>) closing: received hard error {:"connection.close", 320, "CONNECTION_FORCED - Closed via management plugin", 0, 0} from server

But the supervisor does not start a new process. My handle_info has clauses matching basic_cancel, basic_cancel_ok, and a catch-all. By the way, the same supervisor does work when my GenEvent handler crashes.

Why is none of the handle_info clauses being called?

pma commented

@mvbaffa Hi,

AMQP.Connection.open will not link your caller process to the connection pid. One way to detect disconnects and implement reconnects is described in https://github.com/pma/amqp#stable-rabbitmq-connection. If you force close the connection on the server side, the connection pid will die and if you monitor it you can react and start a new connection.
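A minimal sketch of the monitor-and-reconnect pattern, using only the standard library so it runs without a broker. `FakeConn.open/0` is a hypothetical stand-in for `AMQP.Connection.open/1`: it returns a map with a `:pid` key backed by an idle process. The point is that `Process.monitor/1` turns the death of that process into an ordinary `{:DOWN, ref, :process, pid, reason}` message delivered to `handle_info/2`, where a new connection can be opened.

```elixir
defmodule FakeConn do
  # Hypothetical stand-in for AMQP.Connection.open/1: returns a map with a
  # :pid key, backed by a process that idles until something kills it.
  def open do
    pid = spawn(fn -> Process.sleep(:infinity) end)
    {:ok, %{pid: pid}}
  end
end

defmodule ConnWatcher do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns the pid of the current (fake) connection process.
  def conn_pid(server), do: GenServer.call(server, :conn_pid)

  @impl true
  def init(:ok), do: {:ok, connect()}

  @impl true
  def handle_call(:conn_pid, _from, state), do: {:reply, state.conn.pid, state}

  # Sent by the monitor when the connection process dies, for any reason.
  # Matching on the stored ref ignores :DOWN messages from other monitors.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref}) do
    {:noreply, connect()}
  end

  def handle_info(_msg, state), do: {:noreply, state}

  # Opens a (fake) connection and monitors its process.
  defp connect do
    {:ok, conn} = FakeConn.open()
    %{conn: conn, ref: Process.monitor(conn.pid)}
  end
end
```

With a real broker, the reconnect inside `handle_info/2` can itself fail while the server is unreachable, so it usually needs a retry delay rather than an immediate crash.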

Handling reconnects directly through the supervisor's restart mechanism is usually not the best approach. While the broker is unreachable, each restarted process fails right away, so the restarts happen fast enough to exceed your supervisor's maximum restart intensity (max_restarts within max_seconds). This will cause the supervisor itself to crash and cascade the failure, bringing the entire app down (see http://ferd.ca/it-s-about-the-guarantees.html).
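One way to keep reconnect attempts away from the supervisor's restart intensity is to retry inside the process itself, with a growing delay. The sketch below assumes a hypothetical zero-arity `connect_fun` standing in for `AMQP.Connection.open/1`, and schedules retries with `Process.send_after/3` instead of `:timer.sleep/1`, so the GenServer keeps answering calls while it waits. The delay policy (doubling from `base_ms`, capped at `max_ms`) is an illustrative choice, not something the library prescribes.

```elixir
defmodule BackoffConnector do
  use GenServer

  # connect_fun: hypothetical function returning {:ok, conn} or {:error, reason}.
  def start_link(connect_fun, base_ms \\ 1_000, max_ms \\ 30_000) do
    GenServer.start_link(__MODULE__, {connect_fun, base_ms, max_ms})
  end

  # Returns {conn_or_nil, failed_attempts_since_last_success}.
  def status(server), do: GenServer.call(server, :status)

  # Delay before retry number `attempt` (0-based): base_ms * 2^attempt, capped.
  def delay(attempt, base_ms, max_ms), do: min(base_ms * Integer.pow(2, attempt), max_ms)

  @impl true
  def init({connect_fun, base_ms, max_ms}) do
    # Return immediately; the first attempt happens in handle_info/2.
    send(self(), :connect)
    {:ok, %{fun: connect_fun, base: base_ms, max: max_ms, attempt: 0, conn: nil}}
  end

  @impl true
  def handle_info(:connect, state) do
    case state.fun.() do
      {:ok, conn} ->
        {:noreply, %{state | conn: conn, attempt: 0}}

      {:error, _reason} ->
        # Try again later instead of crashing and relying on the supervisor.
        Process.send_after(self(), :connect, delay(state.attempt, state.base, state.max))
        {:noreply, %{state | attempt: state.attempt + 1}}
    end
  end

  @impl true
  def handle_call(:status, _from, state), do: {:reply, {state.conn, state.attempt}, state}
end
```

The supervisor then only restarts this process when it genuinely crashes, not on every broker outage.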

Hi,

I am using exactly the code you suggested. The only difference is that I do not declare an exchange after Basic.qos; I declare my queue instead. Look:

  defp rabbitmq_connect(em_pid) do
    Logger.info "Opening Connection ..."
    case Connection.open(@conn) do
      {:ok, conn} ->
        # Get notifications when the connection goes down
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Basic.qos(chan, prefetch_count: @prefetch)
        Queue.declare(chan, @queue, durable: true, arguments: [])
        {:ok, _consumer_tag} = Basic.consume(chan, @queue, nil, no_ack: true)
        {:ok, chan, conn}
      {:error, _} ->
        Logger.info "Reopening Connection ..."
        # Reconnection loop
        :timer.sleep(@retryConn)
        spawn fn -> startProcessing(em_pid) end
    end
    wait_for_messages(0, em_pid)
  end

And

  def handle_info({:DOWN, _, :process, _pid, _reason}, state) do
    IO.puts " [**] Server Down ..."
    {:ok, chan} = startProcessing(state)
    {:noreply, chan}
  end

startProcessing calls rabbitmq_connect. But the :DOWN message never arrives. I have a catch-all handle_info too, and nothing arrives there either.

Thanks

pma commented

@mvbaffa You are probably missing a call to Process.flag(:trap_exit, true) to receive the exit signals as messages. You can add it to the init of your GenServer.
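The sketch below, using only standard-library calls, contrasts the two mechanisms. `trap_exit` changes how exit signals from *linked* processes are delivered (as `{:EXIT, pid, reason}` messages instead of killing the caller), while a monitor, like the one set up by `Process.monitor(conn.pid)`, delivers `{:DOWN, ref, :process, pid, reason}` whether or not `trap_exit` is set. The exiting process stands in for the connection process, and the reason `:connection_lost` is made up for the example.

```elixir
defmodule ExitSignals do
  # Link: without trap_exit, an abnormal exit of the linked process would
  # also kill the caller; with trap_exit it arrives as an {:EXIT, ...} message.
  def linked_exit_reason do
    previous = Process.flag(:trap_exit, true)
    pid = spawn_link(fn -> exit(:connection_lost) end)

    reason =
      receive do
        {:EXIT, ^pid, reason} -> reason
      after
        1_000 -> :timeout
      end

    # Restore the caller's original flag.
    Process.flag(:trap_exit, previous)
    reason
  end

  # Monitor: the :DOWN message is delivered regardless of trap_exit.
  def monitored_down_reason do
    {pid, ref} = spawn_monitor(fn -> exit(:connection_lost) end)

    receive do
      {:DOWN, ^ref, :process, ^pid, reason} -> reason
    after
      1_000 -> :timeout
    end
  end
end
```

Since the posted rabbitmq_connect monitors conn.pid rather than linking to it, the :DOWN message should land in the mailbox either way; whether a handle_info clause then sees it depends on the process actually running the GenServer loop.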

No, it did not work. Same thing.

pma commented

@mvbaffa If you can share a git repo with a minimal example that reproduces the issue, I can take a look at it and try to figure out why it's not working.

Our repo is private. I will send you the files, if you don't mind.

The wait_for_messages function is a piece of code taken from the RabbitMQ site, and rabbitmq_connect comes from your site.

I can receive messages normally, but there are two problems:

  1. When I force-close the RabbitMQ connection, I cannot get the MM.Queue.TransactionAvaiable server restarted, even though it runs under a supervisor (MM.Queues.Supervisor). As I told you, the GenEvent handler MM.Rabbitmq.TransactionAvaiableHandler is restarted normally by the supervisor when it crashes. I can simulate crashes by sending an invalid JSON message.

  2. I would like to be notified when messages are available (that is, subscribe to the queue). I could not get the "Setup a Consumer" example from your site to work, so I took wait_for_messages from the RabbitMQ site and used it in the MM.Queue.TransactionAvaiable server.

I have added the trap_exit call to the init of MM.Queue.TransactionAvaiable, but the problem persists.

Sorry, maybe the solution is trivial, but I am new to Elixir. This is probably a newbie issue.

Thank you very much

defmodule MM.Queues.Supervisor do
  @moduledoc """
  Queues Supervisor
  """

  require Logger
  use Supervisor

  @server __MODULE__

  @doc """
  Starts the Queue Supervisor.
  """
  def start_link do
    Supervisor.start_link(@server, {:ok}, [name: @server])
  end

  @doc """
    inits the Supervisor by starting the handlers and the queue servers
  """
  def init({:ok}) do

    {:ok, transaction_avaiable_manager} = GenEvent.start_link

    children = [
      worker(MM.Watcher.CommandHandler, [transaction_avaiable_manager, MM.Rabbitmq.TransactionAvaiableHandler]),
      worker(MM.Queue.TransactionAvaiable, [transaction_avaiable_manager])
    ]

    supervise(children, strategy: :one_for_one)
  end
end

defmodule MM.Watcher.CommandHandler do
  @moduledoc false
  use GenServer

  @server __MODULE__

  @doc """
    Starts the CommandHandler server, receiving manager pid and handler atom
  """
  def start_link(event_manager_pid, event_handler_atom) do
    GenServer.start_link(@server, {event_manager_pid, event_handler_atom}, [name: @server])
  end

  @doc """
    inits CommandHandler server
  """
  def init({event_manager_pid, event_handler_atom}) do
    Process.monitor(event_manager_pid)
    start_handler(event_manager_pid, event_handler_atom)
  end

  @doc """
    Stops this watcher if the event manager goes down.
  """
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, _from) do
    {:stop, "CommandHandlerWatcher down.", []}
  end

  @doc """
    handles gen_event_EXIT messages when the handler terminates normally (:normal or :shutdown) and stops this watcher
  """
  def handle_info({:gen_event_EXIT, _handler, reason}, state) when reason in [:normal, :shutdown] do
    {:stop, reason, state}
  end

  @doc """
    Handles EXIT messages from the GenEvent handler and restarts it.
  """
  def handle_info({:gen_event_EXIT, handler, _reason}, event_manager_pid) do
    {:ok, _} = start_handler(event_manager_pid, handler)
    {:noreply, event_manager_pid}
  end

  defp start_handler(event_manager_pid, event_handler_atom) do
    case GenEvent.add_mon_handler(event_manager_pid, event_handler_atom, {event_manager_pid, event_handler_atom}) do
      :ok -> {:ok, event_manager_pid}
      {:error, reason} -> {:stop, reason}
    end
  end
end

defmodule MM.Rabbitmq.TransactionAvaiableHandler do
  @moduledoc false

  require Logger
  use GenEvent

  # Callbacks

  @doc """
    inits Handler
  """
  def init(state) do
    Logger.info "Starting TransactionAvaiable Handler ..."
    {:ok, state}
  end

  @doc """
    Handle Transaction Avaiable Event
  """
  def handle_event({:transaction_avaiable, {num_msg, payload}}, state) do
    decoded = :jsx.decode(payload)
    IO.puts " [->] Dec(#{num_msg+1}): #{inspect decoded}"
    {:ok, state}
  end

  @doc """
    Handle any other message
  """
  def handle_event(_msg, state) do
    {:ok, state}
  end

  def handle_call(_msg, state) do
    {:ok, :ok, state}
  end

  def handle_info(_msg, state) do
    {:ok, state}
  end
end

defmodule MM.Queue.TransactionAvaiable do
  @moduledoc false

  require Logger

  use GenServer
  use AMQP

  @server __MODULE__

  @conn     "amqp://user:password@10.999.99.999"
  @queue    "queue"

  @doc """
  Starts the Queue server.
  """
  def start_link(event_manager_pid) do
    Logger.info "Starting TransactionAvaiable Queue Server ..."
    GenServer.start_link(@server, {:ok, 0, event_manager_pid}, [name: @server])
  end

  def stop(server) do
    Logger.info "Stopping TransactionAvaiable Queue Server ..."
    GenServer.call(server, :stop)
  end

  @doc """
    inits the GenServer by opening the queue connection
  """
  def init({:ok, num_msg, event_manager_pid}) do
    Logger.info "Initializes TransactionAvaiable Queue Server ..."
    rabbitmq_connect(num_msg, event_manager_pid)
  end

  defp rabbitmq_connect(num_msg, event_manager_pid) do
    case Connection.open(@conn) do
      {:ok, conn} ->
        # Get notifications when the connection goes down
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Basic.qos(chan, prefetch_count: 10)
        Queue.declare(chan, @queue, durable: true, arguments: [])
        {:ok, _consumer_tag} = Basic.consume(chan, @queue, nil, no_ack: true)
        {:ok, chan, conn}
      {:error, _} ->
        # Reconnection loop
        :timer.sleep(10000)
        rabbitmq_connect(num_msg, event_manager_pid)
    end

#    IO.puts " [*] Waiting for messages. To exit press CTRL+C, CTRL+C"
    wait_for_messages(num_msg, event_manager_pid)
  end

  @doc """
    Waits for messages to be consumed
  """
  def wait_for_messages(num_msg, event_manager_pid) do
    receive do
      {:basic_deliver, payload, _meta} ->
        GenEvent.notify(event_manager_pid, {:transaction_avaiable, {num_msg, payload}})
        wait_for_messages(num_msg+1, event_manager_pid)
    end
  end

  @doc """
    handles the DOWN message from the monitored connection and stops the server
  """
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, {_num_msg, conn}) do
    AMQP.Connection.close(conn)
    {:stop, "Transaction Avaiable Queue down.", []}
  end

  @doc """
    handles other messages
  """
  def handle_info(msg, state) do
    Logger.warn("Unknown msg received #{inspect(msg)}")
    {:noreply, state}
  end

  @doc """
    handles server down
  """
  def terminate(_reason, {_, conn}) do
    AMQP.Connection.close(conn)
    :ok
  end

end

pma commented

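@mvbaffa Your MM.Queue.TransactionAvaiable module is misusing the GenServer behaviour. The init callback of a GenServer is supposed to return {:ok, initial_state}. Because your init calls rabbitmq_connect, which then enters the receive loop in wait_for_messages, init never returns, so the GenServer never finishes starting and its handle_info callbacks are never invoked. The :DOWN message does reach the process, but it just sits in the mailbox, because wait_for_messages only matches :basic_deliver.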

When using the GenServer behaviour, the receive loop is implemented in the "generic" module (GenServer itself). Your "specific" module should only implement the callback functions. You shouldn't write a receive loop of your own when implementing the GenServer behaviour.
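To make the generic/specific split concrete, here is a drastically simplified, hypothetical `ToyServer`. It is not how OTP's GenServer is actually implemented (the real one also handles timeouts, system messages and error reporting), but it shows the shape: the generic part owns the `receive` loop and only enters it after `init/1` returns `{:ok, state}`, while the specific module (`TickCounter` here, also hypothetical) only supplies callbacks. An `init/1` that runs its own loop and never returns means the generic loop, and therefore every `handle_info/2`, is never reached.

```elixir
defmodule ToyServer do
  # "Generic" part: owns the process and its receive loop.
  def start(module, args) do
    spawn(fn ->
      # The loop only starts once init/1 has returned.
      {:ok, state} = module.init(args)
      loop(module, state)
    end)
  end

  # Synchronous request: send, then wait for the reply.
  def call(pid, request) do
    send(pid, {:call, self(), request})

    receive do
      {:reply, reply} -> reply
    after
      1_000 -> :timeout
    end
  end

  defp loop(module, state) do
    receive do
      {:call, from, request} ->
        {:reply, reply, new_state} = module.handle_call(request, from, state)
        send(from, {:reply, reply})
        loop(module, new_state)

      # Every other message is handed to the callback module.
      msg ->
        {:noreply, new_state} = module.handle_info(msg, state)
        loop(module, new_state)
    end
  end
end

defmodule TickCounter do
  # "Specific" part: callbacks only, no receive of its own.
  def init(n), do: {:ok, n}
  def handle_info(:tick, n), do: {:noreply, n + 1}
  def handle_info(_other, n), do: {:noreply, n}
  def handle_call(:count, _from, n), do: {:reply, n, n}
end
```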

To make it work as intended, make sure that after successfully connecting, your init actually returns {:ok, state}, where state is whatever you need to keep and pass between the callback functions (the conn, chan, etc.).

Handling the :basic_deliver message should be done by defining a handle_info callback function:

def handle_info({:basic_deliver, payload, meta}, your_state) do
  # Process message
  {:noreply, your_state}
end
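Putting those pieces together, a minimal sketch of how a consumer like MM.Queue.TransactionAvaiable could be shaped: `init/1` returns `{:ok, state}` right away, and each delivery is handled by a `handle_info/2` clause, with the message counter kept in the state instead of in the recursion of wait_for_messages. The broker is left out so the example runs on its own; in a real consumer, `init/1` would open the connection and channel and call `Basic.consume` before returning. `notify_pid` is a hypothetical replacement for the GenEvent manager: deliveries are simply forwarded to it with `send/2`.

```elixir
defmodule QueueConsumer do
  use GenServer

  # notify_pid: hypothetical process that receives each delivery.
  def start_link(notify_pid), do: GenServer.start_link(__MODULE__, notify_pid)

  def count(server), do: GenServer.call(server, :count)

  @impl true
  def init(notify_pid) do
    # A real consumer would connect, open a channel and start consuming here,
    # then return; it must not enter a receive loop of its own.
    {:ok, %{notify: notify_pid, num_msg: 0}}
  end

  # Each delivery arrives as an ordinary message handled by the GenServer loop.
  @impl true
  def handle_info({:basic_deliver, payload, _meta}, state) do
    send(state.notify, {:transaction_avaiable, {state.num_msg, payload}})
    {:noreply, %{state | num_msg: state.num_msg + 1}}
  end

  def handle_info(_msg, state), do: {:noreply, state}

  @impl true
  def handle_call(:count, _from, state), do: {:reply, state.num_msg, state}
end
```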

And if, after successfully connecting, you want to receive a signal when the connection dies, you must enable the trap_exit flag in your init:

def init(_args) do
  Process.flag(:trap_exit, true)
  # Initialize: connect, open the channel, etc., and build the state
  initial_state = %{}
  {:ok, initial_state}
end

This post (http://beatscodeandlife.ghost.io/understanding-genserver-with-elixir/) tries to explain how the GenServer behaviour works under the hood. I think it could help clarify some of my notes above.

I will change the code and as soon as it works I will let you know.

Thank you