/knightbus

Fast multi-transport messaging framework

Primary LanguageC#MIT LicenseMIT

KnightBus

Build status NuGet Documentation Status

KnightBus is a fast, lightweight and extensible messaging framework that supports multiple active message transports

Find the official KnightBus documentation here

KnightBus Logo

Transports

Package NuGet
KnightBus.Azure.ServiceBus KnightBus.Azure.ServiceBus
KnightBus.Azure.ServiceBus.Messages KnightBus.Azure.ServiceBus.Messages
KnightBus.Azure.ServiceBus.Management KnightBus.Azure.ServiceBus.Management
KnightBus.Azure.Storage KnightBus.Azure.Storage
KnightBus.Azure.Storage.Management KnightBus.Azure.Storage.Management
KnightBus.Azure.Storage.Messages KnightBus.Azure.Storage.Messages
KnightBus.Redis KnightBus.Redis
KnightBus.Redis.Messages KnightBus.Redis.Messages
KnightBus.Nats KnightBus.Nats
KnightBus.Nats.Messages KnightBus.Nats.Messages
KnightBus.PostgreSql KnightBus.PostgreSql
KnightBus.PostgreSql.Messages KnightBus.PostgreSql.Messages
KnightBus.PostgreSql.Management KnightBus.PostgreSql.Management

Monitoring

Package NuGet
KnightBus.ApplicationInsights KnightBus.ApplicationInsights
KnightBus.NewRelic KnightBus.NewRelic

Serialization

Package NuGet
KnightBus.ProtobufNet KnightBus.ProtobufNet
KnightBus.Newtonsoft KnightBus.Newtonsoft
KnightBus.MessagePack KnightBus.MessagePack

Framework

Package NuGet
KnightBus.Host KnightBus.Host
KnightBus.Core KnightBus.Core
KnightBus.Core.Management KnightBus.Core.Management
KnightBus.Messages KnightBus.Messages
KnightBus.SqlServer KnightBus.SqlServer
KnightBus.Schedule KnightBus.Schedule

Message Processing

public class CommandProcessor : IProcessCommand<SampleCommand, SampleSettings>,
{
    public CommandProcessor(ISomeDependency dependency)
    {
        //You can use your own container for dependency injection
    }

    public Task ProcessAsync(SampleCommand message, CancellationToken cancellationToken)
    {
        //Your code goes here
        return Task.CompletedTask;
    }
}

Initialization

class Program
{
    static async Task Main(string[] args)
    {
            var host = Microsoft.Extensions.Hosting.Host.CreateDefaultBuilder(args)
            .ConfigureServices(services =>
            {
                //Multiple active transports
                services.UseServiceBus(config => config.ConnectionString = "sb-connection")
                        .UseTransport<ServiceBusTransport>()
                        .UseBlobStorage(config => config.ConnectionString = "storage-connection")
                        .UseTransport<StorageTransport>()
                        .RegisterProcessors();
            })
            .UseKnightBus().Build();                

            await host.StartAsync(CancellationToken.None);
    }
}

Bring your own Middleware

KnightBus supports inserting your own middleware into the execution pipeline.

public class CustomThrottlingMiddleware : IMessageProcessorMiddleware
    {
        private readonly SemaphoreQueue _semaphoreQueue;
        public int CurrentCount => _semaphoreQueue.CurrentCount;

        public CustomThrottlingMiddleware(int maxConcurrent)
        {
            _semaphoreQueue = new SemaphoreQueue(maxConcurrent);
        }
        public async Task ProcessAsync<T>(IMessageStateHandler<T> messageStateHandler, IPipelineInformation pipelineInformation, IMessageProcessor next, CancellationToken cancellationToken) where T : class, IMessage
        {
            try
            {
                await _semaphoreQueue.WaitAsync().ConfigureAwait(false);
                await next.ProcessAsync(messageStateHandler, cancellationToken).ConfigureAwait(false);
            }
            finally
            {
                _semaphoreQueue.Release();
            }
        }
    }

Write your own Plugin

KnightBus supports custom plugins. Examples of existing plugins are: TcpAliveListener (K8S liveness probes) and Scheduling (Chron triggers).

public class CustomPlugin : IPlugin
{
    public CustomPlugin(ISomeDependency dependency, ILogger<CustomPlugin> logger)
    {        
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Start the plugin
    }
}

Documentation

To get documentation up and running locally, do the following.

  1. Install sphinx: https://www.sphinx-doc.org
  2. Install sphinx_rtd_theme: https://github.com/readthedocs/sphinx_rtd_theme
  3. Run make html source build in the documentation folder
  4. Open documentation/build/html/index.html in a browser to preview your changes

For Linux:

# In documentation folder:

$ sudo apt install python3 python3-sphinx python3-pip
$ python3 -m pip install sphinx-rtd-theme
$ make html source build
$ sensible-browser build/html/index.html