smallrye/smallrye-reactive-messaging

Producing AMQP connector mis-configures ActiveMQ if the consumer connects after the first emission.

Closed this issue · 6 comments

Given a clean instance of ActiveMQ, I've noticed that when a producer emits a message before the consumer has connected, the message will remain unsent, or "un routed" in Active MQ. Is this the intended behaviour?

# The producer
mp.messaging.outgoing.tenant.address=tenant
mp.messaging.outgoing.tenant.connector=smallrye-amqp
mp.messaging.outgoing.tenant.host=${JMS_HOST_NAME:localhost}
mp.messaging.outgoing.tenant.port=${JMS_AMQP_PORT:17501}
mp.messaging.outgoing.tenant.durable=true
mp.messaging.outgoing.tenant.containerId=tenant-service
mp.messaging.outgoing.tenant.broadcast=false

# The consumer
mp.messaging.incoming.tenant.connector=smallrye-amqp
mp.messaging.incoming.tenant.host=${JMS_HOST_NAME:localhost}
mp.messaging.incoming.tenant.port=${JMS_AMQP_PORT:17501}
mp.messaging.incoming.tenant.durable=true
mp.messaging.incoming.tenant.containerId=identity-service
mp.messaging.incoming.tenant.broadcast=false

I have two other services that successfully receive the same message sent at the same time on different channels (yes, a poor man's broadcast). They attempt to connect before my producer attempts to connect, and will always receive the message. The configurations only differ by channel name and the (redundant) address.

When I compare the addresses in ActiveMQ, I see that the above address has the type 'MULTICAST', whereas the working addresses have 'ANYCAST'. Also, the binding name is not the channel name as for the others, but identity-service.auto-0 - seemingly derived from the consumers container id.

I thought that having durable set would mean the message would be delivered when the consumer connects, but seems incorrect. I can confirm that messages sent later are correctly delivered/routed, so the communication isn't broken. When I add a sufficient sleep before this initial message, then the message will be delivered and the ActiveMQ address is like the others.

My questions are:

Is this intended behaviour?

Is there are non-obvious-but-sufficient way to wait until the consumers are ready/readied before sending that first message?

I have found a work-around for my problem, which only illustrates a problem with the auto-magic configuration provided by the smallrye-amqp connector. Basically, if I pre-configure my ActiveMQ server, then it will work as expected and desired: if my producer sends before the consumer is up-and-running, then the consumer will receive it when available.

To reproduce the failing behaviour, where an emitted message is never delivered:

  1. Clone my reproducer https://github.com/Adelrisk/reproducer_smallrye-reactive-messaging_567
  2. cd reproducer_smallrye-reactive-messaging_567
  3. Start artemis activemq: docker run -it --rm -p 9876:5672 -e ARTEMIS_USERNAME=quarkus -e ARTEMIS_PASSWORD=quarkus vromero/activemq-artemis:2.13.0-alpine
  4. Wait for the successful emission of a message
    1. cd producer
    2. /mvnw compile quarkus:dev
    3. Wait for the line INFO [org.acm.MessageProducer] (vert.x-eventloop-thread-0) Confirmed message: my_onstart_message
  5. In a new console, start the consumer and wait forever for the message
    1. cd consumer
    2. /mvnw compile quarkus:dev
    3. Wait forever. The message is never delivered.

The workaround:

  1. Clone my reproducer https://github.com/Adelrisk/reproducer_smallrye-reactive-messaging_567
  2. cd reproducer_smallrye-reactive-messaging_567
  3. Start artemis activemq: docker run -it --rm -p 9876:5672 -p 8161:8161 -e ARTEMIS_USERNAME=quarkus -e ARTEMIS_PASSWORD=quarkus -v "$(pwd)/artemis_override:/var/lib/artemis/etc-override" vromero/activemq-artemis:2.13.0-alpine
    1. It should print Merging input with '/var/lib/artemis/etc-override/broker-00.xml'. If not, the directory with the configuration was not mounted.
  4. Wait for the successful emission of a message
    1. cd producer
    2. /mvnw compile quarkus:dev
    3. Wait for the line INFO [org.acm.MessageProducer] (vert.x-eventloop-thread-0) Confirmed message: my_onstart_message
  5. In a new console, start the consumer and receive the message
    1. cd consumer
    2. /mvnw compile quarkus:dev
    3. Wait for the line INFO [org.acm.MessageConsumer] (vert.x-eventloop-thread-0) Received message: my_onstart_message

It is generally recommended to declare the queue and avoid the "dumb" auto-creation.

Such a recommendation makes sense, once you are confronted with the leaky abstraction. I haven't read it anywhere else yet.

That being said, do you know what needs to be done to avoid the issue?

I assume you mean changes to the AMQP connector code. No. This is too much new territory for me, and I have not yet worked out how much heavy lifting is occurring. I also have questions about the dumb configuration, such as why does contain multicast, when so much of smallrye reactive-messaging is anycast.

It is the broker auto creating things here rather than the AMQP connector, so essentially the behaviour is mostly all broker side configuration in this case.

The Artemis brokers default configuration configures auto created addresses to be treated as Multicast, which is what happens when you send initially, if it doesn't exist. You can change this configuration default to treat auto created addresses as anycast instead by default if you like. E.g perhaps see "Auto Address Creation" at http://activemq.apache.org/components/artemis/documentation/latest/address-model.html. Or if you know you need particular queues, you can pre configure them.

Note that the 'durable' consumer is effectively just a durable topic subscription, so in that case you would actually need multicast addresses to facilitate having different durable subscriptions. The broker creates an implicit backing queue in those cases to serve as the subscription, one which encodes the connection container-id (used as the ClientID) and the receiver link name (the subscription name), which must both be provided the same in future uses to reuse the same subscription over time.

If you dont want different subscriptions, then indeed you likely want a regular 'not durable' consumer, and a queue, which you can either pre create or change the brokers address settings to ensure thats what it creates.

I gratefully appreciate these insightful and enabling statements. I feel like I should be able to grapple the technology better. Also, a big thanks that there wasn't a single hint or tone of "it works as expected" or "it works as expected". It dawned on me, I must be like a developer whose experience with databases was constrained to the code side of an ORM, and someone is politely explaining the existence of SQL.

I'm trying to work out in my head if there is some sort of recommendation or lesson to be learned out of this situation. It can't be denied that the (amqp) connector wonderfully encapsulates much of the needs for asynchronous messaging. But I obviously had some trouble breaking out of the provided leaky abstraction into the appropriate topics, and couldn't tell which tree I should be barking up.

That won't stop me from closing this. No bug, just a developer overwhelmed when it came to using a great technology for enterprise purposes.

@Adelrisk If there are improvements to the documentation that can help with this understanding we'd love for you to provide a PR for it!