Primary language: C#. License: MIT.

Seedwork.CQRS.Bus

Seedwork.CQRS.Bus is a library that makes RabbitMQ easier to use and handles some basic flows for you, such as batched publishing and message retries.

How to use?

Register BusConnection as a singleton in your project.

BusConnection requires:

  • BusConnectionString: RabbitMQ connection string;
  • IBusSerializer: The default serializer;
  • IServiceScopeFactory: Scope factory for DI.
services
    .AddBusCore(
        configuration,
        options =>
        {
            options
                .SetOptions("BusConnectionOptions")
                .SetConnectionString("amqp://guest:guest@localhost/")
                .SetSerializer<BusSerializer>();
        });

Publish

Publishing always happens in batches, so a message is only sent once the buffer holds PublisherBufferSize messages or PublisherBufferTtlInMilliseconds has elapsed.
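The batching rule can be pictured with a small stand-alone sketch. It is not the library's implementation: the names Add and FlushIfDue, the tiny buffer size of 3, and the choice to measure the TTL from the first buffered message are all assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for PublisherBufferSize and PublisherBufferTtlInMilliseconds.
const int bufferSize = 3;
var bufferTtl = TimeSpan.FromMilliseconds(5000);

var buffer = new List<string>();     // messages waiting to be published
var flushed = new List<string[]>();  // each entry is one published batch
var firstItemAt = DateTime.MinValue; // when the oldest buffered message arrived

// Buffers a message, then publishes the batch if a limit has been hit.
void Add(string message, DateTime now)
{
    if (buffer.Count == 0) firstItemAt = now;
    buffer.Add(message);
    FlushIfDue(now);
}

// Publishes the buffered messages when the buffer is full or the TTL ran out.
void FlushIfDue(DateTime now)
{
    bool full = buffer.Count >= bufferSize;
    bool expired = buffer.Count > 0 && now - firstItemAt >= bufferTtl;
    if (full || expired)
    {
        flushed.Add(buffer.ToArray());
        buffer.Clear();
    }
}

var t0 = new DateTime(2024, 1, 1, 0, 0, 0, DateTimeKind.Utc);
Add("a", t0);
Add("b", t0.AddSeconds(1));
FlushIfDue(t0.AddSeconds(6));  // TTL elapsed: "a" and "b" go out together
Add("c", t0.AddSeconds(7));
Add("d", t0.AddSeconds(7));
Add("e", t0.AddSeconds(7));    // size limit reached: "c", "d", "e" go out

Console.WriteLine($"{flushed.Count} batches published");
```

In this sketch the first batch leaves because time ran out and the second because the buffer filled up, which are the two triggers described above.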

How to publish a simple message?

var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create("exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
string message = "Message";

_connection.Publish(exchange, queue, routingKey, message);

How to publish a list of messages?

var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create("exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
string[] messages = new [] {
    "Message 1",
    // ...
    "Message N"
};

_connection.PublishBatch(exchange, queue, routingKey, messages);

How to process a message?

_connection.Subscribe<string>(
    exchange,
    queue,
    routingKey,
    prefetchCount,
    async (scope, message) =>
    {
        // process message
    });

What happens when a message fails to be processed?

  • When a message fails to be processed, the application re-queues it to the retry-queue with a message expiration, so that it is sent back to the main queue later;
  • When the maximum number of attempts is reached, the application routes the message to the failed-queue;
  • When the system fails to re-queue the message, it nacks it.
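As a rough illustration of the first two rules, the routing decision might look like the sketch below. The helper RouteAfterFailure is hypothetical, and the exact comparison between the attempt number and the limit is an assumption; the zero case follows the "retry forever" rule for max attempts described in this README.

```csharp
using System;

// Hypothetical helper, not part of the library: decides where a failed message goes.
static string RouteAfterFailure(int attempt, int maxAttempts)
{
    // A max attempts value of zero means the message is retried forever.
    if (maxAttempts == 0) return "retry-queue";

    // Assumption: the message is retried while attempts remain, then parked.
    return attempt < maxAttempts ? "retry-queue" : "failed-queue";
}

Console.WriteLine(RouteAfterFailure(attempt: 1, maxAttempts: 5));
Console.WriteLine(RouteAfterFailure(attempt: 5, maxAttempts: 5));
Console.WriteLine(RouteAfterFailure(attempt: 42, maxAttempts: 0));
```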

How long does it take to requeue a message to the main queue?

There are three built-in options:

  • ConstantRetryBehavior (DEFAULT): T[final] = coefficient;

  • ArithmeticProgressionRetryBehavior: T[final] = T[initial] + (attempt - 1) * coefficient;

  • GeometricProgressionRetryBehavior: T[final] = T[initial] * pow(coefficient, attempt - 1);

  • You can also provide your own behavior.
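To get a feel for how the three formulas grow, here is a small stand-alone sketch that evaluates them directly. The helper names are hypothetical and are not the library's API; the time unit is left unspecified because the formulas above do not state one.

```csharp
using System;

// Hypothetical helpers that mirror the three formulas above.
static double ConstantDelay(double coefficient) => coefficient;

static double ArithmeticDelay(double initial, double coefficient, int attempt) =>
    initial + (attempt - 1) * coefficient;

static double GeometricDelay(double initial, double coefficient, int attempt) =>
    initial * Math.Pow(coefficient, attempt - 1);

// Compare the delays for the first four attempts with T[initial] = 1 and coefficient = 2.
for (int attempt = 1; attempt <= 4; attempt++)
{
    Console.WriteLine(
        $"attempt {attempt}: constant={ConstantDelay(2)}, " +
        $"arithmetic={ArithmeticDelay(1, 2, attempt)}, " +
        $"geometric={GeometricDelay(1, 2, attempt)}");
}
```

With these inputs the constant delay stays at 2, the arithmetic delay goes 1, 3, 5, 7, and the geometric delay goes 1, 2, 4, 8.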

How can I set the retry behavior?

services
    .AddBusCore(
        configuration,
        options =>
        {
            options
                // Choose one of the following:
                // Constant
                .UseConstantRetryBehavior(<coefficient>)
                // Arithmetic progression
                .UseArithmeticProgressionRetryBehavior(<coefficient>, <initialValue> = 1)
                // Geometric progression
                .UseGeometricProgressionRetryBehavior(<coefficient>, <initialValue> = 1)
                // Custom behavior
                .UseRetryBehavior(IRetryBehavior);
        });

How can I configure the max attempts?

var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create("exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
var message = Message.Create(
    "Message", // data
    10);       // max attempts

_connection.Publish(exchange, queue, routingKey, message);
  • When the publisher does not set max attempts, a default value is used.
  • When the publisher sets max attempts to zero, the message is retried forever.

What can I configure?

  • PublisherBufferSize (default: 1000): Maximum number of messages to publish at once;
  • PublisherBufferTtlInMilliseconds (default: 5000): Maximum time to wait before publishing the buffered messages when the size limit has not been reached;
  • ConnectionMaxRetry (default: 10): Maximum number of attempts to connect to the bus before failing;
  • ConnectionRetryDelayInMilliseconds (default: 500): Delay between connection attempts;
  • ConsumerMaxParallelTasks (default: 500): Maximum number of tasks processing messages in parallel;
  • MessageMaxRetry (default: 5): Maximum number of attempts to process a message;
  • PublishMaxRetry (default: 5): Maximum number of attempts to publish messages;
  • PublishRetryDelayInMilliseconds (default: 100): Delay between publish attempts.

Events

  • PublishSuccessed: When a publish succeeds, the system raises this event with the messages that were sent.
_connection.PublishSuccessed += items => 
{
    // ...
};
  • PublishFailed: When a publish fails after all attempts, the system raises this event with the messages and the exception.
_connection.PublishFailed += (items, exception) => 
{
    // ...
};

RequestKey

Sometimes you need to track a message across your applications. To do that, set a request key on the message; the value is passed along to every consumer.

var message = Message.Create("Message", 1, "Request-Key");
_connection.Publish(exchange, queue, routingKey, message);

In your consumer, the value is available on the message:

_connection.Subscribe<string>(
    exchange,
    queue,
    routingKey,
    prefetchCount,
    async (scope, message) =>
    {
        log.Info(message.RequestKey);
        // process message
    });