[Bug Report]: Kafkaflow does not allow to have consumers with different consumer groups (shared base consumer config)
ArminShoeibi opened this issue · 0 comments
ArminShoeibi commented
Prerequisites
- I have searched issues to ensure it has not already been reported
Description
Hi there folks,
We have 3 consumers in our project and we want these consumers to have different consumer groups we are using a base consumer config like this:
ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
LogConnectionClose = false,
ReconnectBackoffMs = 1000,
ReconnectBackoffMaxMs = 6000
};
then we pass this instance to the below method.
then the problem appears (related to reference types) in the Build method
our second and third consumers, use the first consumer config.
Should we provide a separate consumer config each time or can we handle this in the KafkaFlow library?
Steps to reproduce
ConsumerConfig defaultConfig = new()
{
Acks = Confluent.Kafka.Acks.All,
AllowAutoCreateTopics = false,
AutoCommitIntervalMs = 5000,
AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest,
LogConnectionClose = false,
ReconnectBackoffMs = 1000,
ReconnectBackoffMaxMs = 6000
};
builder.Services.AddKafkaFlowHostedService(c =>
{
c.AddCluster(cluster =>
{
cluster.AddConsumer(consumerConfigurationBuilder =>
{
consumerConfigurationBuilder
.Topic("OrderApprovedEvent")
.WithGroupId("Consumer-group-1")
.WithName(Guid.NewGuid().ToString())
.WithBufferSize(1)
.WithWorkersCount(1)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(consumerMiddlewareConfigurationBuilder =>
{
consumerMiddlewareConfigurationBuilder
.AddSingleTypeSerializer<OrderApprovedEvent, JsonCoreSerializer>(factory => new JsonCoreSerializer(DefaultJsonSerializerOptions.Instance))
.AddTypedHandlers(typedHandlerConfigurationBuilder =>
{
typedHandlerConfigurationBuilder.AddHandler<OrderApprovedConsumer>()
.WithHandlerLifetime(InstanceLifetime.Transient);
});
});
});
cluster.AddConsumer(consumerConfigurationBuilder =>
{
consumerConfigurationBuilder
.Topic("OrderCreatedEvent")
.WithGroupId("Consumer-group-2")
.WithName(Guid.NewGuid().ToString())
.WithBufferSize(1)
.WithWorkersCount(1)
.WithConsumerConfig(defaultConfig)
.AddMiddlewares(consumerMiddlewareConfigurationBuilder =>
{
consumerMiddlewareConfigurationBuilder
.AddSingleTypeSerializer<OrderApprovedEvent, JsonCoreSerializer>(factory => new JsonCoreSerializer(DefaultJsonSerializerOptions.Instance))
.AddTypedHandlers(typedHandlerConfigurationBuilder =>
{
typedHandlerConfigurationBuilder.AddHandler<OrderCreatedConsumer>()
.WithHandlerLifetime(InstanceLifetime.Transient);
});
});
});
});
});
Expected behavior
Create each consumer in an isolated way
Actual behavior
using ConsumerConfigurationBuilder in a stateful way
KafkaFlow version
v.2.2.15...2.4.0