This Nuget package provides a robust solution for managing message pipelines with Redis. It ensures that each message is handled once and in order, with a focus on high reliability and consistency across distributed systems. The package is designed to handle failures gracefully, retrying message handling based on configurable policies or stopping the pipeline until manual intervention.
- Single Message Handling: Each message is processed individually to ensure order and consistency.
- Failure Handling: Automatic retries or stops based on user-defined policies to manage message handling failures.
- Pipeline Control: Administrative controls to stop, resume, or clean the pipeline, providing flexibility in managing message flow.
Install the package from Nuget:
dotnet add package RedisMessagePipeline
Configure the Redis client and pipeline settings in your application:
In your application's startup configuration, register RedisPipelineFactory:
public void ConfigureServices(IServiceCollection services)
{
// Using an existing IConnectionMultiplexer instance:
services.AddRedisPipelineFactory(settings);
// Or, initiating a new IConnectionMultiplexer with a connection string:
services.AddRedisPipelineFactory(redisConnectionString, settings);
}
ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost:6379");
RedLockMultiplexer lockMultiplexer = new RedLockMultiplexer(redis);
IDatabase db = redis.GetDatabase();
var loggerFactory = new LoggerFactory();
RedLockFactory lockFactory = RedLockFactory.Create(new List<RedLockMultiplexer> { lockMultiplexer });
RedisPipelineFactory factory = new RedisPipelineFactory(loggerFactory, lockFactory, db);
var consumer = factory.CreateConsumer(new MyMessageHandler(), new RedisPipelineConsumerSettings("my-messages"));
var admin = factory.CreateAdmin(new RedisPipelineAdminSettings("my-messages"));
Administrate the pipeline:
// Stop the pipeline
await admin.StopAsync();
// Push messages
for (int i = 0; i < 10; i++)
{
await admin.PushAsync($"message:{i}");
}
// Resume the pipeline, skipping problematic messages if necessary
await admin.ResumeAsync(1, CancellationToken.None);
Start the consumer to process messages:
await consumer.ExecuteAsync(CancellationToken.None);
Distributed under the MIT License. See LICENSE
for more information.
For support and contributions, please contact the package maintainer at coddicat@gmail.com
.