sroze/messenger-enqueue-transport

Messages sent twice using Messenger component when using multiple routes


I initially thought this was an issue with Messenger itself.

I'm using enqueue with Kafka and the messenger adapter to send messages to external services. Here's my messenger.yaml:

framework:
  messenger:
    transports:
    # Uncomment the following line to enable a transport named "amqp"
      amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
      view_events: '%env(MESSENGER_TRANSPORT_DSN)%?topic[name]=view_events&queue[name]=view_events'

    routing:
      # Route your messages to the transports
      'App\Message\DisplayNotification':
        senders: [ view_events ]
        send_and_handle: true
      '*':
        senders: [ amqp ]
        send_and_handle: true

Note: the send_and_handle settings are only there for debugging at the moment.

Related packages (and Symfony 4.2):

enqueue/amqp-tools                0.9.4    Message Queue Amqp Tools
enqueue/dsn                       0.9.2    Parse DSN
enqueue/enqueue                   0.9.6    Message Queue Library
enqueue/enqueue-bundle            0.9.3    Message Queue Bundle
enqueue/messenger-adapter         0.2.0    Enqueue adapter for Symfony Messenger component
enqueue/null                      0.9.2    Enqueue Null transport
enqueue/rdkafka                   0.9.2    Message Queue Kafka Transport

And enqueue.yaml:

enqueue:
    default:
        transport:
            dsn: "kafka:"
            global:
                metadata.broker.list: '%env(KAFKA_SERVERS)%'
            topic:
                auto.offset.reset: beginning
            commit_async: true
        client: ~

I'm emitting messages using:

public function addEvent(Request $request, string $hash, $selectedEntry): void
{
    $event = DisplayNotification::createFromRequest($request, $hash, $selectedEntry);
    $this->bus->dispatch($event);
}

This causes messages to be received twice: once for the amqp transport and once for the view_events transport. While looking for the reason, I added the send_and_handle settings and realized:

  1. The second message wraps the first one.
  2. I cannot use normal handlers, because the Envelope returned from SendMessageMiddleware contains an RdKafkaMessage instead of the expected DisplayNotification.

Looking through SenderInterface, I noticed that its implementation, QueueInteropTransport, expects to receive and return an Envelope. However, the returned envelope is a new Envelope instance that wraps the previous Envelope as its message. I believe it should be the previous instance instead?

Excerpt of the send method below:

public function send(Envelope $message): Envelope
{
   ...
   $encodedMessage = $this->serializer->encode($message);

   $originalMessage = $message;
   $message = $context->createMessage(
   ...

   return $message; // Should be $originalMessage?

When $originalMessage is returned, the Messenger component behaves as expected: the message is sent only once and handlers receive the expected objects.
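To make the difference concrete, here is a minimal, self-contained sketch. It does not use the real Symfony or Enqueue classes: ToyEnvelope, ToyTransportMessage and both send functions are hypothetical stand-ins that only model the symptom from item 2 above (the caller gets back an envelope holding the transport message) and the proposed change (the caller gets back the envelope it passed in).

```php
<?php
// Toy stand-ins for illustration only; NOT the real Symfony/Enqueue classes.
final class ToyEnvelope
{
    private $message;

    public function __construct($message)
    {
        $this->message = $message;
    }

    public function getMessage()
    {
        return $this->message;
    }
}

final class ToyTransportMessage
{
    public $body;

    public function __construct(string $body)
    {
        $this->body = $body;
    }
}

// Models the reported behaviour: the returned envelope holds the
// transport-level message, so later middleware no longer sees the
// original message object.
function sendReturningTransportMessage(ToyEnvelope $envelope): ToyEnvelope
{
    $transportMessage = new ToyTransportMessage(serialize($envelope->getMessage()));
    // ... a real sender would hand $transportMessage to the broker here ...
    return new ToyEnvelope($transportMessage);
}

// Models the proposed change: the transport message is still built,
// but the envelope passed in is returned untouched.
function sendReturningOriginal(ToyEnvelope $envelope): ToyEnvelope
{
    $transportMessage = new ToyTransportMessage(serialize($envelope->getMessage()));
    // ... a real sender would hand $transportMessage to the broker here ...
    return $envelope;
}

$event = new stdClass();
$event->hash = 'abc';
$envelope = new ToyEnvelope($event);
```

With the first variant, anything that runs after the sender and calls getMessage() gets a ToyTransportMessage; with the second it gets the original $event, which is what a handler would expect.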

I'll create a pull request in a moment, but do you see any issues with this change? Will it break anything in enqueue itself?

Looks like it's fixed on master, but not yet released (db82dd6).

Actually, the message is also sent via another route when '*' matches and that route has a different sender. I don't know whether this should be considered a Messenger bug. What do you think?
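A toy model of what I'm seeing, assuming the routing collects senders from every key that matches the message, with '*' matching everything. The resolveSenders function below is a hypothetical helper written for this comment, not Messenger's actual code, and it only looks at the exact class name and the wildcard:

```php
<?php
// Hypothetical helper, not part of Symfony: gathers sender names from every
// routing key that matches the message class, including the '*' wildcard.
function resolveSenders(array $routing, string $messageClass): array
{
    $senders = [];
    foreach ([$messageClass, '*'] as $key) {
        foreach ($routing[$key] ?? [] as $sender) {
            // Skip senders that were already collected from an earlier key.
            if (!in_array($sender, $senders, true)) {
                $senders[] = $sender;
            }
        }
    }

    return $senders;
}

// Same routing as in the messenger.yaml above.
$routing = [
    'App\Message\DisplayNotification' => ['view_events'],
    '*' => ['amqp'],
];

$forNotification = resolveSenders($routing, 'App\Message\DisplayNotification');
```

Under that assumption, a DisplayNotification resolves to both view_events and amqp, which would explain the second copy regardless of the envelope fix.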

Is it better now that the fix has been released? Not sure about the * routing part :)

@weaverryan Our testing with 0.9 showed that the issue is gone :)