BrighterCommand/Brighter

[Bug] Requeue error with DeferMessageAction using MessagingGateway.MsSql

rene-mandel opened this issue · 4 comments

Describe the bug

Using DeferMessageAction with MessagingGateway.MsSql throws System.ArgumentException: An item with the same key has already been added. Key: deliveryTag

To Reproduce

Set MsSqlSubscription.RequeueCount to a value greater than 2.

Exceptions (if any)

   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at System.Collections.Generic.Dictionary`2.Add(TKey key, TValue value)
   at Paramore.Brighter.Serialization.DictionaryStringObjectJsonConverter.Read(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options) in /_/src/Paramore.Brighter/Serialization/DictionaryStringObjectJsonConverter.cs:line 44
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.TryRead[TArg](ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo, TArg& arg)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.ReadAndCacheConstructorArgument(ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo)
   at System.Text.Json.Serialization.Converters.ObjectWithParameterizedConstructorConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo`1 jsonTypeInfo, Nullable`1 actualByteCount)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 json, JsonTypeInfo`1 jsonTypeInfo)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, JsonSerializerOptions options)
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 133
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic, Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 100
   at Paramore.Brighter.MessagingGateway.MsSql.MsSqlMessageConsumer.Receive(Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/MsSqlMessageConsumer.cs:line 39
   at Paramore.Brighter.Channel.Receive(Int32 timeoutinMilliseconds) in /_/src/Paramore.Brighter/Channel.cs:line 120
   at Paramore.Brighter.ServiceActivator.MessagePump`1.Run() in /_/src/Paramore.Brighter.ServiceActivator/MessagePump.cs:line 106

Further technical details

  • Brighter version: 9.7.6
  • I searched the existing issues and this may be a similar problem to #2698
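
For readers following the stack trace: the top two frames show Dictionary.Add being called from DictionaryStringObjectJsonConverter.Read with a key that is already present. The snippet below is only a minimal sketch of that failure mode, not Brighter's converter code. The JSON payload and the variable names (strict, tolerant) are made up for illustration, and it assumes the stored header bag contains deliveryTag twice, which the issue does not confirm.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical header bag; the duplicated "deliveryTag" key is an assumption.
const string json = "{\"deliveryTag\": 1, \"deliveryTag\": 2}";

// JsonDocument keeps both properties when a name is repeated.
using JsonDocument doc = JsonDocument.Parse(json);

var strict = new Dictionary<string, object>();   // filled with Add
var tolerant = new Dictionary<string, object>(); // filled with the indexer
string error = "";

foreach (JsonProperty property in doc.RootElement.EnumerateObject())
{
    // The indexer overwrites an existing key, so the last value wins.
    tolerant[property.Name] = property.Value.GetInt32();

    try
    {
        // Add throws ArgumentException when the key is already present.
        strict.Add(property.Name, property.Value.GetInt32());
    }
    catch (ArgumentException ex)
    {
        error = ex.Message;
    }
}

Console.WriteLine(error);
Console.WriteLine(tolerant["deliveryTag"]);
```

Whether the right fix is to tolerate the duplicate on read or to avoid writing the key twice on requeue is a question for the maintainers; the sketch only shows why Add fails where an indexer assignment would not.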

@rene-mandel Can we just check your use case for MS-SQL as a message provider? We are looking into whether the main use case is local development, as we might recommend RMQ for testing if you can't access your production provider.

@iancooper

I'll provide you with details on how this is currently set up. I hope this helps.

private static IServiceCollection AddMessagingSystem(this IServiceCollection services, IConfiguration configuration)
{
    services.AddSingleton<IMessagingSystem, MessagingSystem>();

    MessagingSettings messagingSettings = new();
    configuration.Bind(MessagingSettings.Section, messagingSettings);
    var messagingConfiguration = new MsSqlConfiguration(connectionString: messagingSettings.SqlConnectionString, queueStoreTable: "QueueData");

    AddMessageSender(services, messagingConfiguration);
    // Receiver is a hosted service
    AddMessageReceiver(services, messagingConfiguration);

    return services;
}

private static void AddMessageSender(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var publication = new Publication
    {
        MakeChannels = OnMissingChannel.Create,
        Topic = new RoutingKey($"{nameof(DemoMessage)}.topic"),
    };

    var producerRegistry = new MsSqlProducerRegistryFactory(
            messagingConfiguration,
            [publication]
        )
        .Create();

    services.AddBrighter(options =>
    {
        options.HandlerLifetime = ServiceLifetime.Scoped;
    })
    .UseExternalBus(producerRegistry)
    .AutoFromAssemblies();
}

private static void AddMessageReceiver(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var subscriptions = new MsSqlSubscription[]
    {
        new MsSqlSubscription<DemoMessage>(
            new SubscriptionName($"{nameof(DemoMessage)}.subscription"),
            new ChannelName($"{nameof(DemoMessage)}.topic"),
            new RoutingKey($"{nameof(DemoMessage)}.topic"),
            runAsync: true,
            requeueCount: 5, // Problematic line: with any value greater than 2, the exception occurs
            requeueDelayInMilliseconds: 5000
        )
    };

    var messageConsumerFactory = new MsSqlMessageConsumerFactory(messagingConfiguration);

    services.AddServiceActivator(options =>
    {
        options.Subscriptions = subscriptions;
        options.ChannelFactory = new ChannelFactory(messageConsumerFactory);
        options.HandlerLifetime = ServiceLifetime.Scoped;
        options.UseScoped = true;
    })
    .AutoFromAssemblies();

    services.AddHostedService<ServiceActivatorHostedService>();
}

@rene-mandel Is your use case that you are using MSSQL as your primary message broker, rather than as a local alternative to a cloud service? It would help us to understand how this feature is used.

Thanks for the code though; it should help.

@iancooper Yes, MSSQL is used as the primary message broker, across all environments.