pma/amqp

no process: the process is not alive or there's no process currently - TERMINATION!

Closed this issue · 4 comments

Hi,

I am trying to use the amqp library more or less based on the consumer example from Readme.md.

The problem is that I need to use prefetch, which works, but I would also like to use reject with the requeue option. When I combine them, something weird happens: I get an error like the one below, and it looks like everything is terminated.

Am I doing something in a way that I shouldn't? Or is this a bug?

So basically, I start the consumer and get 2 messages. When I ack one of them, the next one pops up, BUT at random the process gets killed for an unknown reason. The same happens with rej.

** (EXIT from #PID<0.378.0>) evaluator process exited with reason: exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started

14:58:06.040 [error] GenServer Rabbitmq.Consumer terminating
** (stop) exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:214: :gen_server.call/3
    (rabbitmq) lib/rabbitmq/consumer.ex:62: Rabbitmq.Consumer.handle_cast/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:rej, 3}}
State: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.389.0>}, pid: #PID<0.398.0>}

and my code looks like this:

defmodule Rabbitmq.Consumer do
    @moduledoc """
    Module for listening on the RabbitMQ server.

    ## Params
        - connection_string
        - exchange
        - queue
        - queue_error
    """
    use GenServer
    use AMQP
    require Logger

    @exchange    "test1"
    @queue       "queue1"
    @queue_error "#{@queue}_error"

    ########
    # APIS #
    ########
    @doc """
    Starts to listen to given RabbitMQ queue

    ## Examples

        iex> {:ok, pid} = Rabbitmq.Consumer.start_link
        iex> Process.alive? pid
        true

    ## TODO:
        - Maybe the supervisor could pass the configuration (queue, connection string,
          exchange, queue_error and module name) through start_link?
        - The payload could also be passed through start_link (imei |> DeviceDataProcessing.start?)
        - Examples in the start_link/1 docs should probably not exist!
    """
    def start_link do
        GenServer.start_link(__MODULE__, [], name: __MODULE__)
    end

    def ack(tag) do
        GenServer.cast(__MODULE__, {:ack, tag})
    end

    def rej(tag) do
        GenServer.cast(__MODULE__, {:rej, tag})
    end

    #############
    # CALLBACKS #
    #############
    def init(_opts) do
        rabbitmq_connect()
    end

    def handle_cast({:ack, tag}, channel) do
        Basic.ack channel, tag
        {:noreply, channel}
    end

    def handle_cast({:rej, tag}, channel) do
        Basic.reject channel, tag, requeue: true
        {:noreply, channel}
    end
  
    # Confirmation sent by the broker after registering this process as a consumer
    def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
        {:noreply, chan}
    end

    # Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
    def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
        {:stop, :normal, chan}
    end

    # Confirmation sent by the broker to the consumer process after a Basic.cancel
    def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
        {:noreply, chan}
    end

    def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
        spawn fn -> consume(chan, tag, redelivered, payload) end
        {:noreply, chan}
    end

    # 2. Implement a callback to handle DOWN notifications from the system
    #    This callback should try to reconnect to the server
    def handle_info({:DOWN, _, :process, _pid, _reason}, _) do
        {:ok, chan} = rabbitmq_connect()
        {:noreply, chan}
    end
    
    #####################
    # PRIVATE FUNCTIONS #
    #####################
    defp consume(channel, tag, redelivered, payload) do
        

        # TODO: Add Logic worker assigned to process the data from RabbitMQ queue

        Logger.debug "#{inspect payload}"

        rescue
            # Requeue unless it's a redelivered message.
            # This means we will retry consuming a message once in case of exception
            # before we give up and have it moved to the error queue
            #
            # You might also want to catch :exit signals in production code.
            # Make sure you call ack, nack or reject, otherwise the consumer will stop
            # receiving messages.
            exception ->
                Basic.reject channel, tag, requeue: not redelivered
                Logger.error "[Rabbitmq.Listener] Error with payload: #{inspect payload},\nwith error: #{inspect exception}"
    end

    defp rabbitmq_connect do
        Application.get_env(:rabbitmq, :connection_string)
            |> Connection.open 
            |> connection_status
    end

    defp connection_status({:ok, conn}) do
        Process.monitor(conn.pid)
        {:ok, chan} = Channel.open(conn)
        Basic.qos(chan, prefetch_count: 2)
        Queue.declare(chan, @queue_error, durable: true)
        Exchange.direct(chan, @exchange, durable: true)
        Queue.bind(chan, @queue, @exchange)
        {:ok, _consumer_tag} = Basic.consume(chan, @queue)
        Logger.debug "[Rabbitmq.Listener] Connected to RabbitMQ..."
        {:ok, chan}
    end

    defp connection_status({:error, reason}) do
        Logger.warn "[Rabbitmq.Listener] Something is wrong, can't connect #{inspect reason}, trying to reconnect..."
        :timer.sleep(10000)
        rabbitmq_connect()
    end
end

A little more information:

Step 1:

[Screenshot, 2018-02-16 15:37:40]

Step 2:

[Screenshot, 2018-02-16 15:37:46]

Step 3:

[Screenshot, 2018-02-16 15:38:06]

Step 4:

[Screenshot, 2018-02-16 15:38:18]

Step 5:

[Screenshot, 2018-02-16 15:38:28]

In step 5 you can see that the structure below was recreated.

pma commented

@ulutomaz

Hi,

Could you be acknowledging or rejecting the same message twice, or trying to ack using an invalid delivery_tag?

At least, I was able to reproduce the same end result by acking twice: the second ack uses a delivery_tag that is no longer valid, so the broker closes the channel and the channel process silently exits.

Steps:

iex(1)> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.217.0>}}
iex(2)> {:ok, chan} = AMQP.Channel.open(conn)
{:ok,
 %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.217.0>}, pid: #PID<0.229.0>}}
iex(3)> {:ok, consumer_tag} = AMQP.Basic.consume(chan, "queue1")
{:ok, "amq.ctag-JMkBETeHLnWn96slUkGSIQ"}
iex(4)> flush
{:basic_consume_ok, %{consumer_tag: "amq.ctag-JMkBETeHLnWn96slUkGSIQ"}}
:ok

...
# Manually published a message to queue1
...

iex(5)> flush
{:basic_deliver, "hello",
 %{
   app_id: :undefined,
   cluster_id: :undefined,
   consumer_tag: "amq.ctag-JMkBETeHLnWn96slUkGSIQ",
   content_encoding: :undefined,
   content_type: :undefined,
   correlation_id: :undefined,
   delivery_tag: 1,
   exchange: "",
   expiration: :undefined,
   headers: [],
   message_id: :undefined,
   persistent: false,
   priority: :undefined,
   redelivered: false,
   reply_to: :undefined,
   routing_key: "queue1",
   timestamp: :undefined,
   type: :undefined,
   user_id: :undefined
 }}
:ok
iex(6)> Process.alive? chan.pid
true
iex(7)> AMQP.Basic.ack(chan, 1)
:ok
iex(8)> Process.alive? chan.pid
true
iex(9)> AMQP.Basic.ack(chan, 1)
:ok
iex(10)> Process.alive? chan.pid
false
iex(11)> AMQP.Basic.ack(chan, 1)
** (exit) exited in: :gen_server.call(#PID<0.229.0>, {:call, {:"basic.ack", 1, false}, :none, #PID<0.213.0>}, 60000)
    ** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
    (stdlib) gen_server.erl:214: :gen_server.call/3
    (amqp) lib/amqp/basic.ex:104: AMQP.Basic.ack/3
iex(12)>
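One way to guard against the double ack shown in the iex session above is to remember which delivery tags are still outstanding on the channel and only send `Basic.ack`/`Basic.reject` for a tag once. This is a minimal, broker-free sketch under that assumption; `PendingTags` is a hypothetical helper, not part of the amqp library. Delivery tags are scoped to a channel, and a message rejected with `requeue: true` comes back in a new `:basic_deliver` with a new delivery tag, so the old tag must not be reused.

```elixir
defmodule PendingTags do
  @moduledoc """
  Hypothetical helper, not part of the amqp library: tracks the delivery
  tags that are still outstanding on ONE channel so that each tag is
  acked or rejected at most once.
  """

  @doc "An empty set of outstanding tags (use a fresh one for every new channel)."
  def new, do: MapSet.new()

  @doc "Record the tag from a `{:basic_deliver, payload, %{delivery_tag: tag}}` message."
  def delivered(pending, tag), do: MapSet.put(pending, tag)

  @doc """
  Claim a tag right before calling Basic.ack/Basic.reject.
  Returns `{:ok, pending}` the first time, and `{:error, :unknown_tag}` for a
  tag that was never delivered or has already been acked/rejected.
  """
  def settle(pending, tag) do
    if MapSet.member?(pending, tag) do
      {:ok, MapSet.delete(pending, tag)}
    else
      {:error, :unknown_tag}
    end
  end
end

# Replaying the double ack from the iex session, without a broker:
pending = PendingTags.new() |> PendingTags.delivered(1)
# First ack of tag 1: allowed, so it is safe to call Basic.ack(chan, 1)
{:ok, pending} = PendingTags.settle(pending, 1)
# Second ack of tag 1: refused locally instead of killing the channel
{:error, :unknown_tag} = PendingTags.settle(pending, 1)
```

In a consumer like the one in the report, the GenServer state would hold both the channel and this set: call `delivered/2` in the `:basic_deliver` clause, and only call `Basic.ack`/`Basic.reject` in `handle_cast/2` when `settle/2` returns `{:ok, _}`. Any reject done elsewhere (for example in the `rescue` of a spawned `consume/4`) bypasses the set, so routing every ack/reject through the GenServer keeps the bookkeeping honest.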
ono commented

Closing this as we haven't heard back. If the issue still exists, feel free to reopen with the information requested above. Thanks.

Anyone found a solution? I'm currently having this problem :(
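For anyone hitting the same error, it may help that the consumer posted above only monitors the connection (`Process.monitor(conn.pid)` in `connection_status/1`). A channel-level error, such as acking an unknown delivery tag, closes only the channel, so no `:DOWN` message arrives, the GenServer keeps the dead `%AMQP.Channel{}` in its state, and the next `Basic.reject` exits with `:noproc`, which is consistent with the log in the original report. Below is a minimal, broker-free sketch of the monitor-and-reopen pattern. `ChannelKeeper` and its `open_fun` argument are hypothetical names, not amqp library API; `open_fun` stands in for opening a channel and is assumed to return `{:ok, pid}`.

```elixir
defmodule ChannelKeeper do
  @moduledoc """
  Hypothetical sketch, not part of the amqp library: keeps a monitor on the
  process it depends on (standing in for an AMQP channel) and reopens it
  when that process dies. `open_fun` must return `{:ok, pid}`.
  """
  use GenServer

  def start_link(open_fun), do: GenServer.start_link(__MODULE__, open_fun)

  # Returns the pid of the currently open (stand-in) channel.
  def current_pid(keeper), do: GenServer.call(keeper, :current_pid)

  @impl true
  def init(open_fun) do
    {:ok, open_and_monitor(%{open_fun: open_fun, pid: nil, ref: nil})}
  end

  @impl true
  def handle_call(:current_pid, _from, state), do: {:reply, state.pid, state}

  # Only the :DOWN for OUR monitor reference triggers a reopen; matching on
  # the ref keeps it apart from any other monitor (e.g. the connection's).
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{ref: ref} = state) do
    {:noreply, open_and_monitor(state)}
  end

  # Ignore anything else instead of crashing.
  def handle_info(_other, state), do: {:noreply, state}

  defp open_and_monitor(%{open_fun: open_fun} = state) do
    {:ok, pid} = open_fun.()
    %{state | pid: pid, ref: Process.monitor(pid)}
  end
end
```

With the amqp library, the equivalent would be calling `Process.monitor(chan.pid)` after `Channel.open/1` and, in the matching `:DOWN` clause, opening a new channel on the still-open connection and re-running `Basic.qos/2` and `Basic.consume/2` on it. Any per-channel bookkeeping, such as outstanding delivery tags, should be reset at that point because tags do not carry over to a new channel.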