Custom middleware that throws an exception will cause handler starvation
Adding custom middleware which can throw an exception causes unexpected behaviour, namely, handler starvation.
The default pipeline appears to be set up as follows (source):
public static HandlerMiddlewareBuilder UseDefaults<TMessage>(
this HandlerMiddlewareBuilder builder,
Type handlerType)
where TMessage : Message
{
if (builder == null) throw new ArgumentNullException(nameof(builder));
if (handlerType == null) throw new ArgumentNullException(nameof(handlerType), "HandlerType is used here to");
builder.UseMessageContextAccessor();
builder.Use<LoggingMiddleware>();
builder.UseStopwatch(handlerType);
builder.Use<SqsPostProcessorMiddleware>();
builder.UseErrorHandler();
builder.UseHandler<TMessage>();
return builder;
}
Specifically, the UseErrorHandler middleware added here converts exceptions into a false result from the handler (source):
protected override async Task<bool> RunInnerAsync(HandleMessageContext context, Func<CancellationToken, Task<bool>> func, CancellationToken stoppingToken)
{
try
{
return await func(stoppingToken).ConfigureAwait(false);
}
catch (Exception e)
{
_monitor.HandleException(context.MessageType);
_monitor.HandleError(e, context.RawMessage);
context.SetException(e);
return false;
}
finally
{
_monitor.Handled(context.Message);
}
}
Now suppose a user adds some middleware to the outside of the pipeline:
public class BadMiddlewareTest : MiddlewareBase<HandleMessageContext, bool>
{
protected override async Task<bool> RunInnerAsync(HandleMessageContext context, Func<CancellationToken, Task<bool>> func, CancellationToken stoppingToken)
{
await func(stoppingToken).ConfigureAwait(false);
throw new Exception();
}
}
I configure it like this:
x.WithSubscriptionGroup("error_messages", cfg => cfg.WithConcurrencyLimit(1));
x.ForTopic<ErrorMessage>(
cfg =>
{
cfg.WithMiddlewareConfiguration(
m =>
{
m.Use<BadMiddlewareTest>();
m.UseDefaults<ErrorMessage>(typeof(ErrorMessageHandler));
});
cfg.WithReadConfiguration(
r =>
{
r.SubscriptionGroupName = "error_messages";
r.RetryCountBeforeSendingToErrorQueue = 1;
});
});
Now a simple handler:
public class ErrorMessageHandler : IHandlerAsync<ErrorMessage>
{
public static int Counter { get; private set; }
public async Task<bool> Handle(ErrorMessage message)
{
Counter++;
throw new Exception();
}
}
We can write a simple test that publishes 10 messages. I expect the count to be 10, but it's actually 1:
[Test]
public async Task TestWeDontCrash()
{
var messagePublisher = Bootstrapper.Container.GetInstance<IMessagePublisher>();
// Publish 10 messages, one after another.
for (var i = 0; i < 10; i++)
{
await messagePublisher.PublishAsync(new ErrorMessage());
}
await Task.Delay(5000);
ErrorMessageHandler.Counter.Should().Be(10);
}
Even if we wait forever, we only ever see a count of 1. However, if we remove the BadMiddlewareTest, we receive the correct count of 10.
In addition, if we move this middleware deeper into the chain, so that it sits after UseErrorHandler, the exception is caught and the problem goes away:
x.ForTopic<ErrorMessage>(
cfg =>
{
cfg.WithMiddlewareConfiguration(
m =>
{
m.UseMessageContextAccessor();
m.Use<LoggingMiddleware>();
m.UseStopwatch(typeof(ErrorMessageHandler));
m.Use<SqsPostProcessorMiddleware>();
m.UseErrorHandler();
m.Use<BadMiddlewareTest>();
m.UseHandler<ErrorMessage>();
});
cfg.WithReadConfiguration(
r =>
{
r.SubscriptionGroupName = "error_messages";
r.RetryCountBeforeSendingToErrorQueue = 1;
});
});
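To make the ordering effect concrete, here is a minimal, self-contained sketch that does not use JustSaying at all. The `Middleware` alias, `Build`, and `Describe` are hypothetical names invented for this illustration, and the delegates only model the behaviour described above (an error handler that turns exceptions into false, and a middleware that throws after calling the next step). It is not how the library composes its pipeline internally.

```csharp
using System;
using System.Threading.Tasks;
// Hypothetical stand-in for a middleware: takes the next step, returns a wrapped step.
using Middleware = System.Func<System.Func<System.Threading.Tasks.Task<bool>>, System.Func<System.Threading.Tasks.Task<bool>>>;

// Models UseErrorHandler: any exception from further in becomes a false result.
Middleware errorHandler = next => async () =>
{
    try { return await next(); }
    catch (Exception) { return false; }
};

// Models BadMiddlewareTest: runs the rest of the pipeline, then throws.
Middleware badMiddleware = next => async () =>
{
    await next();
    throw new InvalidOperationException("thrown by middleware");
};

// Models ErrorMessageHandler: always fails.
Func<Task<bool>> handler = () => Task.FromException<bool>(new Exception("thrown by handler"));

// Composes middleware listed from outermost to innermost around the handler.
Func<Task<bool>> Build(Func<Task<bool>> terminal, params Middleware[] outerToInner)
{
    var step = terminal;
    for (var i = outerToInner.Length - 1; i >= 0; i--) step = outerToInner[i](step);
    return step;
}

// Runs a pipeline and reports either its result or the exception that escaped it.
async Task<string> Describe(Func<Task<bool>> pipeline)
{
    try { return (await pipeline()) ? "true" : "false"; }
    catch (Exception e) { return "escaped: " + e.GetType().Name; }
}

var outside = Build(handler, badMiddleware, errorHandler); // bad middleware outside the error handler
var inside = Build(handler, errorHandler, badMiddleware);  // bad middleware inside the error handler

Console.WriteLine(await Describe(outside)); // escaped: InvalidOperationException
Console.WriteLine(await Describe(inside));  // false
```

In the first arrangement the error handler has already returned by the time the outer middleware throws, so nothing is left to catch the exception. In the second, the handler's exception passes through the bad middleware and is converted to false by the error handler.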
It took us a fair while to work out why our handlers kept freezing and stopped processing work. It turns out that the UoW middleware we had added could throw if it failed to write to the database.
I'm not sure whether this is expected behaviour, but if it is, it must be clearly documented for anyone writing middleware: middleware must not throw an exception under any circumstances unless it's added after UseErrorHandler, which will handle the exception for you.
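As a rough sketch of the defensive alternative, a middleware that has to sit outside the error handler could catch its own failures and report them as a false result. The example below is self-contained and uses plain delegates rather than JustSaying types. `GuardedUnitOfWork`, `commit`, and the `Middleware` alias are hypothetical names for illustration, and whether returning false is the right reaction to a failed commit in a real pipeline is an assumption.

```csharp
using System;
using System.Threading.Tasks;
// Hypothetical stand-in for a middleware: takes the next step, returns a wrapped step.
using Middleware = System.Func<System.Func<System.Threading.Tasks.Task<bool>>, System.Func<System.Threading.Tasks.Task<bool>>>;

// Hypothetical unit-of-work commit that fails, e.g. because the database is unavailable.
Func<Task> commit = () => Task.FromException(new InvalidOperationException("database unavailable"));

// Wraps the rest of the pipeline; commits only on success and never lets an exception escape.
Middleware GuardedUnitOfWork(Func<Task> commitAsync) => next => async () =>
{
    try
    {
        var handled = await next();
        if (handled) await commitAsync();
        return handled;
    }
    catch (Exception e)
    {
        // Report the failure as an unhandled message instead of throwing.
        Console.Error.WriteLine("Unit of work failed: " + e.Message);
        return false;
    }
};

// A handler that succeeds, so the only failure comes from the commit.
Func<Task<bool>> handler = () => Task.FromResult(true);

var pipeline = GuardedUnitOfWork(commit)(handler);
var result = await pipeline();
Console.WriteLine(result); // False
```

The point of the sketch is only that the failure surfaces as a false result rather than as an exception travelling out of the outermost middleware.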
For clarity, whatever concurrency limit you set in
x.WithSubscriptionGroup("error_messages", cfg => cfg.WithConcurrencyLimit(1));
in my original test will be the final value of the counter: each handler runs once and then dies because the exception isn't caught in the middleware.
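The relationship between the concurrency limit and the final counter can be reproduced with a simplified model. This is only a sketch of the observed behaviour under the assumption that each concurrent worker stops for good once an exception escapes its pipeline. `RunWorkers` and `Worker` are hypothetical names, and none of this is taken from the library's actual scheduling code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Simulates `concurrencyLimit` workers draining a queue of `messageCount` messages,
// where every message makes an exception escape the pipeline and kill its worker.
async Task<int> RunWorkers(int concurrencyLimit, int messageCount)
{
    var queue = new ConcurrentQueue<int>(Enumerable.Range(0, messageCount));
    var handled = 0;

    async Task Worker()
    {
        while (queue.TryDequeue(out _))
        {
            await Task.Yield();
            Interlocked.Increment(ref handled); // the handler ran for this message
            throw new InvalidOperationException("escaped the pipeline"); // the worker loop ends here
        }
    }

    var workers = Enumerable.Range(0, concurrencyLimit).Select(_ => Worker()).ToArray();
    try { await Task.WhenAll(workers); }
    catch (InvalidOperationException) { /* every worker has faulted; remaining messages are never read */ }
    return handled;
}

Console.WriteLine(await RunWorkers(1, 10)); // 1
Console.WriteLine(await RunWorkers(3, 10)); // 3
```

In this model the counter always ends up equal to the concurrency limit (or the message count, if that is smaller), which matches what the test shows.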