[Bug] Requeue error with DeferMessageAction using MessagingGateway.MsSql
rene-mandel opened this issue · 4 comments
Describe the bug
System.ArgumentException: An item with the same key has already been added. Key: deliveryTag
is thrown when DeferMessageAction is used with MessagingGateway.MsSql.
To Reproduce
Set MsSqlSubscription.RequeueCount to a value greater than 2.
Exceptions (if any)
   at System.Collections.Generic.Dictionary`2.TryInsert(TKey key, TValue value, InsertionBehavior behavior)
   at System.Collections.Generic.Dictionary`2.Add(TKey key, TValue value)
   at Paramore.Brighter.Serialization.DictionaryStringObjectJsonConverter.Read(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options) in /_/src/Paramore.Brighter/Serialization/DictionaryStringObjectJsonConverter.cs:line 44
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Metadata.JsonPropertyInfo`1.ReadJsonAndSetMember(Object obj, ReadStack& state, Utf8JsonReader& reader)
   at System.Text.Json.Serialization.Converters.ObjectDefaultConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.TryRead[TArg](ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo, TArg& arg)
   at System.Text.Json.Serialization.Converters.SmallObjectWithParameterizedConstructorConverter`5.ReadAndCacheConstructorArgument(ReadStack& state, Utf8JsonReader& reader, JsonParameterInfo jsonParameterInfo)
   at System.Text.Json.Serialization.Converters.ObjectWithParameterizedConstructorConverter`1.OnTryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value)
   at System.Text.Json.Serialization.JsonConverter`1.TryRead(Utf8JsonReader& reader, Type typeToConvert, JsonSerializerOptions options, ReadStack& state, T& value, Boolean& isPopulatedValue)
   at System.Text.Json.Serialization.JsonConverter`1.ReadCore(Utf8JsonReader& reader, JsonSerializerOptions options, ReadStack& state)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 utf8Json, JsonTypeInfo`1 jsonTypeInfo, Nullable`1 actualByteCount)
   at System.Text.Json.JsonSerializer.ReadFromSpan[TValue](ReadOnlySpan`1 json, JsonTypeInfo`1 jsonTypeInfo)
   at System.Text.Json.JsonSerializer.Deserialize[TValue](String json, JsonSerializerOptions options)
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 133
   at Paramore.Brighter.MessagingGateway.MsSql.SqlQueues.MsSqlMessageQueue`1.TryReceive(String topic, Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/SqlQueues/MsSqlMessageQueue.cs:line 100
   at Paramore.Brighter.MessagingGateway.MsSql.MsSqlMessageConsumer.Receive(Int32 timeoutInMilliseconds) in /_/src/Paramore.Brighter.MessagingGateway.MsSql/MsSqlMessageConsumer.cs:line 39
   at Paramore.Brighter.Channel.Receive(Int32 timeoutinMilliseconds) in /_/src/Paramore.Brighter/Channel.cs:line 120
   at Paramore.Brighter.ServiceActivator.MessagePump`1.Run() in /_/src/Paramore.Brighter.ServiceActivator/MessagePump.cs:line 106
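The top frames show Dictionary.Add being called from a JSON converter's Read method. As a minimal sketch of how that combination can produce this exception, assume the serialized JSON object contains the deliveryTag property more than once. That is an assumption for illustration only; the trace does not show the payload, and the root cause is not confirmed here. The class and method names below are hypothetical and are not Brighter code.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;

// Hypothetical illustration; not the Brighter converter.
public static class DuplicateKeySketch
{
    // Copies every JSON property into a dictionary with Add.
    // Add throws ArgumentException when the key is already present.
    public static Dictionary<string, object> ReadWithAdd(string json)
    {
        var bag = new Dictionary<string, object>();
        using var doc = JsonDocument.Parse(json);
        foreach (var property in doc.RootElement.EnumerateObject())
        {
            bag.Add(property.Name, property.Value.ToString());
        }
        return bag;
    }

    // Same loop, but the indexer overwrites an existing key,
    // so the last occurrence of a repeated property wins.
    public static Dictionary<string, object> ReadWithIndexer(string json)
    {
        var bag = new Dictionary<string, object>();
        using var doc = JsonDocument.Parse(json);
        foreach (var property in doc.RootElement.EnumerateObject())
        {
            bag[property.Name] = property.Value.ToString();
        }
        return bag;
    }
}
```

With an input such as {"deliveryTag":1,"deliveryTag":2}, ReadWithAdd throws System.ArgumentException on the second property, while ReadWithIndexer returns a single deliveryTag entry. Whether overwriting is the right behaviour for the gateway is a separate design question; the sketch only shows why Add is sensitive to repeated keys.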
Further technical details
- Brighter version: 9.7.6
- I searched the existing issues and found that this might be similar to #2698.
@rene-mandel Can we just check your use case for MS-SQL as a message provider? We are looking into whether the main use case is local development, as we might recommend RMQ for testing if you can't access your production provider.
I'll provide you with details on how this is currently set up. I hope this helps.
private static IServiceCollection AddMessagingSystem(this IServiceCollection services, IConfiguration configuration)
{
    services.AddSingleton<IMessagingSystem, MessagingSystem>();

    MessagingSettings messagingSettings = new();
    configuration.Bind(MessagingSettings.Section, messagingSettings);

    var messagingConfiguration = new MsSqlConfiguration(connectionString: messagingSettings.SqlConnectionString, queueStoreTable: "QueueData");

    AddMessageSender(services, messagingConfiguration);
    // Receiver is a hosted service
    AddMessageReceiver(services, messagingConfiguration);

    return services;
}
private static void AddMessageSender(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var publication = new Publication
    {
        MakeChannels = OnMissingChannel.Create,
        Topic = new RoutingKey($"{nameof(DemoMessage)}.topic"),
    };

    var producerRegistry = new MsSqlProducerRegistryFactory(
            messagingConfiguration,
            [publication]
        )
        .Create();

    services.AddBrighter(options =>
        {
            options.HandlerLifetime = ServiceLifetime.Scoped;
        })
        .UseExternalBus(producerRegistry)
        .AutoFromAssemblies();
}
private static void AddMessageReceiver(IServiceCollection services, MsSqlConfiguration messagingConfiguration)
{
    var subscriptions = new MsSqlSubscription[]
    {
        new MsSqlSubscription<DemoMessage>(
            new SubscriptionName($"{nameof(DemoMessage)}.subscription"),
            new ChannelName($"{nameof(DemoMessage)}.topic"),
            new RoutingKey($"{nameof(DemoMessage)}.topic"),
            runAsync: true,
            requeueCount: 5, // Here is the problematic line, if greater than 2, the exception will occur
            requeueDelayInMilliseconds: 5000
        )
    };

    var messageConsumerFactory = new MsSqlMessageConsumerFactory(messagingConfiguration);

    services.AddServiceActivator(options =>
        {
            options.Subscriptions = subscriptions;
            options.ChannelFactory = new ChannelFactory(messageConsumerFactory);
            options.HandlerLifetime = ServiceLifetime.Scoped;
            options.UseScoped = true;
        })
        .AutoFromAssemblies();

    services.AddHostedService<ServiceActivatorHostedService>();
}
@rene-mandel Is your use case that you are using MSSQL as your primary message broker, rather than as a local alternative to a cloud service? It would help us to understand how this feature is used.
Thanks for the code though, it should help.
@iancooper Yes, MSSQL is used as the primary message broker, across all environments.