pma/amqp

Can't publish a message into the exclusive queue

Relrin opened this issue · 1 comments

Hello,

I'm writing tests for my own small package, but I'm stuck on an issue where Elixir can't find a matching function clause:

18:01:33.475 [error] Process #PID<0.256.0> raised an exception
** (FunctionClauseError) no function clause matching in AMQP.Basic.publish/5
    (amqp) lib/amqp/basic.ex:51: AMQP.Basic.publish([connection: %AMQP.Connection{pid: #PID<0.238.0>}, config: [connection_timeout: 10000, username: "user", password: "password", host: "localhost", port: 5672, virtual_host: "/"]], "test.direct", "amq.gen-8CS8a9aGGBJzpCpt9LY2lw", "OK", [persistent: true])
    test/worker_test.exs:111: SpotterWorkerTest.CustomWorker.consume/5
18:01:33.475 [error] Error in process <0.256.0> with exit value:
{function_clause,[{'Elixir.AMQP.Basic',publish,[[{connection,#{'__struct__' => 'Elixir.AMQP.Connection',pid => <0.238.0>}},{config,[{connection_timeout,10000},{username,<<"user">>},{password,<<"password">>},{host,<<"localhost">>},{port,5672},{virtual_host,<<"/">>}]}],<<"test.direct">>,<<"amq.gen-8CS8a9aGGBJzpCpt9LY2lw">>,<<"OK">>,[{persistent,true}]],[{file,"lib/amqp/basic.ex"},{line,51}]},{'Elixir.SpotterWorkerTest.CustomWorker',consume,5,[{file,"test/worker_test.exs"},{line,111}]}]}


  1) test CustomWorker forwards message to the next queue (SpotterWorkerTest)
     test/worker_test.exs:135
     ** (MatchError) no match of right hand side value: {:empty, %{cluster_id: ""}}
     code: {:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
     stacktrace:
       test/worker_test.exs:145: (test)

So, the code that I'm using for testing purposes is pretty straightforward:

# Integration test for a Spotter-based worker: it starts CustomWorker, publishes
# a request carrying a `reply_to` queue name, and expects the worker to answer
# "OK" on that queue. The connection options below point at a broker on
# localhost:5672.
defmodule MyTest do
  use ExUnit.Case
  use AMQP

  # Exchange and queue names used by the test process; they carry the same
  # values as the attributes declared inside CustomWorker.
  @generic_exchange "test.direct"
  @generic_queue_request "worker_test_queue_request"
  @generic_queue_forward "worker_test_queue_forward"

  # Connection options handed both to CustomWorker.start_link/1 and to
  # AMQP.Connection.open/1 for the client side of the test.
  @custom_amqp_opts [
    username: "user",
    password: "password",
    host: "localhost",
    port: 5672,
    virtual_host: "/"
  ]

  # Worker under test. The callbacks injected by `use Spotter.Worker` are not
  # visible here, so the shape of the process state is not established by this
  # module alone.
  defmodule CustomWorker do
    use Spotter.Worker

    @exchange "test.direct"
    @queue_request "worker_test_queue_request"
    @queue_forward "worker_test_queue_forward"

    # Declares the topology on a freshly opened channel and starts consuming
    # from the request queue.
    #
    # Returns `{:ok, :done}`. Note that the channel opened here is a local
    # variable and is NOT part of the return value, so callers cannot get it
    # back from this function.
    def configure(connection, _config) do
      {:ok, channel} = AMQP.Channel.open(connection)
      :ok = AMQP.Exchange.direct(channel, @exchange, durable: true)

      # An initial point where the worker do required stuff
      {:ok, _} = AMQP.Queue.declare(channel, @queue_request, durable: true)
      :ok = AMQP.Queue.bind(channel, @queue_request, @exchange, routing_key: @queue_request)

      # Queue for a valid messages
      {:ok, _} = AMQP.Queue.declare(channel, @queue_forward, durable: true)
      :ok = AMQP.Queue.bind(channel, @queue_forward, @exchange, routing_key: @queue_forward)

      # Deliver at most one unacknowledged message at a time on this channel.
      :ok = AMQP.Basic.qos(channel, prefetch_count: 1)
      {:ok, _} = AMQP.Basic.consume(channel, @queue_request)

      {:ok, :done}
    end

    # NOTE(review): in the handle_info/2 clauses below the second argument is
    # named `channel`, but it is whatever state the process holds. Nothing in
    # this module puts an AMQP channel into that state (configure/2 returns
    # :done) — confirm the state shape against Spotter.Worker.

    # Handle the trapped exit call
    def handle_info({:EXIT, _from, reason}, state) do
      {:stop, reason, state}
    end

    # Confirmation sent by the broker after registering this process as a consumer
    def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, channel) do
      {:noreply, channel}
    end

    # Sent by the broker when the consumer is unexpectedly cancelled
    def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, channel) do
      {:stop, :normal, channel}
    end

    # Confirmation sent by the broker to the consumer process after a Basic.cancel
    def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, channel) do
      {:noreply, channel}
    end

    # Invoked when a message successfully consumed.
    # The message is processed in a separately spawned, unlinked process, so a
    # crash inside consume/5 does not stop the worker itself.
    def handle_info({:basic_deliver, payload, %{delivery_tag: tag, reply_to: reply_to, headers: headers}}, channel) do
      spawn fn -> consume(channel, tag, reply_to, headers, payload) end
      {:noreply, channel}
    end

    # Processing a message: replies "OK" to the `reply_to` routing key on
    # @exchange, then acknowledges the delivery. `headers` and `payload` are
    # accepted but not used.
    defp consume(channel, tag, reply_to, headers, payload) do
      # This is the call that raises FunctionClauseError: the first argument
      # received here is the process state, not an AMQP channel (see the note
      # above the handle_info/2 clauses).
      :ok = AMQP.Basic.publish(channel, @exchange, reply_to, "OK", [persistent: true]) 
      AMQP.Basic.ack(channel, tag)
    end
  end

  # Starts the worker once for the whole test module and exposes its pid in
  # the test context under :worker.
  setup_all do
    {:ok, pid} = CustomWorker.start_link(@custom_amqp_opts)
    {:ok, [worker: pid]}
  end

  # Opens a client-side connection with the same options as the worker.
  # Returns whatever AMQP.Connection.open/1 returns.
  def create_client_connection() do
    AMQP.Connection.open(@custom_amqp_opts)
  end

  # Opens a channel and declares a queue with an empty name and the
  # `exclusive: true` option, bound to the test exchange with its own name as
  # routing key. Returns `{:ok, channel, queue}` where `queue` is the value
  # returned by AMQP.Queue.declare/3 (its name is read via `queue[:queue]`).
  def create_response_queue(connection) do
    {:ok, channel} = AMQP.Channel.open(connection)
    # passive: true — presumably only checks that the exchange already exists
    # rather than creating it; confirm against the AMQP docs.
    :ok = AMQP.Exchange.direct(channel, @generic_exchange, passive: true)

    {:ok, queue} = AMQP.Queue.declare(channel, "", exclusive: true, durable: true, auto_delete: false)
    :ok = AMQP.Queue.bind(channel, queue[:queue], @generic_exchange, routing_key: queue[:queue])

    {:ok, channel, queue}
  end

  # Publishes a request to the worker's request queue and expects "OK" to
  # arrive on the reply queue named in `reply_to`.
  test "CustomWorker forwards message to the next queue", _state do
    {:ok, connection} = create_client_connection()
    {:ok, channel, queue} = create_response_queue(connection)

    :ok = AMQP.Basic.publish(channel, @generic_exchange, @generic_queue_request, "VALID_DATA",
                             persistent: true,
                             reply_to: queue[:queue],
                             headers: [{:path, "api.matchmaking.search"},
                                       {:permissions, "get;post"}, ]
    )
    # NOTE(review): Basic.get is issued immediately after the publish, with no
    # waiting for the worker's reply; if the reply has not arrived yet the
    # match fails on an {:empty, _} result — consider waiting or retrying.
    {:ok, payload, _} = AMQP.Basic.get(channel, queue[:queue])
    assert payload == "OK"

    # NOTE(review): `queue` is the value returned by Queue.declare, not the
    # queue name; Queue.delete presumably expects the name (queue[:queue]) —
    # confirm against the AMQP docs.
    AMQP.Queue.delete(channel, queue)
    AMQP.Connection.close(connection)
  end
end

Any ideas why this happens and how I can fix it? I looked at the publish arguments and implementation in basic.ex, and I presume that everything is fine there.

Okay, I figured out what the issue was here:

  1. You need to store the channel you use in the GenServer state in init/1, so that it can be re-used later. Otherwise, create it manually once a connection to AMQP is available. For example, it can look like this:
# GenServer init callback for the worker.
#
# `opts` is a keyword list of connection settings supplied by the caller.
# Returns `{:ok, state}` where `state` is a keyword list holding the open
# connection, the effective config and whatever `configure/2` returned
# (stored under `:meta`), so later callbacks can reuse it.
def init(opts) do
  # Receive exits of linked processes as messages instead of dying with them.
  Process.flag(:trap_exit, true)

  # Caller-supplied options override the module defaults; the merged result is
  # then passed through Confex so the rest of the worker sees one final config.
  config =
    @defaults
    |> Keyword.merge(opts)
    |> Confex.Resolver.resolve!()

  # Open the connection from the merged config rather than the raw `opts`,
  # otherwise the defaults and resolved values computed above are ignored.
  {:ok, connection} = open_connection(config)
  Process.monitor(connection.pid)

  # Let the developer set things up here (for example, open a couple of
  # channels) and keep the result in the state for later use.
  {:ok, meta} = configure(connection, config)
  {:ok, [connection: connection, config: config, meta: meta]}
end
  2. After that, define the handlers for your own cases and reuse the data from the state.