Seedwork.CQRS.Bus [EN]/[BR]
Seedwork.CQRS.Bus is a library that makes RabbitMQ easier to use and handles some basic flows for you, such as batched publishing and message retries.
Register BusConnection as singleton in your project.
BusConnection requires:
- BusConnectionString: RabbitMQ connection string;
- IBusSerializer: The default serializer;
- IServiceScopeFactory: Scope factory for DI.
services
    .AddBusCore(
        configuration,
        options =>
        {
            options
                .SetOptions("BusConnectionOptions")
                .SetConnectionString("amqp://guest:guest@localhost/")
                .SetSerializer<BusSerializer>();
        });
Publishing always happens in batches, so a message is sent only once the buffer reaches PublisherBufferSize or PublisherBufferTtlInMilliseconds elapses.
var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create($"exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
string message = "Message";
_connection.Publish(exchange, queue, routingKey, message);
var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create($"exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
string[] messages = new [] {
    "Message 1",
    ...
    "Message N"
};
_connection.PublishBatch(exchange, queue, routingKey, messages);
_connectionFixture.Connection.Subscribe<string>(
    exchange,
    queue,
    routingKey,
    prefetchCount,
    async (scope, message) =>
    {
        // process message
    });
- When a message fails to be processed, the application re-queues it to the retry-queue with a message expiration, so that it is sent back to the main queue later;
- When the maximum number of attempts is reached, the application routes the message to the failed-queue;
- When the system fails to re-queue a message, it nacks the message.
There are three built-in options, plus the possibility of a custom one:
- ConstantRetryBehavior (DEFAULT): T[final] = coefficient;
- ArithmeticProgressionRetryBehavior: T[final] = T[initial] + (attempt - 1) * coefficient;
- GeometricProgressionRetryBehavior: T[final] = T[initial] * pow(coefficient, attempt - 1);
- You can also provide your own behavior.
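To make the formulas above concrete, here is a minimal sketch that evaluates them for the first few attempts. It does not use the library: the class and method names (RetryDelayDemo, Constant, Arithmetic, Geometric) are hypothetical, and the values 1 for T[initial] and 2 for the coefficient are chosen only for illustration. The unit of the result is whatever unit the library uses for the message expiration, which this sketch does not assume.

```csharp
using System;

// Hypothetical helper, for illustration only; not part of Seedwork.CQRS.Bus.
public static class RetryDelayDemo
{
    // T[final] = coefficient
    public static double Constant(double coefficient) => coefficient;

    // T[final] = T[initial] + (attempt - 1) * coefficient
    public static double Arithmetic(double initial, double coefficient, int attempt) =>
        initial + (attempt - 1) * coefficient;

    // T[final] = T[initial] * pow(coefficient, attempt - 1)
    public static double Geometric(double initial, double coefficient, int attempt) =>
        initial * Math.Pow(coefficient, attempt - 1);

    public static void Main()
    {
        // Print the delay produced by each formula for attempts 1 to 4.
        for (var attempt = 1; attempt <= 4; attempt++)
        {
            Console.WriteLine(
                $"attempt {attempt}: constant={Constant(2)}, " +
                $"arithmetic={Arithmetic(1, 2, attempt)}, " +
                $"geometric={Geometric(1, 2, attempt)}");
        }
    }
}
```

With these inputs the constant behavior always yields 2, the arithmetic progression yields 1, 3, 5, 7, and the geometric progression yields 1, 2, 4, 8.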
services
    .AddBusCore(
        configuration,
        options =>
        {
            options
                // Choose one of the behaviors below
                // Constant
                .UseConstantRetryBehavior(<coefficient>)
                // Arithmetic progression
                .UseArithmeticProgressionRetryBehavior(<coefficient>, <initialValue> = 1)
                // Geometric progression
                .UseGeometricProgressionRetryBehavior(<coefficient>, <initialValue> = 1)
                // Custom behavior
                .UseRetryBehavior(IRetryBehavior);
        });
var exchange = Exchange.Create("exchange", ExchangeType.Direct);
var queue = Queue.Create($"exchange.queue-created");
var routingKey = RoutingKey.Create(queue.Name.Value);
var message = Message.Create(
"Message", // data
10); // max attempts
_connection.Publish(exchange, queue, routingKey, message);
- When the publisher does not set max attempts, a default value is used.
- When the publisher sets max attempts to zero, the message is retried forever.
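The two rules above, combined with the retry flow described earlier, can be sketched as a single decision. This is only an illustration under stated assumptions: RetryDecision and ShouldRetry are hypothetical names, the default of 5 is assumed to come from MessageMaxRetry, and the exact comparison the library uses (strictly less than, or less than or equal) is not confirmed by this document.

```csharp
// Hypothetical helper, for illustration only; not part of Seedwork.CQRS.Bus.
public static class RetryDecision
{
    // Returns true when a failed message would go to the retry-queue,
    // false when it would be routed to the failed-queue.
    public static bool ShouldRetry(int attemptsMade, int? maxAttempts, int defaultMaxAttempts = 5)
    {
        // No value set by the publisher: fall back to the default (assumed to be MessageMaxRetry).
        var limit = maxAttempts ?? defaultMaxAttempts;

        // Zero means "retry forever".
        if (limit == 0)
        {
            return true;
        }

        // Assumption: retries stop once the number of attempts reaches the limit.
        return attemptsMade < limit;
    }
}
```

For example, ShouldRetry(4, null) is true and ShouldRetry(5, null) is false under the assumed default of 5, while ShouldRetry(1000, 0) is always true.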
- PublisherBufferSize (default: 1000): Maximum number of messages published at once;
- PublisherBufferTtlInMilliseconds (default: 5000): Time after which buffered messages are published even if the size limit has not been reached;
- ConnectionMaxRetry (default: 10): Maximum number of attempts to connect to the bus before failing;
- ConnectionRetryDelayInMilliseconds (default: 500): Delay between connection attempts;
- ConsumerMaxParallelTasks (default: 500): Maximum number of parallel tasks used to process messages;
- MessageMaxRetry (default: 5): Maximum number of attempts to process a message;
- PublishMaxRetry (default: 5): Maximum number of attempts to publish messages;
- PublishRetryDelayInMilliseconds (default: 100): Delay between publish attempts.
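How PublisherBufferSize and PublisherBufferTtlInMilliseconds interact can be sketched with a small buffer that flushes on whichever condition is met first. This is not the library's implementation: FlushBuffer, Add, and Tick are hypothetical names, and the sketch assumes the TTL is measured from the first message placed in an empty buffer, which the document does not state.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration only; not part of Seedwork.CQRS.Bus.
public sealed class FlushBuffer<T>
{
    private readonly int _maxSize;
    private readonly TimeSpan _ttl;
    private readonly List<T> _items = new List<T>();
    private DateTime _firstItemAt;

    public FlushBuffer(int maxSize, TimeSpan ttl)
    {
        _maxSize = maxSize;
        _ttl = ttl;
    }

    // Adds an item. Returns the batch to publish when the size limit is reached, otherwise null.
    public IReadOnlyList<T> Add(T item, DateTime now)
    {
        if (_items.Count == 0)
        {
            _firstItemAt = now; // assumption: the TTL starts with the first buffered item
        }

        _items.Add(item);
        return _items.Count >= _maxSize ? Drain() : null;
    }

    // Called periodically. Returns the pending batch once the TTL has elapsed, otherwise null.
    public IReadOnlyList<T> Tick(DateTime now)
    {
        return _items.Count > 0 && now - _firstItemAt >= _ttl ? Drain() : null;
    }

    // Copies the buffered items into a batch and empties the buffer.
    private IReadOnlyList<T> Drain()
    {
        var batch = _items.ToArray();
        _items.Clear();
        return batch;
    }
}
```

With the defaults listed above, a single message would wait up to 5000 ms before being published, while a burst of 1000 messages would be published as soon as the last one is buffered.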
- PublishSuccessed: When a publish succeeds, the system raises this event with the messages that were sent.
_connection.PublishSuccessed += items =>
{
...
};
- PublishFailed: When a publish fails after all attempts, the system raises this event with the messages and the exception.
_connection.PublishFailed += (items, exception) =>
{
...
};
Sometimes you need to track a message across your applications. To do that, set a request key on the message; that value is passed along to all consumers.
var message = Message.Create("Message", 1, "Request-Key");
_connection.Publish(exchange, queue, routingKey, message);
In your consumer, the value is available directly on the message.
_connectionFixture.Connection.Subscribe<string>(
    exchange,
    queue,
    routingKey,
    prefetchCount,
    async (scope, message) =>
    {
        log.Info(message.RequestKey);
        // process message
    });