pma/amqp

issue with 0.2.0 - publisher killed randomly

Closed this issue ยท 6 comments

Hi everybody!

Thanks for the great work you put in this library...it's really appreciated!

I've got a problem in my project by upgrading from 0.2.0-pre.2 to 0.2.0

This is my publisher module:

defmodule Atreyu.Rabbit.Publisher do
  @moduledoc """
  rabbitmq publisher
  """
  use AMQP
  use GenServer

  @connection_params Application.get_env(:atreyu, :rabbit)[:connection_params]

  def start_link do
    Logger.debug "Starting RabbitMQ publisher"
    GenServer.start_link(__MODULE__, get_channel(), name: :publisher)
  end

  defp get_channel do
    with {:ok, conn} <- Connection.open(@connection_params),
         {:ok, chan} <- Channel.open(conn),
      do: chan
  end

  def publish(exchange, routing_key, payload) do
    GenServer.call(:publisher, {:publish, exchange, routing_key, payload})
  end
  
  def handle_call({:publish, exchange, routing_key, payload}, _from, chan) do
    :ok = Basic.publish chan, exchange, routing_key, payload
    {:reply, :ok, chan}
  end
end

very easy and straightforward

Now, in my test suite, if I use 0.2.0-pre.2 everything works. With 0.2.0 I got random errors about the publisher being down and terminated. Some messages get delivered, but at a some point in my test suite some messages starts to kill the GenServer.

this is the error I get:

14:37:00.095 [error] GenServer :publisher terminating
** (stop) exited in: :gen_server.call(#PID<0.928.0>, {:call, {:"basic.publish", 0, "ania", "ania.atr.registra", false, false}, {:amqp_msg, {:P_basic, :undefined, :undefined, :undefined, 1, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined, :undefined}, "{\"code\":\"CASO4\"}"}, #PID<0.930.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:212: :gen_server.call/3
    (atreyu) lib/atreyu/rabbit/publisher.ex:49: Atreyu.Rabbit.Publisher.handle_call/3
    (stdlib) gen_server.erl:615: :gen_server.try_handle_call/4
    (stdlib) gen_server.erl:647: :gen_server.handle_msg/5
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:publish, "ania", "ania.atr.registra", "{\"code\":\"CASO4\"}"}

a general error that simply tells that the GenServer is down, so the message cannot be delivered.

Does anybody want to share some ideas? Is this possibly related to this? 0f0c825 (the only relevant change between the two versions, beside amqp_client and rabbit_common)

Many thanks in advance!

pma commented

@matteosister With the change in 0f0c825, if the channel process is dead, :amqp_channel.call will fail, whereas with :amqp_channel.cast it would just return :ok even if no channel process is there to transmit the message to the broker.

My first thought is that the change would just make the problem visible, not actually cause it. Could it be that you were previously just losing messages under high load?

ono commented

Another guess is that there might be code closing a channel or connection somewhere in your tests. Try this code and see if your tests pass?

def handle_call({:publish, exchange, routing_key, payload}, _from, chan) do
    temp_chan = get_channel()
    :ok = Basic.publish temp_chan, exchange, routing_key, payload
    Channel.close temp_chan
    Connection.close temp_chan.connection
    {:reply, :ok, chan}
 end

Obviously it is not efficient to run this on production so just to confirm the suspicion.
If you close a channel in middle of tests, I suggest you to open it again.

thanks @pma and @ono

I confirm that by manually opening a new channel and closing it after publishing the message the test suite work! So there must be something that closes the channel...currently investigating...

it turns out it was just a problem with amqp. I was not declaring the exchange I was publishing to. So, as @pma said, the change from cast to call in the publishing call surfaced the problem. It was happening only in test, and not in prod (due to the fact that exchange in prod is already declared). So, I'm happy to finally be on 0.2.0 ๐Ÿ‘

many thanks!

ono commented

@matteosister Great. Glad to hear it's sorted. Yeah, we expect this kind of implications but in a good way. You want to be alerted when the message was not delivered to the queue/exchange.

Thanks for the follow-up ๐Ÿ‘

@ono totally agree with the design choice! ๐Ÿ‘