Farfetch/kafkaflow

[Feature Request]: Set different authentication mechanisms on consumers and producers directly

Is your request related to a problem you have?

We consume from Event Hubs through its Kafka connector, which uses one authentication mechanism, and we produce to Confluent Kafka, which uses a different one.

Describe the solution you'd like

Make it possible to set authentication directly on each consumer and producer, overriding the default cluster-wide authentication.

Are you able to help bring it to life and contribute with a Pull Request?

Yes

Additional context

No response

Hi @MartinCarpMBC,

You should be able to override the Producer and Consumer auth settings using the WithProducerConfig and WithConsumerConfig methods when configuring the producers and consumers.

Example:

services.AddKafka(
    kafka => kafka
        .AddCluster(
            cluster => cluster
                .WithBrokers(new[] { "localhost:9092" })
                .CreateTopicIfNotExists(topicName, 6, 1)
                .AddProducer(
                    producerName,
                    producer => producer
                        .DefaultTopic(topicName)
                        .WithProducerConfig(new ProducerConfig
                        {
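                            // SASL credentials only take effect with a SASL security protocol;
                            // with SecurityProtocol.Plaintext they are ignored.
                            SaslUsername = "user",
                            SaslPassword = "pass",
                            SaslMechanism = SaslMechanism.Plain,
                            SecurityProtocol = SecurityProtocol.SaslSsl,
                            EnableSslCertificateVerification = true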
                        })
                        .AddMiddlewares(m => m.AddSerializer<ProtobufNetSerializer>())
                )
                .AddConsumer(
                    consumer => consumer
                        .Topic(topicName)
                        .WithGroupId("consumer-group-1")
                        .WithConsumerConfig(new ConsumerConfig
                        {
                            // Separate credentials for the consumer; SaslSsl is needed for SASL to apply.
                            SaslUsername = "otheruser",
                            SaslPassword = "otherpass",
                            SaslMechanism = SaslMechanism.Plain,
                            SecurityProtocol = SecurityProtocol.SaslSsl,
                            EnableSslCertificateVerification = true
                        })
                        .AddMiddlewares(
                            middlewares => middlewares
                                .AddDeserializer<ProtobufNetDeserializer>()
                        ))));
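
For the Event Hubs plus Confluent scenario described in the request, the two config objects could be built in small factory methods and passed to WithConsumerConfig and WithProducerConfig. This is only a sketch: the class name, method names, and parameters are hypothetical, and it assumes the Event Hubs Kafka endpoint is used with SASL/PLAIN over TLS and the literal user name "$ConnectionString", and that the Confluent side authenticates with an API key and secret.

```csharp
using Confluent.Kafka;

// Hypothetical helper: the class, method names, and parameters are illustrative
// assumptions and are not part of KafkaFlow or Confluent.Kafka.
public static class KafkaAuthConfigs
{
    // Consumer side: assumes the Event Hubs Kafka endpoint, which expects
    // SASL/PLAIN over TLS with "$ConnectionString" as the user name and the
    // namespace connection string as the password.
    public static ConsumerConfig ForEventHubs(string connectionString) => new ConsumerConfig
    {
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.Plain,
        SaslUsername = "$ConnectionString",
        SaslPassword = connectionString,
    };

    // Producer side: assumes a Confluent cluster that accepts an API key and
    // secret via SASL/PLAIN over TLS.
    public static ProducerConfig ForConfluent(string apiKey, string apiSecret) => new ProducerConfig
    {
        SecurityProtocol = SecurityProtocol.SaslSsl,
        SaslMechanism = SaslMechanism.Plain,
        SaslUsername = apiKey,
        SaslPassword = apiSecret,
    };
}
```

These would then be passed as .WithConsumerConfig(KafkaAuthConfigs.ForEventHubs(...)) and .WithProducerConfig(KafkaAuthConfigs.ForConfluent(...)). Since the broker list is set per cluster with WithBrokers, services running on different hosts would presumably each need their own AddCluster call.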