no process: the process is not alive or there's no process currently - TERMINATION!
Closed this issue · 4 comments
Hi,
I am trying to use the amqp library more or less based on the consumer example from Readme.md.
The problem is, that I need to use prefetch, which works, but I would like to use reject
with requeue
option together. Then there some weird stuff happens. I get an error like this and it looks like every is terminated.
Am I doing something in a way that I shouldn't? Or is this a bug?
So basically I start the consumer, then I get 2 messages. When I ack
one of them then next one is poped up, BUT randomly process is killed out of unknow reason? Same goes with rej
.
** (EXIT from #PID<0.378.0>) evaluator process exited with reason: exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
14:58:06.040 [error] GenServer Rabbitmq.Consumer terminating
** (stop) exited in: :gen_server.call(#PID<0.398.0>, {:call, {:"basic.reject", 3, true}, :none, #PID<0.386.0>}, :infinity)
** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
(stdlib) gen_server.erl:214: :gen_server.call/3
(rabbitmq) lib/rabbitmq/consumer.ex:62: Rabbitmq.Consumer.handle_cast/2
(stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
(stdlib) gen_server.erl:686: :gen_server.handle_msg/6
(stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:"$gen_cast", {:rej, 3}}
State: %AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.389.0>}, pid: #PID<0.398.0>}
and my code is like
defmodule Rabbitmq.Consumer do
@moduledoc """
Module for listening on the RabbitMQ server.
## Params
- connection_string
- exchange
- queue
- queue_error
"""
use GenServer
use AMQP
require Logger
@exchange "test1"
@queue "queue1"
@queue_error "#{@queue}_error"
########
# APIS #
########
@doc """
Starts to listen to given RabbitMQ queue
## Examples
iex> {:ok, pid} = Rabbitmq.Listener.start_link
iex> Process.alive? pid
true
## TODO:
- Maybe the supervisor could send the configuration for queue, connection string,
exchange, queue_error and name of the module true the start link?
- Also the payload could be send true the start_link(imei |> DeviceDataProcessing.start?)
- Examples in the start_link/1 should probably not exist!
"""
def start_link do
GenServer.start_link(__MODULE__, [], name: __MODULE__)
end
def ack(tag) do
GenServer.cast(__MODULE__, {:ack, tag})
end
def rej(tag) do
GenServer.cast(__MODULE__, {:rej, tag})
end
#############
# CALLBACKS #
#############
def init(_opts) do
rabbitmq_connect()
end
def handle_cast({:ack, tag}, channel) do
Basic.ack channel, tag
{:noreply, channel}
end
def handle_cast({:rej, tag}, channel) do
Basic.reject channel, tag, requeue: true
{:noreply, channel}
end
# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, chan) do
{:noreply, chan}
end
# Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)
def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, chan) do
{:stop, :normal, chan}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, chan) do
{:noreply, chan}
end
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered}}, chan) do
spawn fn -> consume(chan, tag, redelivered, payload) end
{:noreply, chan}
end
# 2. Implement a callback to handle DOWN notifications from the system
# This callback should try to reconnect to the server
def handle_info({:DOWN, _, :process, _pid, _reason}, _) do
{:ok, chan} = rabbitmq_connect()
{:noreply, chan}
end
#####################
# PRIVATE FUNCTIONS #
#####################
defp consume(channel, tag, redelivered, payload) do
# TODO: Add Logic worker assigned to process the data from RabbitMQ queue
Logger.debug "#{inspect payload}"
rescue
# Requeue unless it's a redelivered message.
# This means we will retry consuming a message once in case of exception
# before we give up and have it moved to the error queue
#
# You might also want to catch :exit signal in production code.
# Make sure you call ack, nack or reject otherwise comsumer will stop
# receiving messages.
exception ->
Basic.reject channel, tag, requeue: not redelivered
Logger.error "[Rabbitmq.Listener] Error with payload: #{inspect payload},\nwith error: #{inspect exception}"
end
defp rabbitmq_connect do
Application.get_env(:rabbitmq, :connection_string)
|> Connection.open
|> connection_status
end
defp connection_status({:ok, conn}) do
Process.monitor(conn.pid)
{:ok, chan} = Channel.open(conn)
Basic.qos(chan, prefetch_count: 2)
Queue.declare(chan, @queue_error, durable: true)
Exchange.direct(chan, @exchange, durable: true)
Queue.bind(chan, @queue, @exchange)
{:ok, _consumer_tag} = Basic.consume(chan, @queue)
Logger.debug "[Rabbitmq.Listener] Connected to RabbitMQ..."
{:ok, chan}
end
defp connection_status({:error, reason}) do
Logger.warn "[Rabbitmq.Listener] Something is wrong, can't connect #{inspect reason}, trying to reconnect..."
:timer.sleep(10000)
rabbitmq_connect()
end
end
Hi,
Could you be acknowledging or rejecting the same message twice, or trying to ack using an invalid delivery_tag?
At least I was able to reproduce the same end result by trying to ack twice (the second fails because the delivery_tag is no longer valid and silently crashes the channel process).
Steps:
iex(1)> {:ok, conn} = AMQP.Connection.open
{:ok, %AMQP.Connection{pid: #PID<0.217.0>}}
iex(2)> {:ok, chan} = AMQP.Channel.open(conn)
{:ok,
%AMQP.Channel{conn: %AMQP.Connection{pid: #PID<0.217.0>}, pid: #PID<0.229.0>}}
iex(3)> {:ok, consumer_tag} = AMQP.Basic.consume(chan, "queue1")
{:ok, "amq.ctag-JMkBETeHLnWn96slUkGSIQ"}
iex(4)> flush
{:basic_consume_ok, %{consumer_tag: "amq.ctag-JMkBETeHLnWn96slUkGSIQ"}}
:ok
...
# Manually published a message to the queue1
...
iex(5)> flush
{:basic_deliver, "hello",
%{
app_id: :undefined,
cluster_id: :undefined,
consumer_tag: "amq.ctag-JMkBETeHLnWn96slUkGSIQ",
content_encoding: :undefined,
content_type: :undefined,
correlation_id: :undefined,
delivery_tag: 1,
exchange: "",
expiration: :undefined,
headers: [],
message_id: :undefined,
persistent: false,
priority: :undefined,
redelivered: false,
reply_to: :undefined,
routing_key: "queue1",
timestamp: :undefined,
type: :undefined,
user_id: :undefined
}}
:ok
iex(6)> Process.alive? chan.pid
true
iex(7)> AMQP.Basic.ack(chan, 1)
:ok
iex(8)> Process.alive? chan.pid
true
iex(9)> AMQP.Basic.ack(chan, 1)
:ok
iex(10)> Process.alive? chan.pid
false
iex(11)> AMQP.Basic.ack(chan, 1)
** (exit) exited in: :gen_server.call(#PID<0.229.0>, {:call, {:"basic.ack", 1, false}, :none, #PID<0.213.0>}, 60000)
** (EXIT) no process: the process is not alive or there's no process currently associated with the given name, possibly because its application isn't started
(stdlib) gen_server.erl:214: :gen_server.call/3
(amqp) lib/amqp/basic.ex:104: AMQP.Basic.ack/3
iex(12)>
Closing this as we haven't heard. If the issue still exists, feel free to reopen with the information requested above. Thanks.
Anyone found a solution? I'm currently having this problem :(