sroze/messenger-enqueue-transport

It's not possible to pass consumer specific options from DSN

maks-rafalko opened this issue · 6 comments

We use Kafka with the https://github.com/php-enqueue/rdkafka transport, and it's impossible to set consumer-specific options, in particular group.id.

There is only a global group.id setting, which applies the same group to all consumers rather than on a per-consumer basis.

Class QueueInteropTransport creates a consumer here:

private function getConsumer(): Consumer
{
    $context = $this->contextManager->context();
    $destination = $this->getDestination(null);
    $queue = $context->createQueue($destination['queue']);

    return $context->createConsumer($queue);
}

And there is no way to pass any options or settings other than the Destination instance, which is an RdKafkaTopic in our case.

What we would like to be able to do is to set settings that would be passed to the consumer constructor

https://github.com/php-enqueue/rdkafka/blob/2449f06c892656d598b1c8ab8ceb4a4ac848ab6c/RdKafkaContext.php#L114

by specifying transport-specific options in messenger.yaml

batch:
     dsn: enqueue://default?queue[name]=name&group.id=custom-consumer-group

Any hints on how to do this through all these layers of abstraction?

A current workaround would be to create another enqueue entry with a different Kafka configuration and then select it by DSN, e.g. enqueue://another.

Yes, there are definitely many workarounds (we have overridden RdKafkaContext for this case). But let's think about what architectural changes need to be made to allow passing consumer-specific options.

Currently, the interop interfaces do not allow this. E.g. public function createConsumer(Destination $destination): Consumer; accepts only the destination, which is an RdKafkaTopic. There are no settings for the consumer itself.

@maks-rafalko we use the following enqueue config, and it works for multiple environments and multiple settings via the global / topic sections for Kafka:

            dsn: 'kafka://%env(resolve:ENQUEUE_KAFKA_DSN_BROKER)%'
            global:
                metadata.broker.list: '%env(resolve:ENQUEUE_KAFKA_DSN_BROKER)%'
                group.id: app-id
                enable.auto.offset.store: 'false'
            topic:
                auto.offset.reset: beginning

Hm... If you want something "transport" specific, the best way to pass variables is to use options.

framework:
    messenger:
        transports:
            foobar: 
                dsn: '....'
                options: { group.id: custom-consumer-group }

These options are passed to the TransportFactory. We probably need some changes to that factory for it all to work.

"the best way to pass variables is to use options."

DSN options are merged with options (options take precedence on conflicting keys), so it does not matter which one is used here, though options looks cleaner.

    [$contextManager, $dsnOptions] = $this->parseDsn($dsn);
    $options = array_merge($dsnOptions, $options);

    return new QueueInteropTransport(
        $serializer ?? $this->serializer,
        $contextManager,
        $options,
        $this->debug
    );
}
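The merge order decides which value wins when a key appears in both places. The following is a minimal sketch of that behaviour in plain PHP. It assumes the DSN query string is parsed with parse_str(), which is an assumption because parseDsn() is not shown in this thread, and the helper name mergeTransportOptions is hypothetical.

```php
<?php

/**
 * Hypothetical helper mirroring the merge shown above: options parsed
 * from the DSN query string come first, explicit options override them.
 * Assumption: the query string is parsed with parse_str().
 */
function mergeTransportOptions(string $dsn, array $options): array
{
    $dsnOptions = [];
    $query = parse_url($dsn, PHP_URL_QUERY);
    if (is_string($query)) {
        parse_str($query, $dsnOptions);
    }

    // For string keys, later arguments win, so $options overrides the DSN.
    return array_merge($dsnOptions, $options);
}

// The same key in both places: the explicit option is kept.
$merged = mergeTransportOptions(
    'enqueue://default?queue[name]=name&receiveTimeout=1000',
    ['receiveTimeout' => 5000]
);

// A dotted key in the query string: parse_str() rewrites "." to "_".
$fromDsn = mergeTransportOptions(
    'enqueue://default?group.id=custom-consumer-group',
    []
);
```

If the factory does rely on parse_str(), a dotted key such as group.id would arrive as group_id when it is given in the DSN, which may be one more reason to prefer the options array for Kafka settings.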

Current issues:

  • options are validated, so no custom key can be passed there:

$resolver->setDefaults(array(
    'transport_name' => null,
    'receiveTimeout' => null,
    'deliveryDelay' => null,
    'delayStrategy' => RabbitMqDelayPluginDelayStrategy::class,
    'priority' => null,
    'timeToLive' => null,
    'topic' => array('name' => 'messages'),
    'queue' => array('name' => 'messages'),
));
$resolver->setAllowedTypes('transport_name', array('null', 'string'));
$resolver->setAllowedTypes('receiveTimeout', array('null', 'int'));
$resolver->setAllowedTypes('deliveryDelay', array('null', 'int'));
$resolver->setAllowedTypes('priority', array('null', 'int'));
$resolver->setAllowedTypes('timeToLive', array('null', 'int'));
$resolver->setAllowedTypes('delayStrategy', array('null', 'string'));
$resolver->setAllowedValues('delayStrategy', array(
    null,
    RabbitMqDelayPluginDelayStrategy::class,
    RabbitMqDlxDelayStrategy::class,
));
$resolver->setNormalizer('delayStrategy', function (Options $options, $value) {
    return null !== $value ? new $value() : null;
});

  • options are not passed to the consumer factory here:

return $context->createConsumer($queue);

Could you make a PR for this?

Maybe add an option key called consumer_options?

I'm not sure what the best solution is at the moment, but feel free to come with a suggestion.
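One possible shape for the consumer_options idea, as a sketch only: the transport options carry a nested consumer_options array, which is split off before the existing validation and then layered over the global Kafka settings for one consumer. Everything here is hypothetical, including the consumer_options key, both function names, and the way the result would reach the RdKafka consumer. None of it is part of the current adapter or the interop interfaces.

```php
<?php

/**
 * Hypothetical: separate consumer-specific settings from the options
 * that the transport already validates.
 *
 * @return array{0: array, 1: array} [transport options, consumer options]
 */
function splitConsumerOptions(array $options): array
{
    $consumerOptions = $options['consumer_options'] ?? [];
    unset($options['consumer_options']);

    return [$options, $consumerOptions];
}

/**
 * Hypothetical: per-consumer settings override the global ones.
 */
function buildConsumerConfig(array $globalConfig, array $consumerOptions): array
{
    return array_merge($globalConfig, $consumerOptions);
}

[$transportOptions, $consumerOptions] = splitConsumerOptions([
    'queue' => ['name' => 'name'],
    'consumer_options' => ['group.id' => 'custom-consumer-group'],
]);

$config = buildConsumerConfig(
    ['group.id' => 'app-id', 'enable.auto.offset.store' => 'false'],
    $consumerOptions
);
```

In this sketch $config['group.id'] ends up as custom-consumer-group while the other global settings are kept. The open question from the thread remains: createConsumer(Destination $destination) has no parameter through which such an array could be handed to the consumer.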