AlejandroRivera/embedded-rabbitmq

Exchange type topic throws exception

Closed this issue · 9 comments

When I use exchange type 'topic' while declaring exchange exception is thrown see below, but with type 'direct' everything works fine.

Exception:

[info]   java.io.IOException:
[info]   at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
[info]   at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
[info]   at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
[info]   at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:703)
[info]   at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:672)
[info]   at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:720)
[info]   at com.rabbitmq.client.impl.ChannelN.exchangeDeclare(ChannelN.java:61)
[info]   ...
[info]   Cause: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'test-exchange' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)Reporter completed abruptly with an exception after receiving event: TestFailed(Ordinal(0, 17),java.io.IOException was thrown.

An example recipe

val TestRMQNodeName = "rabbit-test"
val TestRMQNodePort = "5672"
val DecodedExchangeName = "test-exchange"

val config: EmbeddedRabbitMqConfig = new EmbeddedRabbitMqConfig.Builder()
  .rabbitMqServerInitializationTimeoutInMillis(6000)
  .defaultRabbitMqCtlTimeoutInMillis(6000)
  .envVar(RabbitMqEnvVar.NODENAME, TestRMQNodeName)
  .envVar(RabbitMqEnvVar.NODE_PORT, TestRMQNodePort)
  .build()

val rabbitMq: EmbeddedRabbitMq = new EmbeddedRabbitMq(config)

rabbitMq.start()

val connFactory = new ConnectionFactory()
connFactory.setHost("localhost")
connFactory.setPort(5672)
connFactory.setVirtualHost("/")
connFactory.setUsername("guest")
connFactory.setPassword("guest")
val conn = connFactory.newConnection()
val channel = conn.createChannel()
channel.exchangeDeclare(DecodedExchangeName, "topic", true);

@noorul, i think the issue is specific to your setup.
As you can see, the error message says:

inequivalent arg 'type' for exchange 'test-exchange' in vhost '/': received 'topic' but current is 'direct', class-id=40, method-id=10)

They keywords here is inequivalent and received 'topic' but current is 'direct'.
This means that you're trying to re-declare an already-existing exchange which was declared previously with another "configuration".

RabbitMQ allows re-declarations to happen successfully IF the "configuration" matches what's already present.

The docs say:

Exchanges cannot be redeclared with different types. The client MUST not attempt to redeclare an existing exchange with a different type than used in the original Exchange.Declare method. Error code: not-allowed

That is what I suspected initially but unfortunately that is not the case. I could reproduce it by giving different names for different runs.

import io.arivera.oss.embedded.rabbitmq.EmbeddedRabbitMq;
import io.arivera.oss.embedded.rabbitmq.EmbeddedRabbitMqConfig;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

public class Issue32 {

  private EmbeddedRabbitMq embeddedRabbitMq;
  private EmbeddedRabbitMqConfig.Builder builder;

  @Before
  public void setUp() {
    builder = new EmbeddedRabbitMqConfig.Builder();
    embeddedRabbitMq = new EmbeddedRabbitMq(builder.build());
    embeddedRabbitMq.start();
  }

  @After
  public void tearDown() {
    embeddedRabbitMq.stop();
  }

  @Test
  public void name() throws IOException, TimeoutException {
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setVirtualHost("/");

    Connection connection = connectionFactory.newConnection();
    Channel channel = null;
    try {
      channel = connection.createChannel();
      channel.exchangeDeclare("my-exchange", "topic", true, false, null);
      channel.queueDeclare("my-queue", true, false, false, null);
      channel.queueBind("my-queue", "my-exchange", "foo.*", null);

      String messageToSend = "hello world!";
      channel.basicPublish("my-exchange", "foo.bar", null, messageToSend.getBytes("utf-8"));

      GetResponse getResponse = channel.basicGet("my-queue", true);
      String messageReceived = new String(getResponse.getBody(), "utf-8");
      assertThat(messageReceived, equalTo(messageToSend));
      System.out.println("Message received: " + messageReceived);
    } finally {
      if (channel != null) { channel.close(); }
      connection.close();
    }
  }
}

gives me:

20:17:05.865 DEBUG       main i.a.o.e.r.E.P.rabbitmq-server - Process started.
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server - 
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -               RabbitMQ 3.6.6. Copyright (C) 2007-2016 Pivotal Software, Inc.
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -   ##  ##      Licensed under the MPL.  See http://www.rabbitmq.com/
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -   ##  ##
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -   ##########  Logs: /var/folders/5v/x81_dvcn11ddrkb2ybs56k_r0000gn/T/rabbitmq_server-3.6.6/var/log/rabbitmq/rabbit@rivera-mbp.log
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -   ######  ##        /var/folders/5v/x81_dvcn11ddrkb2ybs56k_r0000gn/T/rabbitmq_server-3.6.6/var/log/rabbitmq/rabbit@rivera-mbp-sasl.log
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -   ##########
20:17:06.848 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -               Starting broker...
20:17:07.449 INFO    Thread-2 i.a.o.e.r.E.P.rabbitmq-server -  completed with 0 plugins.

Message received: hello world!

It looks like stop() leaves epmd daemon running.

$ ps -ef | grep -i [r]abbit
  501 85037     1   0  8:34PM ??         0:00.00 /var/folders/j6/4qlzsrl541q6nqyfhbkmfnvr0000gn/T/rabbitmq_server-3.6.6/erts-6.4/bin/../../erts-6.4/bin/epmd -daemon
$

If I kill it and run it is working.

stop() will invoke the command to shut down RabbitMQ and wait for completion of the command.
if something goes wrong and the JVM exits, the process might continue to run.
In some systems, this is a bigger problem than in others. For example, in Mac OS, the process always seems to be shut-down (perhaps because it's considered a child process of the JVM), but in Ubuntu, it doesn't.

See PR #28 for the same report and test case that proves this problem.
I've not been able to find a consistent/good solution to enforce the process to always be killed, but it seems to be a common issue with all libraries that initiate external processes (EmbeddedRedis, EmbeddedMongo, etc.)

If you do find a solution that works, a PR will be most welcomed

It looks like rabbitmq also persists metadata.

/var/folders/j6/4qlzsrl541q6nqyfhbkmfnvr0000gn/T/rabbitmq_server-3.6.6/var/lib/rabbitmq/mnesia/rabbit-test
$ ls -la 
total 184
drwxr-xr-x  20 nislam  staff    680 Apr  8 20:46 .
drwxr-xr-x   5 nislam  staff    170 Apr  8 20:46 ..
-rw-r--r--   1 nislam  staff    171 Apr  8 20:46 DECISION_TAB.LOG
-rw-r--r--   1 nislam  staff    718 Apr  8 20:46 LATEST.LOG
-rw-r--r--   1 nislam  staff     75 Apr  8 20:46 cluster_nodes.config
drwxr-xr-x   6 nislam  staff    204 Apr  8 20:46 msg_store_persistent
drwxr-xr-x   6 nislam  staff    204 Apr  8 20:46 msg_store_transient
-rw-r--r--   1 nislam  staff     37 Apr  8 20:46 nodes_running_at_shutdown
-rw-r--r--   1 nislam  staff   1335 Apr  6 12:41 rabbit_durable_exchange.DCD
-rw-r--r--   1 nislam  staff    591 Apr  8 20:34 rabbit_durable_exchange.DCL
-rw-r--r--   1 nislam  staff    285 Apr  8 20:46 rabbit_durable_queue.DCD
-rw-r--r--   1 nislam  staff    451 Apr  8 20:34 rabbit_durable_route.DCD
-rw-r--r--   1 nislam  staff    191 Apr  6 12:41 rabbit_runtime_parameters.DCD
-rw-r--r--   1 nislam  staff      4 Apr  8 20:46 rabbit_serial
-rw-r--r--   1 nislam  staff    241 Apr  6 12:41 rabbit_user.DCD
-rw-r--r--   1 nislam  staff    204 Apr  6 12:41 rabbit_user_permission.DCD
-rw-r--r--   1 nislam  staff    145 Apr  6 12:41 rabbit_vhost.DCD
-rw-r--r--   1 nislam  staff   5965 Apr  8 20:46 recovery.dets
-rw-r--r--   1 nislam  staff  24767 Apr  6 12:41 schema.DAT
-rw-r--r--   1 nislam  staff    283 Apr  6 12:32 schema_version

I think I can close this issue now. I never knew that rabbitmq actually persists metadata.

But do you think I should open a ticket for epmd daemon being running even after calling stop() ?

Oops! I did not read you previous comment regarding stop() before sending out my last comment. So you can ignore that. Thank you for your time.

No worries. Happy coding :)

One of the reason could be that you have not specified the consumer type to direct and by default, it takes topic