sroze/messenger-enqueue-transport

How to set serializer on context

Hey, I may be missing something really obvious, so apologies in advance, but I am a little confused about how to set a custom serializer. I am trying to work with a Kafka context and an Avro serializer.

https://github.com/sroze/messenger-enqueue-transport/blob/master/QueueInteropTransportFactory.php#L83
The line above fetches the default context, which resolves to the Kafka context based on the DSN. By default this context uses the JSON serializer. The enqueue documentation states that I need to override it by calling setSerializer, but how would I achieve this? I can set the serializer at the transport level, but it does not get passed to this factory.

Should I perhaps change the context name to avro and build my own service? If so, what would I configure?

@Steveb-p Thanks for the help. I have actually already tried this by setting the following in my services.yaml:

    Enqueue\RdKafka\RdKafkaContext:
        calls:
            - setSerializer: ['@App\Messenger\DomainEventSerialiser']

Maybe I botched that, wrong config file or pointing to the wrong dependency?

@jakeBarwell check whether you're referring to the correct RdKafkaContext instance (php bin/console debug:container RdKafkaContext).

@Steveb-p no. After checking enqueue.transport.default.context, I found it was pointing to the interop context, on which I couldn't call setSerializer. I am currently getting around this by overriding enqueue.transport.default.connection_factory and its createContext method.

Just an FYI: I came across this as I was having the same issue. This was in my services.yaml:

    App\Infrastructure\Messenger\AvroSerializer:

    Enqueue\RdKafka\RdKafkaContext:
        arguments:
            - []
        calls:
            - setSerializer: ['@App\Infrastructure\Messenger\AvroSerializer']

There seems to be an issue with Symfony Messenger picking up this context, so I had to create a KafkaConnectionFactory class:

<?php

namespace App\Infrastructure\Kafka;

use App\Infrastructure\Messenger\AvroSerializer;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Context;

class KafkaConnectionFactory extends RdKafkaConnectionFactory
{
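    // Swap the serializer on every context this factory creates.
    public function createContext(): Context
    {
        // The parent returns an RdKafkaContext, which uses the JSON serializer by default.
        $context = parent::createContext();
        $context->setSerializer(new AvroSerializer());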

        return $context;
    }
}

And in enqueue.yaml:

            connection_factory_class: 'App\Infrastructure\Kafka\KafkaConnectionFactory'

And it works.
It would be great to do this in pure YAML, if anyone knows of a way.
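
For anyone unsure where the connection_factory_class line belongs, here is a minimal sketch of the surrounding enqueue.yaml. It assumes enqueue-bundle's multi-transport layout with a transport named default, which would be consistent with the enqueue.transport.default.* service IDs mentioned earlier in this thread. The file path and the ENQUEUE_DSN variable name are assumptions for illustration; only the connection_factory_class value comes from the comment above.

```yaml
# config/packages/enqueue.yaml (sketch; file location assumed)
enqueue:
    default:
        transport:
            # Hypothetical env var holding the Kafka DSN
            dsn: '%env(ENQUEUE_DSN)%'
            # Custom factory whose createContext() sets the Avro serializer
            connection_factory_class: 'App\Infrastructure\Kafka\KafkaConnectionFactory'
```

With a layout like this, the context should be created through KafkaConnectionFactory, so the serializer is applied in createContext() rather than through a method call declared in services.yaml.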