Cysharp/MessagePipe

Need help to implement IAsyncPublisher with IAsyncSubscriber

frozzen10 opened this issue · 2 comments

I would like to create Publisher and Subscriber classes as follows:

/// <summary>
/// Subscribes every injected <see cref="IAsyncMessageHandler{T}"/> of <see cref="IRequest"/>
/// to the given <see cref="IAsyncSubscriber{T}"/> and keeps the resulting subscriptions
/// so that they can be released together via <see cref="Close"/>.
/// </summary>
public class Subscriber
{
    private readonly IAsyncSubscriber<IRequest> _subscriber;
    private readonly IEnumerable<IAsyncMessageHandler<IRequest>> _handlers;
    private readonly IDisposable _subscriptions;

    /// <summary>
    /// Subscribes each handler in <paramref name="handlers"/> to <paramref name="subscriber"/>.
    /// </summary>
    /// <param name="subscriber">Subscriber the handlers are attached to.</param>
    /// <param name="handlers">Handlers to subscribe; an empty sequence results in no subscriptions.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="subscriber"/> or <paramref name="handlers"/> is <c>null</c>.
    /// </exception>
    public Subscriber(IAsyncSubscriber<IRequest> subscriber, IEnumerable<IAsyncMessageHandler<IRequest>> handlers)
    {
        _subscriber = subscriber ?? throw new ArgumentNullException(nameof(subscriber));
        _handlers = handlers ?? throw new ArgumentNullException(nameof(handlers));

        // Composite disposable used to manage all subscriptions as one unit.
        var bag = DisposableBag.CreateBuilder();

        foreach (var handler in _handlers)
        {
            _subscriber.Subscribe(handler).AddTo(bag);
        }

        _subscriptions = bag.Build();
    }

    /// <summary>
    /// Unsubscribes all handlers. Every subscription must be disposed when it is no longer needed.
    /// </summary>
    public void Close()
    {
        _subscriptions.Dispose();
    }
}

And here is a simple Publisher class:

/// <summary>
/// Thin wrapper around an <see cref="IAsyncPublisher{T}"/> of <see cref="IRequest"/>.
/// </summary>
public class Publisher
{
    private readonly IAsyncPublisher<IRequest> _publisher;

    /// <summary>Stores the publisher that messages are forwarded to.</summary>
    /// <param name="publisher">Publisher used by <see cref="Publish"/>.</param>
    public Publisher(IAsyncPublisher<IRequest> publisher) => _publisher = publisher;

    /// <summary>
    /// Forwards <paramref name="message"/> to the wrapped publisher's <c>Publish</c> method.
    /// Nothing is awaited or returned, so the caller does not observe handler completion.
    /// </summary>
    /// <param name="message">Message to publish.</param>
    /// <param name="cancellationToken">Token passed through to the wrapped publisher.</param>
    public void Publish(IRequest message, CancellationToken cancellationToken = default)
    {
        _publisher.Publish(message, cancellationToken);
    }
}

As you can see, I would like to make use of the fire-and-forget option.

Here are my other classes and marker interface:

/// <summary>
/// Marker interface for request messages; it declares no members.
/// </summary>
public interface IRequest
{
}

/// <summary>
/// Request message carrying the two operands of an addition.
/// </summary>
public class AddRequest : IRequest
{
    /// <summary>Left-hand operand.</summary>
    public int Left { get; set; }

    /// <summary>Right-hand operand.</summary>
    public int Right { get; set; }
}

/// <summary>
/// Handles <see cref="AddRequest"/> messages by printing the sum of both operands,
/// with artificial delays before and after to simulate work.
/// </summary>
public class AddCommandMessageHandler : IAsyncMessageHandler<AddRequest>
{
    // Durations of the simulated work before and after the result is printed.
    private static readonly TimeSpan WorkBeforeOutput = TimeSpan.FromSeconds(15);
    private static readonly TimeSpan WorkAfterOutput = TimeSpan.FromSeconds(5);

    /// <summary>
    /// Waits, writes the sum of <c>Left</c> and <c>Right</c> to the console, then waits again.
    /// </summary>
    /// <param name="message">Request containing the operands to add.</param>
    /// <param name="cancellationToken">
    /// Token observed by both delays, so that a cancelled publish stops the simulated work
    /// instead of letting it run for the full duration.
    /// </param>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled while waiting.
    /// </exception>
    public async ValueTask HandleAsync(AddRequest message, CancellationToken cancellationToken)
    {
        await Task.Delay(WorkBeforeOutput, cancellationToken); // Simulate some work
        Console.WriteLine("Consumer: Adding {0} and {1} gives {2}", message.Left, message.Right, message.Left + message.Right);
        await Task.Delay(WorkAfterOutput, cancellationToken); // Simulate some work
    }
}

And I found it really hard to register these classes with Autofac:

/// <summary>
/// Autofac module that registers <c>Publisher</c>, <c>Subscriber</c>, the types assignable to
/// <c>IRequest</c>, and the closed <c>IAsyncMessageHandler&lt;&gt;</c> implementations found in
/// the supplied type list. Every registration uses a per-lifetime-scope instance.
/// </summary>
public class AutofacModule : Module
{
    private readonly IEnumerable<Type> _applicationTypes;
    private ContainerBuilder _builder;

    /// <summary>Creates the module for the given set of candidate types.</summary>
    /// <param name="applicationTypes">Types scanned for requests and handlers.</param>
    public AutofacModule(IEnumerable<Type> applicationTypes) => _applicationTypes = applicationTypes;

    /// <summary>Adds all registrations of this module to <paramref name="builder"/>.</summary>
    /// <param name="builder">Container builder receiving the registrations.</param>
    protected override void Load(ContainerBuilder builder)
    {
        // The private helpers below read the builder from this field.
        _builder = builder;

        RegisterPublisher();
        RegisterSubscriber();
        RegisterRequests();
        RegisterHandlers();
    }

    // Registers Publisher as itself, one instance per lifetime scope.
    private void RegisterPublisher()
    {
        _builder.RegisterType<Publisher>().AsSelf().InstancePerLifetimeScope();
    }

    // Registers Subscriber as itself, one instance per lifetime scope.
    private void RegisterSubscriber()
    {
        _builder.RegisterType<Subscriber>().AsSelf().InstancePerLifetimeScope();
    }

    // Registers every candidate type assignable to IRequest under its implemented interfaces.
    private void RegisterRequests()
    {
        Type[] requestTypes = _applicationTypes
            .Where(candidate => typeof(IRequest).IsAssignableFrom(candidate))
            .ToArray();

        _builder.RegisterTypes(requestTypes).AsImplementedInterfaces().InstancePerLifetimeScope();
    }

    // Registers every candidate type that closes IAsyncMessageHandler<> under its implemented interfaces.
    private void RegisterHandlers()
    {
        Type[] handlerTypes = _applicationTypes
            .Where(candidate => candidate.IsClosedTypeOf(typeof(IAsyncMessageHandler<>)))
            .ToArray();

        _builder.RegisterTypes(handlerTypes).AsImplementedInterfaces().InstancePerLifetimeScope();
    }
}

I would like to have all handlers for IRequest inside the Subscriber object instance (Subscriber and Publisher will be registered as singletons). Maybe I am missing something and it should be done another way. If there is a different approach, I would be pleased to learn about it.
Is it possible to achieve that? What am I doing wrong?
Thanks for guidance and all possible tips!

This issue is stale because it has been open 180 days with no activity. Remove stale label or comment or this will be closed in 30 days.

Hey! You can use Autofac.Extensions.DependencyInjection to register any DI configuration built on top of Microsoft.Extensions.DependencyInjection (like MessagePipe) as follows:

using Autofac.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;

/// <summary>
/// Autofac module that configures MessagePipe on a Microsoft.Extensions.DependencyInjection
/// service collection and copies those registrations into the Autofac container builder.
/// </summary>
public class AutofacModule : Module
{
    /// <summary>Adds the MessagePipe registrations to <paramref name="builder"/>.</summary>
    /// <param name="builder">Container builder receiving the registrations.</param>
    protected override void Load(ContainerBuilder builder)
    {
        var services = new ServiceCollection();
        services.AddMessagePipe();

        // The "Populate" extension method is provided by "Autofac.Extensions.DependencyInjection".
        builder.Populate(services);
    }
}