martinothamar/Mediator

Run NotificationHandlers in parallel

Neo-vortex opened this issue · 25 comments

Part of the philosophy behind notifications is that different handlers are notified at the same time (hopefully).
Currently, what we do is:

 for (int i = 0; i < handlers.Length; i++)
                {
                    try
                    {
                        await handlers[i].Handle(notification, cancellationToken).ConfigureAwait(false);
                    }
                    catch (global::System.Exception ex)
                    {
                        exceptions ??= new global::System.Collections.Generic.List<global::System.Exception>();
                        exceptions.Add(ex);
                    }
                }

MediatR 12 has implemented an option to define whether notification handlers should run in parallel or not.
Here is a link to a video demonstrating it: https://www.youtube.com/watch?v=hNxVjNO6RX4
The Mediator library should add an option for these execution strategies.

Imagine these two notification handlers:

public sealed class GenericNotificationHandler<TNotification> : INotificationHandler<TNotification>
    where TNotification : INotification
{
    public async ValueTask Handle(TNotification notification, CancellationToken cancellationToken)
    {
               // very long operation
                await Task.Delay(10000, cancellationToken);
    }
}
public sealed class GenericNotificationHandler2<TNotification> : INotificationHandler<TNotification>
    where TNotification : INotification
{
    public async ValueTask Handle(TNotification notification, CancellationToken cancellationToken)
    {
        await Task.Delay(2000, cancellationToken);
    }
}

With the current implementation, the total handling time for both handlers would be about 12000 ms (10000 ms + 2000 ms),
but with the new implementation it would be about 10000 ms (the duration of the slowest handler).

Actually, the code you mentioned in this repository is already running tasks in parallel. Although the for loop could make you think it's running handlers sequentially, it is not.
More on that in this SO answer, from which the code was probably taken.


The difference between your link and this, is that the SO method takes an array of ValueTask. These would have been already started (in a hot state) when ValueTask<T[]> WhenAll<T> is called, so calling await is fine there.
In our case the ValueTasks haven't started yet, and are currently being awaited immediately after creation, resulting in sequential execution.
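The distinction can be seen in a small console sketch. It is not code from this repository: `Handle` is a hypothetical stand-in for a notification handler, and the delays are arbitrary. It logs when each handler starts and ends, first awaiting each call immediately and then starting both before awaiting.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

// Hypothetical stand-in for a notification handler: logs when it starts and ends.
async ValueTask Handle(string name, int delayMs)
{
    lock (log) log.Add($"{name} start");
    await Task.Delay(delayMs);
    lock (log) log.Add($"{name} end");
}

// Sequential: each ValueTask is awaited as soon as it is created,
// so B is not started until A has finished.
await Handle("seq-A", 200);
await Handle("seq-B", 50);

// Concurrent: both handlers are started first, and only then awaited.
ValueTask a = Handle("con-A", 200);
ValueTask b = Handle("con-B", 50);
await a;
await b;

Console.WriteLine(string.Join(", ", log));
```

In the sequential half, "seq-B start" is logged only after "seq-A end". In the concurrent half, "con-B start" is logged while con-A is still pending.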

Sorry if I'm wrong, I'm unable to do a benchmark rn 🙏

Hmm, that's very correct! It seems I skimmed over it too quickly!

There is no ValueTask<T[]> WhenAll<T>, so the ValueTask would have to be boxed with AsTask like in the PR and we would pay more in terms of allocation. That said, as long as it's configurable/opt-in then I think it's OK. One of the key goals of this library is to keep overhead low (close to zero). Ideally, only the users that require parallelism in notification handlers should pay the cost for it, which is how MediatR landed on its current API I'm assuming.

There's a little more nuance required to decide whether something actually runs in parallel or not, though. For example, this code:

var tasks = new Task[4];
for (int i = 0; i < 4; i++)
    tasks[i] = Work(i);

await Task.WhenAll(tasks);

async Task Work(int i)
{
    const int SIZE = 100_000_000;
    var n = new float[SIZE];
    var rng = Random.Shared;
    for (int j = 0; j < SIZE; j++)
    {
        n[j] = rng.NextSingle();
    }

    Console.WriteLine($"{i} done!");
}

This does not actually run in parallel, because Work never yields the thread. So in this case "N done!" would be printed sequentially. If we insert await Task.Yield() at the beginning of the function, it will yield the thread and each task will (presumably) be scheduled onto different threads by the runtime. Another way to ensure parallelism would be to wrap Work(i) in Task.Run or similar.
The current TaskWhenAllPublisher in MediatR does not use something like Task.Run AFAICT, and therefore handlers might run sequentially: https://github.com/jbogard/MediatR/blob/c295291c4e8105d11a453004b42609cbf490c1cf/src/MediatR/NotificationPublishers/TaskWhenAllPublisher.cs
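A minimal sketch of that difference, assuming a console app with no synchronization context. The method names and the `gate` event are hypothetical, used only so the outcome does not depend on timing. It checks whether the returned task is already complete at the moment the call returns.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Gate that keeps the "work" from finishing until we release it (demo only).
var gate = new ManualResetEventSlim(false);

// Nothing here ever yields: the whole body runs on the caller's thread
// before the Task is handed back.
async Task WorkNoYield()
{
    Thread.SpinWait(100_000);   // stand-in for CPU-bound work
    await Task.CompletedTask;   // already completed, so this await does not yield
}

// Task.Yield hands control back to the caller; the rest runs on the thread pool.
async Task WorkWithYield()
{
    await Task.Yield();
    gate.Wait();                // stand-in for work that is still in progress
}

Task t1 = WorkNoYield();
bool noYieldDoneOnReturn = t1.IsCompleted;

Task t2 = WorkWithYield();
bool yieldDoneOnReturn = t2.IsCompleted;

// Task.Run has the same effect without touching the method body.
Task t3 = Task.Run(() => gate.Wait());
bool runDoneOnReturn = t3.IsCompleted;

gate.Set();
await Task.WhenAll(t1, t2, t3);

Console.WriteLine($"{noYieldDoneOnReturn} {yieldDoneOnReturn} {runDoneOnReturn}");
// prints "True False False"
```

The first call has finished all of its work by the time the caller gets the Task back, which is why a loop of such calls runs one after another even when followed by Task.WhenAll.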

@martinothamar

API Proposal: Optional Parallel Notification Execution

Author: @Neo-vortex

Overview

This API proposal outlines the implementation of an optional parallel notification execution feature within our service. This feature allows us to handle notifications concurrently, enhancing the system's performance.

Implementation

We will introduce a new configuration option for our Mediator setup, as shown in the code snippet below:

builder.Services.AddMediator(options =>
{
    options.ServiceLifetime = ServiceLifetime.Transient;
    // NotificationHandlingStrategy can be configured here
    options.NotificationHandlingStrategy = NotificationHandlingStrategy.PARALLEL;
});

We introduce a NotificationHandlingStrategy to specify how notifications are processed. NotificationHandlingStrategy is an enumeration with two possible values: PARALLEL and SEQUENTIAL.
The default value of options.NotificationHandlingStrategy would be NotificationHandlingStrategy.SEQUENTIAL, to maintain the old behavior.

NotificationHandlingStrategy Enumeration

We define the NotificationHandlingStrategy enumeration in this section:

public enum NotificationHandlingStrategy {
    PARALLEL,   // Notifications are executed concurrently
    SEQUENTIAL  // Notifications are executed one after another
}

This enumeration allows users to choose between parallel and sequential notification processing based on their specific requirements.

NotificationHandlingStrategy.PARALLEL

I think a more appropriate name would be Concurrent execution, right?

Task.WhenAll alternative

There is no ValueTask<T[]> WhenAll, so the ValueTask would have to be boxed with AsTask like in the PR and we would pay more in terms of allocation.

I don't think it's necessary to use Task.WhenAll; we could write an awaiter like so:

global::System.Collections.Generic.List<global::System.Exception>? exceptions = null;
// start each task
ValueTask[] valueTasks = handlers
    .Select(handler => handler.Handle(notification, cancellationToken)).ToArray();
foreach(var task in valueTasks)
{
    try
    {
        // we await each task, it doesn't matter if we await the slowest one as we'd have to wait for it anyway
        // this way all the other tasks will have completed.
        // ConfigureAwait is applied here rather than in the Select, because it returns a
        // ConfiguredValueTaskAwaitable, which cannot be stored in a ValueTask[].
        await task.ConfigureAwait(false);
    }
    catch (global::System.Exception ex)
    {
        exceptions ??= new global::System.Collections.Generic.List<global::System.Exception>();
        exceptions.Add(ex);
    }
}

Would this be an appropriate place to use IsCompletedSuccessfully? I can't remember the downsides or benefits of this 😄, the blog post didn't elaborate much. I'll benchmark/play around with it later.

try
{
    if (!task.IsCompletedSuccessfully)
    {
        await task;
    }
}

Performance

One of the key goals of this library is to keep overhead low (close to zero)

My suggested implementation causes an array allocation 😕. This could be avoided by using the .NET 8 inline array feature. Mediator could add Buffer2<T>, Buffer3<T> types up to a max number and use the appropriate collection depending on the number of handlers (IIRC Mediator knows the number of handlers for each notification ahead of time, so we could generate a buffer type when it is needed). A Publish method would have to be created for each buffer length due to the lack of a convenient shared type.

I suspect most projects only use a small number of notification handlers so this would be used in most cases, otherwise an array will be allocated (could look at ThreadStaticLocal as well).

This is probably redundant as if any of the handlers yield, the ValueTask would have to allocate a Task to the heap, moving the inline array to the heap.

This seems to work as well.
But I do not understand why you think .AsTask() would have more overhead (be it allocation or anything).
An "As" prefix usually means little to no overhead when converting the source to the target type.

ValueTask is a struct capable of wrapping either a TResult or a Task<TResult>. Meaning it can be returned from an async method, and if that method completes synchronously and successfully, nothing need be allocated. Only if the method completes asynchronously does a Task<TResult> need to be allocated.

Calling AsTask returns the Task<TResult> object that is wrapped in this ValueTask<TResult> if one exists, or a new Task<TResult> object that represents the result. Meaning that if a given ValueTask would run synchronously, calling AsTask will cause an unnecessary allocation of Task.

Benchmarks

Benchmarks clearly show that AsTask always causes a task to be allocated, even for methods which do not await. Observe how all methods allocate for AsyncAwaitTask. This is because the awaited operation makes the method complete asynchronously, requiring a Task to be allocated on the heap.

It looks like IsCompletedSuccessfullyResult is faster than awaiting a synchronous method, perhaps this should be used?

ResultTask

| Method | Mean | Error | StdDev | Gen0 | Allocated |
|---|---|---|---|---|---|
| AwaitResult | 33.13 ns | 0.707 ns | 0.757 ns | - | - |
| AsTaskResult | 32.72 ns | 0.451 ns | 0.352 ns | 0.0459 | 72 B |
| IsCompletedSuccessfullyResult | 22.86 ns | 0.343 ns | 0.304 ns | - | - |

AsyncTask

| Method | Mean | Error | StdDev | Gen0 | Allocated |
|---|---|---|---|---|---|
| AwaitResult | 46.34 ns | 0.970 ns | 1.328 ns | - | - |
| AsTaskResult | 52.49 ns | 0.336 ns | 0.280 ns | 0.0459 | 72 B |
| IsCompletedSuccessfullyResult | 39.43 ns | 0.817 ns | 1.146 ns | - | - |

AsyncAwaitTask

I awaited Task.Yield(), hence why it is so much slower. It still demonstrates that awaiting causes all variations to allocate a Task.

| Method | Mean | Error | StdDev | Gen0 | Allocated |
|---|---|---|---|---|---|
| AwaitResult | 1.108 us | 0.0120 us | 0.0113 us | 0.1564 | 248 B |
| AsTaskResult | 1.108 us | 0.0161 us | 0.0150 us | 0.1526 | 240 B |
| IsCompletedSuccessfullyResult | 1.131 us | 0.0116 us | 0.0108 us | 0.1564 | 248 B |

Code

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Benchmarks>();

[MemoryDiagnoser()]
public class Benchmarks
{
    [Benchmark()]
    public async ValueTask<int> AwaitResult()
    {
        return await AsyncTask();
    }
    
    [Benchmark()]
    public async ValueTask<int> AsTaskResult()
    {
        return await AsyncTask().AsTask();
    }
    
    [Benchmark()]
    public async ValueTask<int> IsCompletedSuccessfullyResult()
    {
        var task = AsyncTask();
        if (task.IsCompletedSuccessfully)
        {
            return task.Result;
        }

        return await task;
    }

    private static ValueTask<int> ResultTask()
    {
        return ValueTask.FromResult(10);
    }
    
    private static async ValueTask<int> AsyncTask()
    {
        return 10;
    }
    
    private static async ValueTask<int> AsyncAwaitTask()
    {
        await Task.Yield();
        return 10;
    }
}

Hey, sorry for the late reply. I'll be trying to catch up this week :)

I think a more appropriate name would be Concurrent execution, right?

I agree! Also the fact that you used PascalCase, I think that's the idiomatic way for enums in .NET. Apart from this I think the API and naming is great @Neo-vortex 👍

I'd be interested to see more benchmarks running notification publishing workloads, testing various optimisations. Optimizations such as being opportunistic about IsCompletedSuccessfully might drown in the noise of the rest of the code since the function is already async, I've mostly seen that optimization being useful when you can avoid the async state machine altogether.

Maybe we should agree on what the common case is. I would guess that you are right @TimothyMakkison in that there usually is a single or few handlers. I also think that most handlers will be async. Do you agree?

I'd be interested to see more benchmarks running notification publishing workloads, testing various optimisations. Optimizations such as being opportunistic about IsCompletedSuccessfully might drown in the noise of the rest of the code since the function is already async, I've mostly seen that optimization being useful when you can avoid the async state machine altogether.

I agree, IsCompletedSuccessfully is best used to avoid allocations.
I did run some benchmarks to experiment with different methods, I tried to use small amounts of handlers and varied between all async, sync and mixed handlers. I can't find the benchmark code rn but this is what I found:

  • Task.WhenAll(Task[]) will copy the array, doubling the array allocation
  • Task.WhenAll(IEnumerable<Task>) internally uses ToArray so won't duplicate an array. Unfortunately var tasks = array.Select(static x => x()).Where(static x => !x.IsCompletedSuccessfully); will allocate a linq state machine of (iirc) 112 bytes, making any saving redundant.
  • I think AsTask will allocate state machines for async methods, using AsTask on ValueTask.CompletedTask won't allocate.

I found a variant on the following worked quite well, it could run synchronously with no allocations if all the handlers were synchronous. If an async handler was called an array large enough to fit all the remaining handlers would be allocated. (List has a large overhead)

Task?[]? tasks = null;
var pos = 0;

for (var i = 0; i < array.Length; i++)
{
    var task = array[i]();
    if(task.IsCompletedSuccessfully)
        continue;

    tasks ??= new Task[array.Length - i];
    tasks[pos] = task.AsTask();
    pos++;
}

if(tasks == null)
    return;

foreach (var t in tasks)
{
    if (t == null)
        break;

    await t.ConfigureAwait(false);
}

Maybe we should agree on what the common case is. I would guess that you are right @TimothyMakkison in that there usually is a single or few handlers. I also think that most handlers will be async. Do you agree?

Not sure how it's used in the wild, but I suspect 95% of uses have one or two async handlers. I wonder if anyone has ever had more than 10 handlers for one notification 🤔

That code looks great 👍

I was wondering if awaiting tasks in a loop causes more "scheduling churn"/CPU overhead than using Task.WhenAll, but from my reading of the code in WhenAllPromise, Task.WhenAll seems unlikely to be cheaper in any scenario, as there is quite a lot of bookkeeping code and atomic operations in there.

Task.WhenAll(Task[]) will copy the array, doubling the array allocation

Doesn't look like that to me? If passed as an array, it's just cast to a span. The array can't contain nulls though, so any remaining length would have to be populated with Task.CompletedTask.


Nice find, looks like they removed the duplication in .NET 8. I get the following in .NET 7 and below

Task[] tasksCopy = new Task[taskCount];
for (int i = 0; i < taskCount; i++)
{
    Task task = tasks[i];
    if (task == null) ThrowHelper.ThrowArgumentException(ExceptionResource.Task_MultiTaskContinuation_NullTask, ExceptionArgument.tasks);
    tasksCopy[i] = task;
}

The array can't contain nulls though, so any remaining length would have to be populated by Task.CompletedTask

mb this version didn't need Task?:

Task[]? tasks = null;

for (var i = 0; i < array.Length; i++)
{
    var task = array[i]();
    if(tasks == null && task.IsCompletedSuccessfully)
        continue;

    tasks ??= new Task[array.Length - i];
    tasks[i - (array.Length - tasks.Length)] = task.AsTask();
}

if(tasks == null)
    return;

foreach (var t in tasks)
{
    await t.ConfigureAwait(false);
}

Would it be better to use Parallel.ForEachAsync than Task.WhenAll? It's already optimized to deal with CPU cores. Checking that the number of handlers is not too low would also be a good idea: if you have a single handler there is no point running it in parallel and wasting resources.

Would it be better to use Parallel.ForEachAsync than Task.WhenAll? It's already optimized to deal with CPU cores.

It varies from task to task, but Parallel.ForEachAsync uses more memory, always allocates a Task (I think) and has the overhead of managing multiple threads. I suspect it is overkill for 99% of scenarios, but that's only my gut feeling 😄

It might be worth creating NotificationHandlingStrategy.Parallel, NotificationHandlingStrategy.Sequential and NotificationHandlingStrategy.Concurrent. It might be possible to let the user define their own strategies using IServiceCollection.

Benchmark for 3 async Tasks

| Method | Mean | Error | StdDev | Allocated |
|---|---|---|---|---|
| Concurrent | 15.73 ms | 0.101 ms | 0.095 ms | 1.06 KB |
| ParallelForEach | 15.74 ms | 0.087 ms | 0.081 ms | 1.98 KB |

Code

Feel free to play around with the number of threads/ the delay.

using BenchmarkDotNet.Attributes;
using BenchmarkDotNet.Running;

BenchmarkRunner.Run<Benchmark>();

[MemoryDiagnoser]
public class Benchmark
{
    private Func<Task>[] _tasks = Enumerable.Range(0,3).Select<int, Func<Task>>(x => async () => { await Task.Delay(1); }).ToArray();

    [Benchmark]
    public async Task Concurrent()
    {
        var tasks = _tasks.Select(x => x()).ToArray();

        foreach (var task in tasks)
        {
            await task;
        }
    }
    
    [Benchmark]
    public async Task ParallelForEach()
    {
        // Block-bodied lambda (the original was missing its opening brace).
        await Parallel.ForEachAsync(_tasks, async (x, _) => { await x(); });
    }
}

Yeah, for a small number of tasks it's maybe not worth it. With 100 or more we would probably see a better result, depending on the work done in the tasks (IO, CPU), but I'm not sure anyone is using 100 handlers for a single type of message 🙂

I agree with the point about letting users define their own execution strategy implementation. I think MediatR has that option.

I kind of like the configuration API from MediatR for this, just having the user specify a type. That will allow users to hook into more stuff in the notification pipeline as well (for example distributed tracing or any custom instrumentation)

https://github.com/jbogard/MediatR/wiki#custom-notification-publishers

I agree, specifying a type for a custom INotificationPublisher has worked well for me in MediatR. Some examples would include using it to do something like MediatR's TaskWhenAllPublisher, using Parallel.ForEachAsync() (which does return a ValueTask for a delegate), having a sequential foreach loop, or fire-and-forget. I use the last case in an app to make notifications fire-and-forget.

Started work on this here: #145
Would appreciate reviews!

I did a bunch of experimentation trying to figure out the most lightweight way of awaiting an array of value tasks that complete asynchronously: https://gist.github.com/martinothamar/ed6c4bec532cbc9ca9311d15a382a3a6

Can you spot any errors or weaknesses in the code? Essentially, my interpretation is that the Loop variants perform pretty much on par latency-wise but have fewer allocations. So in a high-throughput/concurrent situation (unlike these microbenchmarks), I think those should do well.

Here's the current impl in my PR:

ValueTask[]? tasks = null;
var count = 0;
foreach (var handler in handlersArray)
{
    var task = handler.Handle(notification, cancellationToken);
    if (task.IsCompletedSuccessfully)
        continue;

    tasks ??= new ValueTask[handlersArray.Length];
    tasks[count++] = task;
}

if (tasks is null)
    return default;

return AwaitTaskArray(tasks, count);

I might try to use the PoolingAsyncValueTaskMethodBuilder here too, but that might not work as well outside of microbenchmarks like these..
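AwaitTaskArray is not shown in the thread, so here is a minimal sketch of what such a helper could look like. The body is an assumption, not the code from #145: it awaits the first count entries and collects failures into an AggregateException, mirroring the exception list from the original sequential loop. Publish repeats the loop above, with plain Func<ValueTask> delegates as hypothetical stand-ins for handlers.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: awaits the first `count` pending tasks and
// gathers any failures into a single AggregateException.
async ValueTask AwaitTaskArray(ValueTask[] tasks, int count)
{
    List<Exception>? exceptions = null;
    for (var i = 0; i < count; i++)
    {
        try
        {
            await tasks[i].ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            (exceptions ??= new List<Exception>()).Add(ex);
        }
    }

    if (exceptions is not null)
        throw new AggregateException(exceptions);
}

// Same shape as the loop above; no array is allocated if every handler
// completes synchronously and successfully.
ValueTask Publish(Func<ValueTask>[] handlers)
{
    ValueTask[]? tasks = null;
    var count = 0;
    foreach (var handler in handlers)
    {
        var task = handler();
        if (task.IsCompletedSuccessfully)
            continue;

        tasks ??= new ValueTask[handlers.Length];
        tasks[count++] = task;
    }

    return tasks is null ? default : AwaitTaskArray(tasks, count);
}

var completed = 0;
var handlers = new Func<ValueTask>[]
{
    () => { Interlocked.Increment(ref completed); return default; },             // synchronous
    async () => { await Task.Delay(20); Interlocked.Increment(ref completed); }, // asynchronous
    async () => { await Task.Delay(10); throw new InvalidOperationException("boom"); },
};

AggregateException? caught = null;
try { await Publish(handlers); }
catch (AggregateException ex) { caught = ex; }

Console.WriteLine($"completed={completed}, failures={caught?.InnerExceptions.Count}");
// prints "completed=2, failures=1"
```

Each ValueTask is awaited exactly once, which matters because a ValueTask must not be consumed more than once. A handler that throws before returning its ValueTask would escape Publish directly in this sketch. Whether that should be caught is a design decision the sketch leaves open.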

I forgot to address this, but my earlier reply to @Neo-vortex's comment was incorrect.

I do not understand why you think AsTask() would have more overhead (be it allocation or anything).

My response only looked at the generic ValueTask<TResult> and neglected the non-generic ValueTask, which is the relevant case here. In this case he is spot on and I completely ignored his point 😅

Calling AsTask on a synchronously completed ValueTask will return the static Task.CompletedTask, allocating nothing. Calling AsTask on an asynchronous ValueTask will return the underlying allocated Task object; this allocation would have occurred even if the ValueTask were never awaited or AsTask were never called.
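A small check of the two cases for the non-generic ValueTask, assuming .NET 5 or later (for ValueTask.CompletedTask). The asynchronous case is illustrated here with a ValueTask that wraps an existing Task, which is a simplification of what an async method produces.

```csharp
using System;
using System.Threading.Tasks;

// A synchronously completed, non-generic ValueTask wraps no object,
// so AsTask() hands back the shared Task.CompletedTask singleton.
ValueTask sync = ValueTask.CompletedTask;
bool reusesSingleton = ReferenceEquals(sync.AsTask(), Task.CompletedTask);

// A ValueTask that wraps an existing Task returns that same instance,
// so AsTask() adds no allocation on top of the one already made.
Task pending = Task.Delay(10);
var wrapped = new ValueTask(pending);
bool returnsWrapped = ReferenceEquals(wrapped.AsTask(), pending);

await pending;
Console.WriteLine($"{reusesSingleton} {returnsWrapped}");
// prints "True True"
```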

Feeling pretty good about #145 now, also added a sample and some docs, would appreciate reviews as always. The PR has gotten pretty large but a big part of it is just snapshot-files changing. I'll merge tomorrow probably unless there are any big concerns

Changes published in 3.0.0-preview.25

Please still feel free to test and provide feedback

This is very exciting! I'll put it to the test. Thank you, @martinothamar! 🙌

Good to hear! Some people are reporting issues. I'm actively looking into it so feedback and reports are great 😄 #146