Lightweight core messaging library and boilerplate with RabbitMQ, MassTransit and SagaStateMachine
Link: -
- Provides basic RabbitMQ BUS instance
- Includes basic producer & consumer boilerplate
- Includes basic model workflow, state machine and sagas
Firstly you have to add these keys in your configuration file.
<appSettings>
<add key="RabbitMQUri" value="rabbitmq://hostAddress/"/>
<add key="RabbitMQUserName" value="username"/>
<add key="RabbitMQPassword" value="password"/>
</appSettings>
Initializing RabbitMQ BUS instance for Producer:
IBusControl busControl = BusConfigurator.Instance.ConfigureBus();
var sendToUri = new Uri("rabbitmqUri/queueName");
ISendEndpoint bus = busControl.GetSendEndpoint(sendToUri).Result;
after RabbitMQ BUS instance initializing then you can use Send method with your queues channel TCommand type.
bus.Send<TCommand>(new
{
SomeProperty = SomeValue
}
);
RabbitMQ BUS instance using for Consumer:
static void Main(string[] args)
{
var bus = BusConfigurator.Instance
.ConfigureBus((cfg, host) =>
{
cfg.ReceiveEndpoint(host, "queueName", e =>
{
e.Consumer<TCommandConsumer>();
});
});
bus.Start();
Console.ReadLine();
}
TCommandConsumer could like below:
public class TCommandConsumer : IConsumer<TCommand>
{
public async Task Consume(ConsumeContext<TCommand> context)
{
var command = context.Message;
//do something...
await Console.Out.WriteAsync($"{command.SomeProperty}");
}
}
PS: Publisher and Consumer services must be used same TCommand interface. This case important for MassTransit integration.
There are several options you can set via fluent interface:
.UseRetryPolicy(IRetryPolicy retryPolicyement)
.UseCircuitBreaker(int tripThreshold, int activeThreshold, int resetInterval)
.UseRateLimiter(int rateLimit, int interval)