Redis Message Pipeline


This NuGet package provides a message pipeline built on Redis. It ensures that each message is handled once and in order, with a focus on reliability and consistency across distributed systems. When message handling fails, the pipeline either retries according to a configurable policy or stops until someone intervenes manually.


  • Single Message Handling: Each message is processed individually to ensure order and consistency.
  • Failure Handling: Automatic retries or stops based on user-defined policies to manage message handling failures.
  • Pipeline Control: Administrative controls to stop, resume, or clean the pipeline, providing flexibility in managing message flow.
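
To make the failure-handling behaviour described above more concrete, here is a minimal, self-contained sketch of a retry-or-stop loop over an in-memory queue. It does not use this package or Redis. The names FailureAction, PipelineSketch, and RunAsync are hypothetical and exist only for illustration; the package's real policy types may look quite different.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical names throughout: this illustrates the idea only,
// it is not the RedisMessagePipeline API.
public enum FailureAction { Retry, Stop }

public static class PipelineSketch
{
    // Handles messages strictly in order, one at a time.
    // On failure the policy decides whether to retry the same message
    // or stop the pipeline. Returns the number of messages handled
    // successfully before the queue drained or the pipeline stopped.
    public static async Task<int> RunAsync(
        Queue<string> messages,
        Func<string, Task> handler,
        Func<string, int, FailureAction> policy)
    {
        int handled = 0;
        while (messages.Count > 0)
        {
            // Peek instead of dequeue: the message stays at the head
            // of the queue until it has been handled successfully.
            string message = messages.Peek();
            int attempt = 0;
            while (true)
            {
                attempt++;
                try
                {
                    await handler(message);
                    break; // success, move on to the next message
                }
                catch (Exception)
                {
                    if (policy(message, attempt) == FailureAction.Stop)
                    {
                        // The failed message remains at the head,
                        // waiting for manual intervention.
                        return handled;
                    }
                }
            }
            messages.Dequeue();
            handled++;
        }
        return handled;
    }
}
```

A policy such as (message, attempt) => attempt < 3 ? FailureAction.Retry : FailureAction.Stop would try each message up to three times and then stop. Because a failing message is never removed from the queue, no later message can overtake it, which is what preserves ordering.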

Getting Started


Install the package from NuGet:

dotnet add package RedisMessagePipeline


Configure the Redis client and pipeline settings in your application:

Configure Services:

In your application's startup configuration, register RedisPipelineFactory:

public void ConfigureServices(IServiceCollection services)
{
    // Using an existing IConnectionMultiplexer instance:

    // Or, creating a new IConnectionMultiplexer from a connection string:
    services.AddRedisPipelineFactory(redisConnectionString, settings);
}

Alternatively, create the factory manually with a Redis database and a lock factory:

ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("localhost:6379");
RedLockMultiplexer lockMultiplexer = new RedLockMultiplexer(redis);
IDatabase db = redis.GetDatabase();

var loggerFactory = new LoggerFactory();
RedLockFactory lockFactory = RedLockFactory.Create(new List<RedLockMultiplexer> { lockMultiplexer });
RedisPipelineFactory factory = new RedisPipelineFactory(loggerFactory, lockFactory, db);

Create Admin and Consumer

var consumer = factory.CreateConsumer(new MyMessageHandler(), new RedisPipelineConsumerSettings("my-messages"));
var admin = factory.CreateAdmin(new RedisPipelineAdminSettings("my-messages"));


Administer the pipeline:

// Stop the pipeline
await admin.StopAsync();

// Push messages
for (int i = 0; i < 10; i++)
    await admin.PushAsync($"message:{i}");

// Resume the pipeline, skipping problematic messages if necessary
await admin.ResumeAsync(1, CancellationToken.None);

Start the consumer to process messages:

await consumer.ExecuteAsync(CancellationToken.None);
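
In a long-running service you will usually want to pass a real cancellation token rather than CancellationToken.None, so that the consumer can be shut down cleanly. The sketch below shows one way to do that using only standard .NET types. ConsumerRunner and RunUntilCancelledAsync are hypothetical helper names, and the sketch assumes that ExecuteAsync signals cancellation by throwing OperationCanceledException, which the source does not confirm.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the package.
public static class ConsumerRunner
{
    // Runs the given delegate until it completes or the token is cancelled.
    // A cancellation requested through the token is treated as a normal
    // shutdown; any other exception propagates to the caller.
    public static async Task RunUntilCancelledAsync(
        Func<CancellationToken, Task> execute,
        CancellationToken token)
    {
        try
        {
            await execute(token);
        }
        catch (OperationCanceledException) when (token.IsCancellationRequested)
        {
            // Expected on shutdown: nothing to do.
        }
    }
}

// Possible usage with the consumer created earlier (assumption: ExecuteAsync
// observes the token it is given):
//
//   using var cts = new CancellationTokenSource();
//   Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
//   await ConsumerRunner.RunUntilCancelledAsync(
//       ct => consumer.ExecuteAsync(ct), cts.Token);
```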


Distributed under the MIT License. See LICENSE for more information.


For support and contributions, please contact the package maintainer at