zarusz/SlimMessageBus

Kafka Hosted Service Issue

faisalbwn opened this issue · 5 comments

Dear I am creating hosting service as following. The problem is, after injecting SlimBus it doesn't subscribe to topic until i get the service from DI container as highlighted below in the code.

public static async Task Main(string[] args)
{
//IServiceCollection services = new ServiceCollection();

        await Host.CreateDefaultBuilder(args)
           .ConfigureServices(services =>
           {
               services.AddSlimMessageBus((mbb, svp) =>
                {
                    mbb.WithProviderKafka(new KafkaMessageBusSettings("localhost:9092")
                    {
                        ConsumerConfig = (config) =>
                        {
                            config.StatisticsIntervalMs = 60000;
                            config.AutoOffsetReset = Confluent.Kafka.AutoOffsetReset.Latest;
                        }
                    })
                    //.Consume<SomeMessage>(x => x
                    //          .Topic("slimbustopic2")
                    //          .WithConsumer<SomeMessageConsumer>()
                    //          .KafkaGroup("slimbusconsumergroup1"))
                    .Handle<SampleRq, SampleRs>(s =>
                    {
                        s.Topic("request-response-topic2");
                        s.KafkaGroup("request-response-consumer-group2");
                        s.WithHandler<SampleRqHandler>();
                        s.Instances(1);
                    })
                    .PerMessageScopeEnabled(true)
                    .WithSerializer(new JsonMessageSerializer()).Build();
                },
                   addConsumersFromAssembly: new[] { Assembly.GetExecutingAssembly() },
                   addInterceptorsFromAssembly: new[] { Assembly.GetExecutingAssembly() },
                   addConfiguratorsFromAssembly: new[] { Assembly.GetExecutingAssembly() }
                );

               **// Get service from DI container
               var provider = services.BuildServiceProvider();
               provider.GetRequiredService<IMessageBus>();**
           })
        .Build()
        .RunAsync();
    }
}

hey @faisalbwn ,

In general, we need to inject IMessageBus somewhere on app start for the first time, forcing the MS Dependency Injection singletons to be created. SMB internally has a singleton that orchestrates message consumption.

Inspired by this thread dotnet/runtime#43149, you could use the IHostedService to initialize the needed interfaces:

internal class ApplicationService : IHostedService
{
        private readonly IMessageBus _messageBus;

        // Note: Injecting IMessageBus will force MsDependencyInjection to eagerly load SMB consumers upon start.
        public ApplicationService(IMessageBus messageBus) => _messageBus = messageBus;
        
}

and in the host builder register it:

        await Host.CreateDefaultBuilder(args)
            .ConfigureServices((ctx, services) =>
            {
               // register the host service
                services.AddHostedService<ApplicationService>();

                services.AddSlimMessageBus((mbb, svp) =>
                   // ...
              
                );
            })
            .Build()
            .RunAsync();

Does that solve the issue?

btw, I have upgraded the console app sample to .NET Generic Host too - see linked PR.

Also, I see in your sample that you invoke .Build(); - this is not needed as the .AddSlimMessageBus() manages the builder and calls .Build(); already.

Dear zarusz,

Thanks very much for the solution. It resolved the issue.
BTW IHostedService enforce to implement two methods StartAsync and StopAsync as follows. I returned completed task because there are no start stop methods belongs to bus. Can you kindly confirm if below implementation is fine?

internal class ApplicationService : IHostedService
{
private readonly IMessageBus _messageBus;

    // Note: Injecting IMessageBus will force MsDependencyInjection to eagerly load SMB consumers upon start.
    public ApplicationService(IMessageBus messageBus) => _messageBus = messageBus;

    public Task StartAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }
}

@faisalbwn , yes your sample is correct.

The StartAsync and StopAsync are invoked when the console app starts or when it's about close respectively.

Also, check this piece https://github.com/zarusz/SlimMessageBus/blob/master/docs/intro.md#start-or-stop-message-consumption. By default message consumers are started on bus creation.

However, If consumer auto start would not be enabled (it is enabled by default) then you could start the message consumption in the StartAsync.

Cheers

@faisalbwn do you have further questions? If not please close the issue.

Thanks very much zarusz.