Messages sent twice using Messenger component when using multiple routes
Closed this issue · 4 comments
Initially thought it's an issue with Messenger itself.
I'm using enqueue with Kafka and messenger adapter to send messages to external services. Here's my messenger.yaml:
framework:
messenger:
transports:
# Uncomment the following line to enable a transport named "amqp"
amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
view_events: '%env(MESSENGER_TRANSPORT_DSN)%?topic[name]=view_events&queue[name]=view_events'
routing:
# Route your messages to the transports
'App\Message\DisplayNotification':
senders: [ view_events ]
send_and_handle: true
'*':
senders: [ amqp ]
send_and_handle: true
Notice: send_and_handle
are only for debugging atm.
Related packages (and Symfony 4.2):
enqueue/amqp-tools 0.9.4 Message Queue Amqp Tools
enqueue/dsn 0.9.2 Parse DSN
enqueue/enqueue 0.9.6 Message Queue Library
enqueue/enqueue-bundle 0.9.3 Message Queue Bundle
enqueue/messenger-adapter 0.2.0 Enqueue adapter for Symfony Messenger component
enqueue/null 0.9.2 Enqueue Null transport
enqueue/rdkafka 0.9.2 Message Queue Kafka Transport
And enqueue.yaml:
enqueue:
default:
transport:
dsn: "kafka:"
global:
metadata.broker.list: '%env(KAFKA_SERVERS)%'
topic:
auto.offset.reset: beginning
commit_async: true
client: ~
I'm emitting messages using:
public function addEvent(Request $request, string $hash, $selectedEntry): void
{
$event = DisplayNotification::createFromRequest($request, $hash, $selectedEntry);
$this->bus->dispatch($event);
}
This causes messages to be received twice. Once for amqp and once for view_events transports. When looking for the reason why this is happening I added send_and_handle
settings and realized:
- Second message wraps the first one.
- I cannot use normal handlers, because Envelope returned from
SendMessageMiddleware
containsRdKafkaMessage
instead of expectedDisplayNotification
.
Looking through SenderInterface
I noticed that it's implementation QueueInteropTransport
expects to receive and return an Envelope
. However, envelope returned is a new instance of Envelope wrapping previous Envelope as message. I believe it should be the previous instance instead?
send
method exerpt below:
public function send(Envelope $message): Envelope
{
...
$encodedMessage = $this->serializer->encode($message);
$originalMessage = $message;
$message = $context->createMessage(
...
return $message; // Should be $originalMessage?
When $originalMessage
is returned, Messenger component behaves as expected: send message only once and handlers receive expected objects.
I'll create pull request in a moment, but do you see any issues with this change? Will it break anything in enqueue itself?
Actually, messages is also sent to another route when '*'
is matched and route has a different sender. I don't know if it should be considered a Messenger bug, what do you guys think?
Is it better now that the fix has been released? Not sure about the *
routing part :)
@weaverryan Our testing when using 0.9 shown that the issue is no more :)