Lightweight pub/sub and request-response message bus interface for .NET. The goal of this library is to introduce a common and slim interface, with providers available for popular message brokers. It reduces the friction of adding messaging to your app and helps you achieve a reactive architecture.

SlimMessageBus

SlimMessageBus is a client façade for message brokers for .NET. It comes with implementations for specific brokers (Apache Kafka, Azure EventHub, MQTT/Mosquitto, Redis Pub/Sub) and also for in-memory message passing (in-process communication). SlimMessageBus additionally provides a request-response messaging implementation.

Features

  • Types of messaging patterns supported:
    • Publish-subscribe
    • Request-response
    • Queues
    • Hybrid of the above (e.g. Kafka with multiple topic consumers in one group)
  • Modern async/await syntax and TPL
  • Fluent configuration
  • Because SlimMessageBus is a facade, you can swap broker implementations:
    • Pull another broker provider using NuGet
    • Reconfigure SlimMessageBus and retest your app
    • Try out the messaging middleware that works best for your app (Kafka vs. Redis) without having to rewrite your app

Key elements of SlimMessageBus

  • Consumers:
    • IConsumer<in TMessage> - subscriber in pub/sub (or queue consumer)
    • IRequestHandler<in TRequest, TResponse> - request handler in request-response
  • Producers:
    • IPublishBus - publisher in pub/sub (or queue producer)
    • IRequestResponseBus - sender in req/resp
    • IMessageBus - extends IPublishBus and IRequestResponseBus
  • Misc:
    • IRequestMessage<TResponse> - marker for request messages
    • MessageBus - singleton accessor for current IMessageBus

Principles

  • The core of SlimMessageBus is "slim"
    • Simple, common and friendly API to work with messaging systems
    • No external dependencies apart from Common.Logging, which is used for logging so that you can plug in your favorite logging provider.
    • The core interface can be used in domain model (e.g. DomainEvents)
  • Plugin architecture:
    • DI integration (Autofac, CommonServiceLocator, Unity)
    • Message serialization (JSON, XML)
    • Use your favorite messaging broker as a provider by simply pulling a NuGet package
  • No threads created (pure TPL)
  • Async/Await support
  • Fluent configuration

Packages

| Name | Description | NuGet | .NET Standard |
| --- | --- | --- | --- |
| SlimMessageBus | The interfaces to work with SlimMessageBus | NuGet | 1.3 |
| SlimMessageBus.Host | The common implementation for the hosting application layer | NuGet | 1.3 |
| SlimMessageBus.Host.Kafka | Provider for Apache Kafka | NuGet | 1.3 |
| SlimMessageBus.Host.AzureEventHub | Provider for Azure Event Hub | NuGet | 2.0 |
| SlimMessageBus.Host.Redis (future) | Provider for Redis | . | . |
| SlimMessageBus.Host.InMemory (pending) | Implementation for in-process (memory) message passing | . | . |
| SlimMessageBus.Host.ServiceLocator | Resolves dependencies from ServiceLocator | NuGet | 1.3 |
| SlimMessageBus.Host.Autofac | Resolves dependencies from Autofac container | NuGet | 1.3 |
| SlimMessageBus.Host.Unity | Resolves dependencies from Unity container | NuGet | 1.3 |
| SlimMessageBus.Host.Serialization.Json | Message serialization provider for JSON | NuGet | 1.3 |

Typically, your application components only need to depend on SlimMessageBus, which is the facade. Only your application hosting layer (ASP.NET, Windows Service, Console App) references and configures the other packages (SlimMessageBus.Host.*), which are the providers and plugins.

Samples

Check out the Samples folder.

Usage examples

Request-response with Kafka

Use case:

  • The web app has to display thumbnails of large images.
  • The thumbnails are requested in the WebApi and are generated on demand (and saved) by the Worker (unless they exist already).
  • WebApi and Worker exchange messages via Apache Kafka

The frontend requests a resize of the image 'DSC0862.jpg' to '120x80' by calling this URL:

http://localhost:50452/api/Image/r/DSC0862.jpg/?w=120&h=80&mode=1

This gets handled by the WebApi method of the ImageController:

private readonly IRequestResponseBus _bus;
// ...
[Route("{fileId}")]
public async Task<HttpResponseMessage> GetImageThumbnail(string fileId, ThumbnailMode mode, int w, int h)
{
    var thumbFileContent = // ... try to load content for the desired thumbnail w/h/mode/fileId
    if (thumbFileContent == null)
    {
        // Task will await until response comes back (or timeout happens). The HTTP request will be queued and IIS processing thread released.
        var thumbGenResponse = await _bus.Send(new GenerateThumbnailRequest(fileId, mode, w, h));
        thumbFileContent = await _fileStore.GetFile(thumbGenResponse.FileId);
    }
    return ServeStream(thumbFileContent);
}

The GenerateThumbnailRequest is handled by a handler running in one of the Worker console apps in the pool.

public class GenerateThumbnailRequestHandler : IRequestHandler<GenerateThumbnailRequest, GenerateThumbnailResponse>
{
   public async Task<GenerateThumbnailResponse> OnHandle(GenerateThumbnailRequest request, string topic)
   {
     // some processing
     return new GenerateThumbnailResponse
     {
         FileId = thumbnailFileId
     };
   }
}

The response is sent back to the originating WebApi instance and the Task completes, causing the queued HTTP request to serve the resized image thumbnail.

var thumbGenResponse = await _bus.Send(new GenerateThumbnailRequest(fileId, mode, w, h));

Check out the full sample for image resizing application (Sample.Images.WebApi and Sample.Images.Worker)!
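
To illustrate how an awaited `Send` can complete when a response arrives (or fail on timeout), here is a minimal sketch of request/response correlation. It is not taken from the SlimMessageBus source: the class `PendingRequests`, its method names and the use of a string request id are all hypothetical, and the library may implement this differently.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper, NOT part of SlimMessageBus:
// tracks in-flight requests by a correlation id.
public class PendingRequests<TResponse>
{
    private readonly ConcurrentDictionary<string, TaskCompletionSource<TResponse>> _pending =
        new ConcurrentDictionary<string, TaskCompletionSource<TResponse>>();

    // Registers a request and returns a task that completes when the response
    // arrives, or faults with TimeoutException if none arrives in time.
    public async Task<TResponse> Await(string requestId, TimeSpan timeout)
    {
        var tcs = new TaskCompletionSource<TResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[requestId] = tcs;
        try
        {
            // Whichever finishes first wins: the response or the timeout.
            var first = await Task.WhenAny(tcs.Task, Task.Delay(timeout));
            if (first != tcs.Task)
            {
                throw new TimeoutException("No response for request " + requestId);
            }
            return await tcs.Task;
        }
        finally
        {
            // Always forget the request, whether it succeeded or timed out.
            TaskCompletionSource<TResponse> removed;
            _pending.TryRemove(requestId, out removed);
        }
    }

    // Called when a response message arrives for the given request id.
    // Returns false when the request is unknown (e.g. it already timed out).
    public bool Complete(string requestId, TResponse response)
    {
        TaskCompletionSource<TResponse> tcs;
        return _pending.TryGetValue(requestId, out tcs) && tcs.TrySetResult(response);
    }
}
```

In this sketch the sender would call `Await` right after publishing the request, and the consumer of the reply messages would call `Complete` with the id carried in the response. No thread is blocked while the request is in flight, which matches the behavior described for the queued HTTP request.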

Basic in-memory

This example is the simplest usage of SlimMessageBus and SlimMessageBus.Host.InMemory to implement Domain Events.

... Somewhere in your domain layer a domain event gets published

// Option 1
IMessageBus bus = ... // Injected from your favorite DI container
await bus.Publish(new NewUserJoinedEvent("Jane"));

// OR Option 2
await MessageBus.Current.Publish(new NewUserJoinedEvent("Jennifer"));

... The domain event is a simple POCO

public class NewUserJoinedEvent
{
	public string FullName { get; protected set; }

	public NewUserJoinedEvent(string fullName)
	{
		FullName = fullName;
	}
}

... The event handler implements the IConsumer<T> interface

public class NewUserHelloGreeter : IConsumer<NewUserJoinedEvent>
{
    public Task OnHandle(NewUserJoinedEvent message, string topic)
    {
        Console.WriteLine("Hello {0}", message.FullName);
        // Nothing to await here, so return an already completed task
        return Task.CompletedTask;
    }
}

... The handler can be subscribed explicitly

// Get a hold of the handler
var greeter = new NewUserHelloGreeter();

// Register handler explicitly
bus.Subscribe(greeter);

// Events are being published here

bus.UnSubscribe(greeter);

... Or when you decide to use one of the container integrations (e.g. SlimMessageBus.Host.Autofac) the handlers can be resolved from the DI container. For more details see other examples.
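
To make the subscribe/publish/unsubscribe flow above concrete, the following is a toy in-process bus. It is only a stand-in for illustration and is not SlimMessageBus.Host.InMemory: `ToyInMemoryBus` and `RecordingGreeter` are hypothetical names, the `IConsumer<T>` shape is copied from the handler example, and using the message type name as the topic is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Shape taken from the handler example in this README.
public interface IConsumer<in TMessage>
{
    Task OnHandle(TMessage message, string topic);
}

// Hypothetical stand-in for an in-process bus; NOT the library's implementation.
public class ToyInMemoryBus
{
    private readonly object _lock = new object();
    private readonly List<object> _consumers = new List<object>();

    public void Subscribe<TMessage>(IConsumer<TMessage> consumer)
    {
        lock (_lock) { _consumers.Add(consumer); }
    }

    public void UnSubscribe<TMessage>(IConsumer<TMessage> consumer)
    {
        lock (_lock) { _consumers.Remove(consumer); }
    }

    // Delivers the message to every subscribed consumer of a compatible type.
    public async Task Publish<TMessage>(TMessage message)
    {
        IConsumer<TMessage>[] targets;
        lock (_lock) { targets = _consumers.OfType<IConsumer<TMessage>>().ToArray(); }
        foreach (var consumer in targets)
        {
            // Assumption: the topic is simply the message type name.
            await consumer.OnHandle(message, typeof(TMessage).Name);
        }
    }
}

public class NewUserJoinedEvent
{
    public string FullName { get; protected set; }

    public NewUserJoinedEvent(string fullName)
    {
        FullName = fullName;
    }
}

// Hypothetical consumer that records greetings instead of printing them.
public class RecordingGreeter : IConsumer<NewUserJoinedEvent>
{
    public List<string> Greetings { get; } = new List<string>();

    public Task OnHandle(NewUserJoinedEvent message, string topic)
    {
        Greetings.Add("Hello " + message.FullName);
        return Task.CompletedTask;
    }
}
```

With this sketch, a `RecordingGreeter` that is subscribed, receives one published event and is then unsubscribed will record a greeting only for the event published while it was subscribed.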

Setup

This is how you set up SlimMessageBus in the simplest use case.

// Define the recipe for creating our IMessageBus
var busBuilder = new MessageBusBuilder()
    .SimpleMessageBus();

// Create the IMessageBus instance from the builder
IMessageBus bus = busBuilder
    .Build();

// Set the provider to resolve our bus - this setup will work as a singleton.
MessageBus.SetProvider(() => bus);
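
The provider-based accessor used above can be pictured roughly as follows. This is a hypothetical re-creation for illustration only: `MessageBusAccessor` and the empty `IMessageBus` placeholder are assumptions, and the real `MessageBus` class may behave differently (for example in how it reports a missing provider).

```csharp
using System;

// Placeholder so the sketch compiles on its own; the real interface has members.
public interface IMessageBus { }

// Hypothetical re-creation of a provider-based static accessor.
public static class MessageBusAccessor
{
    private static Func<IMessageBus> _provider;

    // Stores the factory that is used to resolve the current bus.
    public static void SetProvider(Func<IMessageBus> provider)
    {
        if (provider == null) throw new ArgumentNullException(nameof(provider));
        _provider = provider;
    }

    // Resolves the bus through the provider on every access.
    public static IMessageBus Current
    {
        get
        {
            var provider = _provider;
            if (provider == null)
            {
                throw new InvalidOperationException("No provider has been set.");
            }
            return provider();
        }
    }
}
```

Because the provider in the setup above closes over a single `bus` instance, every access resolves to the same object, which is what makes it behave as a singleton. A provider that resolves from a DI container would instead follow that container's lifetime rules.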

License

Apache License 2.0

Docs

Building

To build:

.\build\do_build.ps1

To create NuGet packages (in dist folder):

.\build\do_package.ps1

To push NuGet packages to local repository:

.\build\do_push_local.ps1