Can't publish a message into the exlusive queue
Relrin opened this issue · 1 comments
Relrin commented
Hello,
I'm writing tests for my own small package, but stucked with an issues, when Elixir can't resolve the appropriate function:
18:01:33.475 [error] Process #PID<0.256.0> raised an exception
** (FunctionClauseError) no function clause matching in AMQP.Basic.publish/5
(amqp) lib/amqp/basic.ex:51: AMQP.Basic.publish([connection: %AMQP.Connection{pid: #PID<0.238.0>}, config: [connection_timeout: 10000, username: "user", password: "password", host: "localhost", port: 5672, virtual_host: "/"]], "test.direct", "amq.gen-8CS8a9aGGBJzpCpt9LY2lw", "OK", [persistent: true])
test/worker_test.exs:111: SpotterWorkerTest.CustomWorker.consume/5
18:01:33.475 [error] Error in process <0.256.0> with exit value:
{function_clause,[{'Elixir.AMQP.Basic',publish,[[{connection,#{'__struct__' => 'Elixir.AMQP.Connection',pid => <0.238.0>}},{config,[{connection_timeout,10000},{username,<<"user">>},{password,<<"password">>},{host,<<"localhost">>},{port,5672},{virtual_host,<<"/">>}]}],<<"test.direct">>,<<"amq.gen-8CS8a9aGGBJzpCpt9LY2lw">>,<<"OK">>,[{persistent,true}]],[{file,"lib/amqp/basic.ex"},{line,51}]},{'Elixir.SpotterWorkerTest.CustomWorker',consume,5,[{file,"test/worker_test.exs"},{line,111}]}]}
1) test CustomWorker forwards message to the next queue (SpotterWorkerTest)
test/worker_test.exs:135
** (MatchError) no match of right hand side value: {:empty, %{cluster_id: ""}}
code: {:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
stacktrace:
test/worker_test.exs:145: (test)
So, the code that I'm using for testing purposes is pretty straitforward:
defmodule MyTest do
use ExUnit.Case
use AMQP
@generic_exchange "test.direct"
@generic_queue_request "worker_test_queue_request"
@generic_queue_forward "worker_test_queue_forward"
@custom_amqp_opts [
username: "user",
password: "password",
host: "localhost",
port: 5672,
virtual_host: "/"
]
defmodule CustomWorker do
use Spotter.Worker
@exchange "test.direct"
@queue_request "worker_test_queue_request"
@queue_forward "worker_test_queue_forward"
def configure(connection, _config) do
{:ok, channel} = AMQP.Channel.open(connection)
:ok = AMQP.Exchange.direct(channel, @exchange, durable: true)
# An initial point where the worker do required stuff
{:ok, _} = AMQP.Queue.declare(channel, @queue_request, durable: true)
:ok = AMQP.Queue.bind(channel, @queue_request, @exchange, routing_key: @queue_request)
# Queue for a valid messages
{:ok, _} = AMQP.Queue.declare(channel, @queue_forward, durable: true)
:ok = AMQP.Queue.bind(channel, @queue_forward, @exchange, routing_key: @queue_forward)
:ok = AMQP.Basic.qos(channel, prefetch_count: 1)
{:ok, _} = AMQP.Basic.consume(channel, @queue_request)
{:ok, :done}
end
# Handle the trapped exit call
def handle_info({:EXIT, _from, reason}, state) do
{:stop, reason, state}
end
# Confirmation sent by the broker after registering this process as a consumer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, channel) do
{:noreply, channel}
end
# Sent by the broker when the consumer is unexpectedly cancelled
def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, channel) do
{:stop, :normal, channel}
end
# Confirmation sent by the broker to the consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, channel) do
{:noreply, channel}
end
# Invoked when a message successfully consumed
def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, channel) do
spawn fn -> consume(channel, tag, reply_to, headers, payload) end
{:noreply, channel}
end
# Processing a message
defp consume(channel, tag, reply_to, headers, payload) do
# an issue in the following line
:ok = AMQP.Basic.publish(channel, @exchange, reply_to, "OK", [persistent: true])
AMQP.Basic.ack(channel, tag)
end
end
setup_all do
{:ok, pid} = CustomWorker.start_link(@custom_amqp_opts)
{:ok, [worker: pid]}
end
def create_client_connection() do
AMQP.Connection.open(@custom_amqp_opts)
end
def create_response_queue(connection) do
{:ok, channel} = AMQP.Channel.open(connection)
:ok = AMQP.Exchange.direct(channel, @generic_exchange, passive: true)
{:ok, queue} = AMQP.Queue.declare(channel, "", exclusive: true, durable: true, auto_delete: false)
:ok = AMQP.Queue.bind(channel, queue[:queue], @generic_exchange, routing_key: queue[:queue])
{:ok, channel, queue}
end
test "CustomWorker forwards message to the next queue", _state do
{:ok, connection} = create_client_connection()
{:ok, channel, queue} = create_response_queue(connection)
:ok = AMQP.Basic.publish(channel, @generic_exchange, @generic_queue_request, "VALID_DATA",
persistent: true,
reply_to: queue[:queue],
headers: [{:path, "api.matchmaking.search"},
{:permissions, "get;post"}, ]
)
{:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
assert payload == "OK"
AMQP.Queue.delete(channel, queue)
AMQP.Connection.close(connection)
end
end
Any ideas why it happens and how can I fix the following issue? I'd looked in the basic.ex
file for publish
args and implemetation, but I'm presumming that everything is fine.
Relrin commented
Okay, I figured out what the issue was here:
- You need to provide a used channel to a GenServer state in init/1, so that it will possible to re-use it. Otherwise create it manually when it have a available connection to AMQP. For example, it can be looks like this:
def init(opts) do
Process.flag(:trap_exit, true)
# Some default setting for a connection
config = @defaults
|> Keyword.merge(opts)
|> Confex.Resolver.resolve!
{:ok, connection} = open_connection(opts) # Create a new connection for AMQP here
Process.monitor(connection.pid)
# Let to a developer to do stuff here and use the result of it later
# For example, it could be a couple of channels for you purposes
{:ok, meta} = configure(connection, config)
{:ok, [connection: connection, config: config, meta: meta]}
end
- After it describe the handlers for your own cases and reuse the data from a state.