disruptor-net/Disruptor-net

Task-based handlers

ocoanet opened this issue · 4 comments

Some Disruptor-net users would like to use a task-based API in their event handlers. The goal of this issue is to discuss whether supporting task-based handlers in the Disruptor is a good idea, and what the design of the API could be.

Async issues

The Disruptor has a "no-allocation" policy that makes it quite hard to support async handlers. However, the goal of the "no-allocation" policy is to allow user applications to run in a "no-allocation" mode. If an application does not need the "no-allocation" mode, then it might be OK to allocate in the Disruptor. Of course, this behavior should be documented in the related types.

In addition, the idea of async handlers can be quite strange because:

  • Events are processed in order: handler H1 should not start processing event E2 until it has finished processing E1, and a downstream handler H2 should not start processing E1 before H1 has finished processing it.
  • Handlers use dedicated threads, so awaiting a task is not going to free up any resources, and waiting is a perfectly fine approach.

Yet, I can understand that it feels better to write idiomatic async / await code than to wait, especially because waiting is an anti-pattern in other frameworks.

Async API

Here is a possible API for async handlers:

public interface IAsyncBatchEventHandler<T> where T : class
{
    ValueTask OnBatch(List<T> data, long sequence);
}

I see multiple options:

  • (A) The next event handler is allowed to start processing the current event before the current event task is completed.
  • (B) The event handler is allowed to start processing the next event before the current event task is completed.
  • (C) The continuations run in the event handler thread (i.e.: there is a custom synchronization context).

In a full "sequential" mode (A and B disabled, C enabled), the behavior would be identical to waiting for tasks in the event handler, but the code would be slightly cleaner and slower (because of the overhead of the task machinery).

In a full "async" mode (A and B enabled, C disabled), the behavior would be identical to fire-and-forgetting the tasks. I do not see any value in this mode, except for automatically sending the errors to an IExceptionHandler. This mode would be dangerous because it would hide queueing and prevent backpressure.

Other modes:

  1. Only waiting before processing events in the next handler (B enabled, A disabled). It can already be implemented simply by saving the Task inside the event and waiting for it in the next handler.
  2. Only waiting before processing the next event in the current handler (A enabled, B disabled). It can already be implemented simply by saving the Task inside the handler and waiting for it before processing the next event (a sketch of this follows below).
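
For example, option 2 could look like the sketch below, assuming a synchronous batch handler shape similar to the proposed OnBatch (the handler type is hypothetical and not part of the Disruptor-net API):

using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical handler: starts asynchronous work for the current batch and only
// waits for it to complete before processing the next batch (option 2 above).
public class PipelinedBatchHandler<T> where T : class
{
    private Task _pendingWork = Task.CompletedTask;

    public void OnBatch(List<T> data, long sequence)
    {
        // Wait for the asynchronous work started for the previous batch.
        _pendingWork.GetAwaiter().GetResult();

        // Start the asynchronous work for the current batch without awaiting it.
        _pendingWork = ProcessAsync(data);
    }

    private async Task ProcessAsync(List<T> data)
    {
        // Placeholder for real asynchronous work (e.g. network or disk I/O).
        await Task.Yield();
    }
}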

Right now, I do not see a lot of value in those options. I almost feel like good documentation covering all the implementation options would be more valuable than a new API.

Please respond to this issue if you are interested in async handlers or if you think that I missed an option or an argument.

The full "async" mode is problematic as the user could be lead down the path of writing code that resumes on a different thread but mutates the state of the event handler. Also, as you noted this can break dependencies between handlers if we say that the next event handler can process the the current event. This will be apparent if the current event handler modifies the contents of the entry after completing it's async work.

I also agree with your observation that it would be dangerous because it hides queueing. In my opinion, not being an unbounded queue is one of the major design goals of the Disruptor.

I do not think that the cleaner API of a full "sequential" mode warrants the additional complexity that would be added to the project. Is this one more feature in an already rich API that we want to support?

Another option is to embrace async / await and to stop using dedicated threads for these particular event handlers. Of course, it would only be useful if the performance is still significantly better than alternative options (for example Channels). But even with only slightly better performance, an async handler could be interesting as a non-critical, end-of-the-pipeline handler.

The challenge would be to make wait strategies work properly here, because right now all wait strategies block the current thread, and it would be a bad design to use them on thread-pool threads. But the Disruptor could include a specific wait strategy for async processors (I do not think we would need to support multiple implementations).
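
A rough sketch of such a wait strategy (the type and method names are invented here, this is not the existing wait strategy contract): publishers complete a TaskCompletionSource instead of pulsing a monitor, so an async processor can await new events without blocking a thread.

using System.Threading;
using System.Threading.Tasks;

// Invented type for illustration: publishers call Signal() after publishing,
// async processors await WaitForSignalAsync() when no sequence is available.
public sealed class AsyncWaitStrategySketch
{
    private TaskCompletionSource<bool> _signal = NewSignal();

    private static TaskCompletionSource<bool> NewSignal()
        => new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The processor should re-check the available sequence after getting this task
    // and before awaiting it, to avoid missing a signal published in between.
    public Task WaitForSignalAsync() => _signal.Task;

    public void Signal()
    {
        var previous = Interlocked.Exchange(ref _signal, NewSignal());
        previous.TrySetResult(true);
    }
}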

What do you think?

It could be interesting to expose EventHandler as IAsyncEnumerable<List<T>> (or just IValueTaskSource<List<T>> to better match the semantics of a single consumer).

  • Call MoveNext the first time => get the first batch; the next batch accumulates until MoveNext is called again
  • Next call to MoveNext => equivalent to returning from OnBatch: get the next available batch and process it asynchronously
  • Repeat ^
  • Dispose => equivalent to returning from the last OnBatch

So this is a reversal of the push model into a pull model, which gives implicit backpressure if a consumer is too slow and the ring buffer becomes full.

On the consumer side there could be a big async loop that compiles to a single big state machine that does not allocate if we never cross the sync<->async boundary. A use case: serialize messages to byte arrays, then push the already serialized batches to network or disk using async methods, e.g. via such an API.
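
A sketch of that use case, assuming the Disruptor exposed the handler's input as an IAsyncEnumerable<List<T>> (a hypothetical API, not something Disruptor-net provides today):

using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class PullModelConsumer
{
    // One big async loop: serialize each batch, then push the bytes asynchronously
    // to a stream (network or disk). Backpressure is implicit: while the loop is
    // awaiting, no new batch is pulled and the ring buffer fills up.
    public static async Task RunAsync<T>(
        IAsyncEnumerable<List<T>> batches,
        Stream output,
        Func<List<T>, byte[]> serialize)
    {
        await foreach (var batch in batches.ConfigureAwait(false))
        {
            var bytes = serialize(batch);
            await output.WriteAsync(bytes).ConfigureAwait(false);
        }
    }
}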

The original API:

public interface IAsyncBatchEventHandler<T> where T : class
{
    ValueTask OnBatch(List<T> data, long sequence);
}

is push based, and the problem is that we then need sync code to wait (or not) for async code.

@buybackoff It is a very interesting idea indeed.

The API would be something like:

public interface IAsyncBatchEventHandler<T> where T : class
{
    Task Run(IAsyncEnumerable<List<T>> batches, CancellationToken cancellationToken);
}

Yet, I feel that the only correct implementation of this interface would be:

public async Task Run(IAsyncEnumerable<List<T>> batches, CancellationToken cancellationToken)
{
    await foreach (var batch in batches.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        // Process batch events
    }
}

If it is really the only correct implementation, I suspect that it might be better to move it to the framework side and stick with a simple ValueTask OnBatch(List<T> data, long sequence) method on the handler. A ValueTask method does not imply that every call will be async.
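
For example, a handler implementing the proposed OnBatch shape could complete synchronously on most calls and only go async when it flushes (a sketch with made-up buffering logic):

using System.Collections.Generic;
using System.Threading.Tasks;

// Sketch of a handler using the proposed ValueTask-based OnBatch: most calls
// return a synchronously completed ValueTask, only the flush path is async.
public class FlushingHandler<T> where T : class
{
    private readonly List<T> _buffer = new List<T>();

    public ValueTask OnBatch(List<T> data, long sequence)
    {
        _buffer.AddRange(data);

        // Most calls complete synchronously, without allocating a Task.
        if (_buffer.Count < 1000)
            return default;

        return FlushAsync();
    }

    private async ValueTask FlushAsync()
    {
        // Placeholder for the asynchronous flush (e.g. writing to disk or network).
        await Task.Yield();
        _buffer.Clear();
    }
}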