zarusz/SlimMessageBus

Cannot remove consumers dynamically during runtime

yairush88 opened this issue · 12 comments

Hi,

Cannot remove consumers after building the messagebus and during application run time. Is this even possible?
Also I would like to publish messages even if there no consumers. But I can't do that because I get a message:
"Message of type {some type} was not registered as a supported produce message.
In short, I would like to add and remove consumers on the fly during runtime. It is a must for my application. I do I achieve this
with using the MemoryMessageBus?

zarusz commented

Hello,

Currently, the implementation is optimized for upfront declared consumers - you cannot remove or add them on the fly at runtime. The same goes for the producers. The reason for that is it performs some optimizations during the build, but also the configuration happens

However, with v2 is only possible to be done when the app start during services setup by using several .AddSlimMessageBus() - might be helpful if you're doing some dynamic assembly loading.

Also, if you're using .AutoDeclareFromConsumers() it scans all of the consumers/handler types and declares producers and consumers from the found types. Perhaps you need to declare all the possible tyles used upfront?

As an extreme perhaps declare a common base type across your conumers and producers (mbb.Produce<object>() + mbb.Consume<object>()) but I haven't tested that setup.

If I understand your use case more perhaps I could offer an alternative approach.

My use case is as follows: I have an MVVM application. Now, this application has main window and some modal windows floating.
Each window is bound to a view mode inherits from IConsumer.
So whenever close a modal window, I dispose the VM, But the view model never stops from consuming messages of SomeMessage.
I would like to be able to dispose a view model and never get the messages it consumes.
In addition I register the view models as singletons. But because the view models are also consumers, the are registered as Transient in DI, which not good for me.
So to summarize I need two things:

  1. To register my view models (which also consume messages) as Singletons.
  2. To stop consuming messages when disposing a view model consumer.

I build the messagebus as follows:
var host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
var executingAssembly = Assembly.GetExecutingAssembly();
services
.AddHostedService()
.AddSingleton<IConsumer, SomeMessageConsumer>()
.AddSlimMessageBus(mbb =>
{
var sp = services.BuildServiceProvider();
mbb
.AddServicesFromAssembly(executingAssembly)
.WithDependencyResolver(sp)
.WithProviderMemory(config => config.EnableMessageSerialization = false)
.AutoDeclareFrom(executingAssembly);
});
});

Note that I registred IConsumer as a singleton but the constructor of SomeMessageConsumer gets called each time I publish a message of type SomeMessage.
I would think that because I use .WithDependencyResolver(sp) My consumer will be a singleton as I've just registered it in my container, but it still remains transient.

zarusz commented

Ok, I think I understand your use case now.

So a couple of things from the sample:

  1. When using services.AddSlimMessageBus() then you should not use WithDependencyResolver() as it is automatically is called when the services are built. Also, I think building var sp = services.BuildServiceProvider(); under AddSlimMessageBus won't work as expected...

  2. I could extend the AddServicesFromAssembly() to allow for providing an optional DI lifetime (currently it's set as transient). Let me do that here soon, as that was supposed to be the case either way.

  3. The .AddSingleton<IConsumer, SomeMessageConsumer>() is redundant as the AutoDeclareFrom() will register types in the DI under the concrete type (SomeMessageConsumer). If you can't wait for 2) then this .AddSingleton<SomeMessageConsumer>() would work in your case - just also follow 1).

The hardest part is to ensure the MVVM publish/subscribe works as you'd expect.
I have further questions here:

  • where are you publishing the events from?
  • the particular view would resolve the ViewModel from constructor injection?

The reason why I am asking, it might be good to create scope for the whole view (one window), inject the IMessageBus within that scope and have the consumers scoped too. That way you'd have all of your dependencies scoped to the particular view.
Now, I haven't done Xamarin/WinForms that much, but it could work with some view activation glue code that would create a scope from IServiceProvider.CreateScope()..

First, let me thank you for your thorough and quick responses, very helpful!
Regarding 3) and following your suggestion, I removed AutoDeclareFrom and registered SomeMessageConsumer directly, and now the lifetime of the instance is a singleton as I've registered it in my container, which is exactly what I wanted, thanks for the tip.
Regarding your questions:

  1. I publish the events from a view model, and then another view model (or more than one) consumes those messages.
  2. Each of my view models implements a correlating interface, so for example, I have a view model MapVM : IMapVM
    And I register MapVM as a singleton in my container: .AddSignleton<IMapVM, MapVM>()
    Each view could contain sub views inside it (user controls) which are also bound to their correlated view models, so that view models can be injected inside other containing view models in their constructor.
  3. I looked at the code of AutoDeclareFrom in your framewwork, and it doesn't seem to register consumers in the DI, that part AddServicesFromAssembly does, as far as I understand from the code in your framework. So that I must register my consumers in the DI container myself (if I would like their lifetime to be singleton as oppsosed to transient when using AddServicesFromAssembly).

The important part is that EVERY view model (which is contained in a parent view model or containing other view models) can consume messages and publish messages as well.

Now, it is very critical for us for be able to remove/disable consumers on the fly during runtime. So critical that we are examining other message bus frameworks that could do that, although your framework is very good but lacks some crucial functionality that we need.

zarusz commented

The AddServicesFromAssembly scans assemblies and registers them in the DI.
The AutoDeclareFrom scans assemblies and declares Produce and Consume on the memory bus, so that you don't have to. They complement each other. The AddServicesFromAssembly registers services as transient, but as I said will add the ability to override the default lifetime (today its transient).

The disable consumers feature could be added to the memory bus. Since we're not talking about a true dynamic remove or dynamic add then it could be possible. The question is on what basis would you know which consumers should be disabled?

Let say having a "control" interface that would allow filtering which consumers are active for a given publish/send context would work for you?

If so you could even achieve that I think with the consumer interceptor nowadays: https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#consumer-lifecycle

Define a generic consumer interceptor that would not execute the underlying IConsumer/IRequestHandler.

I will try using interceptor as you suggest.
Regarding disabling consumers: I would like the framework to have some ConsumerManager that would have a methods like: AddConsumer(Type consumer), RemoveConsumer(Type consumer), PauseConsumer(Type consumer), ResumeConsumer(Type consumer), so that I could inject this ConsumerManager anywhere I want in my application and call these methods during application runtime. That would be very simple and convenient to use!

Another very problematic issue for us:

  1. I noticed registering a Generic type which is a consumer is not possible, I get the following exception:
    image

And my consumer is defined like this:

    public class SomeMessageConsumer<T> : IConsumer<SomeMessage>
    {
        public SomeMessageConsumer()
        {
            
        }

        public Task OnHandle(SomeMessage message)
        {
            return Task.CompletedTask;
        }
    }

So, is this even possible to have a generic class implement IConsumer?

zarusz commented

This seems like an MSDI exception.

The .AutoDeclareFrom() is finding all the types that implement IConsumer<T> in this case it's IConsumer<SomeMessage> which then it declared that the type of consumer is SomeConsumer<T> (it's an open generic).
Then MSDI cannot instaniate SomeConsumer<T> from IConsume<SomeMessage> as it does't know where to take the generic param from.

If you try this it might help.
services.AddSingleton(typeof(IConsumer<>), typeof(SomeMessageConsumer<>));

However, I am lost on why aren't you doing this in your class definition?
SomeMessageConsumer<T> : IConsumer<T>

Perhaps a shared sample would be better to collaborate on.

zarusz commented

@yairush88 the ability to provide custom Lifetime.Singleton is available on the latest 2.1.5 version.

I've been thinking about this ticket some. Here is a bit of code (running off memory so things might be off):

services.AddSlimMessageBus(mbb => 
{
  mbb
    .AddServciesFromAssembly(executingAssembly, consumerLifetime: Lifetime.Singleton);
    .WithProviderMemory()
    // this will register all the found consumer concrete types under the "general" topic)
    .AutoDeclareFrom(executingAssembly, messageTypeToTopicConverter: t => "general")
    // this will allow to send ANY type into the "general" topic  
    .Produce<object>(x => x.DefaultTopic("general"));
});
// This interceptor will allow to skip certain consumer from executing
services.AddTransient(typeof(IConsumerInterceptor<>), typeof(DisablingConsumerInterceptor<>));
services.AddSingleton<OpenViewModels>();
// stores the opened view models that should be recieving messages
public class OpenViewModels {

private  Dictionary<Type, bool> _enabled = new();

  public bool IsEnabled(Type viewModelType) => _enabled.ContainsKey(viewModelType) && _enabled[viewModelType];
  
  public void Enable(Type viewModelType) => _enabled[viewModelType] = true;
  public void Disable(Type viewModelType) => _enabled[viewModelType] = false;
}
// interceptor that checks if the specified type of consumer can fire given what's enabled in OpenViewModels 
public class DisablingConsumerInterceptor<T> : IConsumerInterceptor<T> 
{
  private readonly OpenViewModels _open;
  
  public DisablingConsumerInterceptor(OpenViewModels open) => _open = open;

  public async Task<object> OnHandle(TMessage message, Func<Task<object>> next, IConsumerContext context) 
  {  
    if (! _open.IsEnabled(context.Consumer.GetType()))  {
      // do not pass to the consumer
      return null;
    }   
    // allow to pass to the consumer
    return await next();
  }
}

Again, it would be better to collab on some actual sample.

@zarusz Thanks for your help!

zarusz commented

@yairush88 did it work or did you go with another library?

I went with MassTransit library, which is way more flexible and easy to work with. I'm sure you know this library, which is very popular. I did some testing with it's in-memory transport and it felt more intuitive and easier. Altogether, your great library works flawlessly in our production and customers are very happy.
But now, as I've explained to you, we need to so some refactoring regarding the messagebus, that we find MassTransit does better and easier.
Nevertheless, thanks for your time and effort!

zarusz commented

Yes I know mass transit. Sorry to hear that it didn't work out.

Could you post an example how you were able to achieve the dynamic consumer add and removal with mass transit? I wonder what is missing here so that I can adapt and perhaps incorporate into the memory transport.