[ActiveMQ][STOMP] Every second message skipped
RafaelKr opened this issue · 1 comments
I have an ActiveMQ setup where messages from an external system are coming in and should be consumed with the Symfony messenger.
This basically works, which means some messages are handled. The issue is it skips every second message.
This is a long issue including the findings from my debugging session. I split it into these 3 sections:
- Which code paths are taken
- Visualization of what happens on the ActiveMQ side
- My current workaround (I would really appreciate it if someone with deeper knowledge could take a look at this and decide whether it is a viable general fix)
Which code paths are taken
I found that the following happens when starting the Symfony `messenger:consume` command:
- The Symfony Messenger Worker calls the `get` method on the receiver. The receiver is an instance of `Enqueue\MessengerAdapter\QueueInteropTransport`.
- Inside the `get` method, the `getConsumer` method is called, which creates a new `Enqueue\Stomp\StompConsumer` instance:
messenger-enqueue-transport/QueueInteropTransport.php, lines 276 to 283 in 83c30ed
```php
// Enqueue\Stomp\StompContext
// https://github.com/php-enqueue/stomp/blob/25c8155a7b7e86f57e22ab0869eb7725888e095a/StompContext.php#L178-L185
public function createConsumer(Destination $destination): Consumer
{
    InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);

    $this->transient = false;

    return new StompConsumer($this->getStomp(), $destination);
}
```
- On this new `StompConsumer` instance, the `receive` method is called, which itself calls the `subscribe` method.
When `$this->stomp->sendFrame($frame);` is called inside the `subscribe` method, it first sends a `CONNECT` command and then a `SUBSCRIBE` command. After the `SUBSCRIBE`, I can see a new consumer client inside ActiveMQ.
- Now it receives a message and handles it. After handling it, the Symfony Worker runs `$receiver->ack()`. This creates a NEW `StompConsumer` instance.
messenger-enqueue-transport/QueueInteropTransport.php, lines 106 to 111 in 83c30ed
On this new `StompConsumer` instance, the `acknowledge` method is called. It sends an `ACK` command to ActiveMQ.
- These steps are then repeated. The only difference is that the `CONNECT` command is no longer sent; it is sent only the first time.
Visualization of what happens on the ActiveMQ side
- Initial state. We have 9 messages and 0 consumers.
- `CONNECT` and `SUBSCRIBE` commands are sent. After `SUBSCRIBE` we see a new consumer in ActiveMQ.
Note that the consumer already has one enqueued message.
- This first message is handled, and afterwards an `ACK` command is sent for it. This removes the first message from the queue.
The consumer now also shows two enqueued messages but only one dequeued message. So ActiveMQ considers the second message received but not acknowledged (it's still visible in the queue).
- A `SUBSCRIBE` command is sent. After `SUBSCRIBE` we again see a new consumer in ActiveMQ.
Note that the previous consumer still has two enqueues but only one dequeue.
- An `ACK` command is sent. Afterwards we see that the third message was acknowledged. The second message was never handled.
The consumer overview now also shows two enqueued messages and one dequeued message for the new consumer.
- The `SUBSCRIBE` and `ACK` steps repeat until we arrive at the following final state.
- The Messenger worker continues creating new consumers but never receives any further messages. New messages that I create on the queue aren't received either.
As you can see, every other message is skipped: the 1st, 3rd, 5th, ... messages were acknowledged, while the 2nd, 4th, ... messages were never handled. This leads to my assumption that ActiveMQ already "pushes" the next message on every `ACK`, but because a new `StompConsumer` is always created, that message is never handled and is "forgotten" in a `StompConsumer` instance that was only created for an `ACK` command.
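To make this assumption easier to check, here is a toy model of it. This is only a sketch: `ToyBroker` and `consume` are hypothetical names, nothing here talks to ActiveMQ or uses the Enqueue classes, and the broker behaviour (at most one unacknowledged message per subscription, next message pushed on `ACK`) is exactly the assumption described above, not something confirmed from the ActiveMQ source.

```php
<?php
declare(strict_types=1);

// Toy broker (hypothetical): each subscription holds at most one unacknowledged
// message, and the next pending message is pushed to it as soon as an ACK arrives.
final class ToyBroker
{
    /** @var int[] messages not yet dispatched to any subscription */
    private array $pending;

    /** @var array<int, int|null> subscription id => message currently in flight */
    private array $inFlight = [];

    private int $nextSubscriptionId = 0;

    public function __construct(int $messageCount)
    {
        $this->pending = $messageCount > 0 ? range(1, $messageCount) : [];
    }

    // SUBSCRIBE: register a new subscription and dispatch one message to it.
    public function subscribe(): int
    {
        $id = $this->nextSubscriptionId++;
        $this->inFlight[$id] = array_shift($this->pending);

        return $id;
    }

    // Return the message currently dispatched to this subscription, if any.
    public function receive(int $id): ?int
    {
        return $this->inFlight[$id] ?? null;
    }

    // ACK: drop the in-flight message and push the next pending one
    // to the SAME subscription, whether or not anyone still reads from it.
    public function ack(int $id): void
    {
        $this->inFlight[$id] = array_shift($this->pending);
    }
}

/**
 * Runs a receive/ack loop and returns the messages that were actually handled.
 *
 * @return int[]
 */
function consume(ToyBroker $broker, bool $reuseConsumer): array
{
    $handled = [];
    $subscription = null;

    while (true) {
        // Without reuse, every iteration subscribes again, like a fresh consumer per get().
        if ($subscription === null || !$reuseConsumer) {
            $subscription = $broker->subscribe();
        }

        $message = $broker->receive($subscription);
        if ($message === null) {
            return $handled;
        }

        $handled[] = $message;
        $broker->ack($subscription);
    }
}

echo implode(',', consume(new ToyBroker(9), false)), PHP_EOL; // 1,3,5,7,9
echo implode(',', consume(new ToyBroker(9), true)), PHP_EOL;  // 1,2,3,4,5,6,7,8,9
```

With a fresh subscription per iteration, the model strands messages 2, 4, 6 and 8 on abandoned subscriptions, which matches what I see in the ActiveMQ consumer overview. With a single reused subscription, all 9 messages are handled.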
Also, when exiting the Symfony `messenger:consume` command, the consumers are still shown in the ActiveMQ consumer overview. When restarting the `messenger:consume` command, it also won't receive any messages because they are still reserved for the old consumers.
The ActiveMQ server is running on a remote machine. Disconnecting my local machine from the network removes the consumers from ActiveMQ, and when I then restart the `messenger:consume` command, it again starts consuming every second message.
Edit: Maybe this is also something that should be looked into. The StompClient should probably disconnect when `messenger:consume` is exited.
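One possible shape for such a disconnect, as a rough sketch only: the owner of the connection closes it in its destructor. `FakeStompConnection` and `DisconnectingTransport` are hypothetical stand-ins, not classes from Enqueue or Symfony, and this assumes the worker process exits normally so that PHP gets a chance to run destructors.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for a STOMP connection; not the real Enqueue client.
final class FakeStompConnection
{
    public bool $connected = true;

    public function disconnect(): void
    {
        // A real client would send a DISCONNECT frame and close the socket here.
        $this->connected = false;
    }
}

// Hypothetical transport that owns the connection and closes it on teardown.
final class DisconnectingTransport
{
    private FakeStompConnection $connection;

    public function __construct(FakeStompConnection $connection)
    {
        $this->connection = $connection;
    }

    public function __destruct()
    {
        if ($this->connection->connected) {
            $this->connection->disconnect();
        }
    }
}

$connection = new FakeStompConnection();
$transport = new DisconnectingTransport($connection);

// Dropping the last reference runs the destructor immediately.
unset($transport);

var_dump($connection->connected); // bool(false)
```

If the process is killed by a signal that is not handled, destructors do not run, so this alone would not cover every way of exiting the command.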
Workaround
My current workaround is to cache the `StompConsumer` instance in the `QueueInteropTransport` instance.
```php
class WorkaroundQueueInteropTransport implements TransportInterface
{
    // [...]

    /** @var Consumer|null Consumer created on the first getConsumer() call and reused afterwards */
    private $consumer;

    // [...]

    private function getConsumer(): Consumer
    {
        // Reuse the consumer created earlier instead of creating a new one per call.
        if ($this->consumer) {
            return $this->consumer;
        }

        $context = $this->contextManager->context();
        $destination = $this->getDestination(null);
        $queue = $context->createQueue($destination['queue']);

        $this->consumer = $context->createConsumer($queue);

        return $this->consumer;
    }
}
```
I'm currently not 100% sure whether this is a viable general fix, because I don't know if we need to create a NEW consumer on every `getConsumer` call. The only side effect of creating a new consumer that I could find is that it sets `$this->transient = false;` on the `StompContext`. I don't know Symfony Messenger and this project well enough to tell whether a cached consumer instance breaks something else. Currently it seems to work for me.
I would really appreciate it if someone with deeper knowledge could take a look at all of this.
@keulinho and @sroze, you implemented the `getConsumer` method in the first place. Can you tell whether this is a viable fix?
hello, any news regarding this issue?