Farfetch/kafkaflow

[Bug Report]: KafkaFlow does not allow consumers with different consumer groups when they share a base consumer config

ArminShoeibi opened this issue · 0 comments

Prerequisites

  • I have searched issues to ensure it has not already been reported

Description

Hi there folks,

We have 3 consumers in our project, and we want each of them to have a different consumer group. We are using a base consumer config like this:

      ConsumerConfig defaultConfig = new()
      {
          Acks = Confluent.Kafka.Acks.All,
          AllowAutoCreateTopics = false,
          AutoCommitIntervalMs = 5000,
          AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
          LogConnectionClose = false,
          ReconnectBackoffMs = 1000,
          ReconnectBackoffMaxMs = 6000
      };

Then we pass this instance to the method below:

public IConsumerConfigurationBuilder WithConsumerConfig(ConsumerConfig config)

The problem then appears in the Build method, because ConsumerConfig is a reference type and the instance we passed in is reused as-is:

this.consumerConfig ??= new ConsumerConfig();

As a result, our second and third consumers use the first consumer's config.
Should we provide a separate ConsumerConfig instance each time, or can this be handled in the KafkaFlow library?
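
For now, one workaround we can think of is to build a fresh ConsumerConfig for every consumer instead of sharing a single instance. This is only a minimal sketch: `ConsumerDefaults` and `Create` are hypothetical names of our own (not part of KafkaFlow or Confluent.Kafka), and the property values are the ones from the base config above.

```csharp
using Confluent.Kafka;

// Hypothetical helper (our own code, not a library type).
// Every call returns a NEW ConsumerConfig, so no two consumers
// share the same reference.
public static class ConsumerDefaults
{
    public static ConsumerConfig Create() => new()
    {
        Acks = Acks.All,
        AllowAutoCreateTopics = false,
        AutoCommitIntervalMs = 5000,
        AutoOffsetReset = AutoOffsetReset.Latest,
        LogConnectionClose = false,
        ReconnectBackoffMs = 1000,
        ReconnectBackoffMaxMs = 6000
    };
}
```

Each consumer would then call `.WithConsumerConfig(ConsumerDefaults.Create())` instead of passing the shared `defaultConfig`. It works around the issue on our side, but it would still be nicer if the library did not depend on the caller doing this.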

Steps to reproduce

      ConsumerConfig defaultConfig = new()
      {
          Acks = Confluent.Kafka.Acks.All,
          AllowAutoCreateTopics = false,
          AutoCommitIntervalMs = 5000,
          AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
          LogConnectionClose = false,
          ReconnectBackoffMs = 1000,
          ReconnectBackoffMaxMs = 6000
      };

builder.Services.AddKafkaFlowHostedService(c =>
{
    c.AddCluster(cluster =>
    {
        cluster.AddConsumer(consumerConfigurationBuilder =>
        {
            consumerConfigurationBuilder
               .Topic("OrderApprovedEvent")
               .WithGroupId("Consumer-group-1")
               .WithName(Guid.NewGuid().ToString())
               .WithBufferSize(1)
               .WithWorkersCount(1)
               .WithConsumerConfig(defaultConfig)
               .AddMiddlewares(consumerMiddlewareConfigurationBuilder =>
               {
                   consumerMiddlewareConfigurationBuilder
                   .AddSingleTypeSerializer<OrderApprovedEvent, JsonCoreSerializer>(factory => new JsonCoreSerializer(DefaultJsonSerializerOptions.Instance))
                   .AddTypedHandlers(typedHandlerConfigurationBuilder =>
                   {
                       typedHandlerConfigurationBuilder.AddHandler<OrderApprovedConsumer>()
                                                       .WithHandlerLifetime(InstanceLifetime.Transient);
                   });
               });

        });

        cluster.AddConsumer(consumerConfigurationBuilder =>
        {
            consumerConfigurationBuilder
               .Topic("OrderCreatedEvent")
               .WithGroupId("Consumer-group-2")
               .WithName(Guid.NewGuid().ToString())
               .WithBufferSize(1)
               .WithWorkersCount(1)
               .WithConsumerConfig(defaultConfig)
               .AddMiddlewares(consumerMiddlewareConfigurationBuilder =>
               {

                   consumerMiddlewareConfigurationBuilder
                   .AddSingleTypeSerializer<OrderApprovedEvent, JsonCoreSerializer>(factory => new JsonCoreSerializer(DefaultJsonSerializerOptions.Instance))
                   .AddTypedHandlers(typedHandlerConfigurationBuilder =>
                   {
                       typedHandlerConfigurationBuilder.AddHandler<OrderCreatedConsumer>()
                                                       .WithHandlerLifetime(InstanceLifetime.Transient);
                   });
               });

        });
    });
});

Expected behavior

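Each consumer is created in isolation, with its own consumer group, even when the same base ConsumerConfig instance is passed to several consumers.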

Actual behavior

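ConsumerConfigurationBuilder uses the passed ConsumerConfig in a stateful way: the same instance is shared across consumers, so the second and third consumers use the first consumer's config.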
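
To illustrate what we think is happening, here is a small self-contained sketch with stand-in types. `FakeConsumerConfig` and `FakeConsumerBuilder` are hypothetical and are NOT the real Confluent.Kafka or KafkaFlow classes. The sketch also assumes the builder only fills in the group id when the config does not already have one, which is our reading of the behavior and not confirmed library code.

```csharp
#nullable enable
using System;

// Hypothetical stand-in for a mutable config class (a reference type).
class FakeConsumerConfig
{
    public string? GroupId { get; set; }
    public int AutoCommitIntervalMs { get; set; }
}

// Hypothetical stand-in for a builder that keeps the caller's instance.
class FakeConsumerBuilder
{
    private FakeConsumerConfig? consumerConfig;
    private string? groupId;

    public FakeConsumerBuilder WithGroupId(string id)
    {
        groupId = id;
        return this;
    }

    public FakeConsumerBuilder WithConsumerConfig(FakeConsumerConfig config)
    {
        // Stores the reference; no copy is made.
        consumerConfig = config;
        return this;
    }

    public FakeConsumerConfig Build()
    {
        // Same pattern as in the report: only create a config if none was given.
        consumerConfig ??= new FakeConsumerConfig();
        // Assumption: the group id is written only when not already set.
        consumerConfig.GroupId ??= groupId;
        return consumerConfig;
    }
}

static class Demo
{
    public static void Main()
    {
        var shared = new FakeConsumerConfig { AutoCommitIntervalMs = 5000 };

        var first = new FakeConsumerBuilder()
            .WithGroupId("Consumer-group-1")
            .WithConsumerConfig(shared)
            .Build();

        var second = new FakeConsumerBuilder()
            .WithGroupId("Consumer-group-2")
            .WithConsumerConfig(shared)
            .Build();

        Console.WriteLine(ReferenceEquals(first, second)); // True
        Console.WriteLine(second.GroupId);                 // Consumer-group-1
    }
}
```

Both builders end up holding the same object, so the group id written for the first consumer is what the second one sees. Copying the config inside the builder (or inside WithConsumerConfig) would avoid this.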

KafkaFlow version

v.2.2.15...2.4.0