KafkaFlow is a .NET framework for consuming and producing Kafka messages with multi-threading support. It is simple to use and easy to extend: install the packages, configure the bus, start and stop it with your application, and create a middleware or handler to process the messages.
KafkaFlow is built on the Confluent Kafka client.
- Multi-threaded consumer with message order guarantee
- Middleware support by implementing the IMessageMiddleware interface
- Native support for topics with many message types
- Multiple topics in the same consumer
- Serializer middleware (ProtoBuf, Json, and NewtonsoftJson, or a custom one implementing the IMessageSerializer interface)
- Compressor middleware (Gzip, or a custom one implementing the IMessageCompressor interface)
- Graceful shutdown (waits for processing to finish before shutting down)
- Stores the offset only when processing ends, avoiding message loss
- Supports .NET Core and .NET Framework
- Can be used with any dependency injection framework
- Fluent configuration
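The IMessageMiddleware extension point listed above can be sketched as follows. This is a minimal illustration rather than code from the project: the class name ElapsedTimeMiddleware is hypothetical, and the Invoke signature and the KafkaFlow namespace are assumed to match the package version you have installed.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow;

// Hypothetical middleware: measures how long the rest of the pipeline
// takes to process each message.
public class ElapsedTimeMiddleware : IMessageMiddleware
{
    // Assumed signature: the current message context plus a delegate
    // that invokes the next middleware (or the final handler).
    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        var stopwatch = Stopwatch.StartNew();
        try
        {
            // Hand the message over to the next step in the pipeline.
            await next(context);
        }
        finally
        {
            stopwatch.Stop();
            Console.WriteLine($"Message processed in {stopwatch.ElapsedMilliseconds} ms");
        }
    }
}
```

A middleware like this would typically be registered inside an AddMiddlewares block with Add<ElapsedTimeMiddleware>(), placed before the handlers so that it wraps them.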
Install the KafkaFlow.Microsoft.DependencyInjection package, then configure KafkaFlow in your Startup class:
using System;
using KafkaFlow;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
// The compressor, serializer, and typed handler types below also need
// using directives for their respective packages.

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddKafka(kafka => kafka
            // Install KafkaFlow.LogHandler.Console or implement the ILogHandler interface
            .UseConsoleLog()
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { "localhost:9092" })
                .AddConsumer(consumer => consumer
                    .Topic("test-topic")
                    .WithGroupId("print-console-handler")
                    .WithBufferSize(100)
                    .WithWorkersCount(10)
                    .WithAutoOffsetReset(AutoOffsetReset.Latest)
                    .AddMiddlewares(middlewares => middlewares
                        // Install KafkaFlow.Compressor and KafkaFlow.Compressor.Gzip
                        .AddCompressor<GzipMessageCompressor>()
                        // Install KafkaFlow.Serializer and KafkaFlow.Serializer.Protobuf
                        .AddSerializer<ProtobufMessageSerializer>()
                        // Install KafkaFlow.TypedHandler
                        .AddTypedHandlers(handlers => handlers
                            .WithHandlerLifetime(InstanceLifetime.Singleton)
                            .AddHandler<PrintConsoleHandler>())
                    )
                )
                .AddProducer("producer-name", producer => producer
                    .DefaultTopic("test-topic")
                    .AddMiddlewares(middlewares => middlewares
                        .AddSerializer<ProtobufMessageSerializer>()
                        .AddCompressor<GzipMessageCompressor>()
                    )
                )
            )
        );
    }

    public void Configure(
        IApplicationBuilder app,
        IHostApplicationLifetime lifetime,
        IServiceProvider serviceProvider)
    {
        var bus = serviceProvider.CreateKafkaBus();

        // Start the bus when the app starts; the ApplicationStopped token
        // stops it again so the app can shut down gracefully.
        lifetime.ApplicationStarted.Register(
            a => bus.StartAsync(lifetime.ApplicationStopped).GetAwaiter().GetResult(),
            null);
    }
}
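The configuration above registers a PrintConsoleHandler and a producer named "producer-name" without showing either in use. The sketch below is one possible shape for them, under stated assumptions: TestMessage and TestMessagePublisher are hypothetical names, the DataContract attributes assume the ProtoBuf serializer is protobuf-net based, and the namespaces and the Handle, GetProducer, and ProduceAsync signatures are assumed to match the installed package versions.

```csharp
using System;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.Producers;
using KafkaFlow.TypedHandler;

// Hypothetical message contract. The attributes are an assumption about
// what the ProtoBuf serializer needs in order to map the fields.
[DataContract]
public class TestMessage
{
    [DataMember(Order = 1)]
    public string Text { get; set; }
}

// Typed handler: invoked for every deserialized TestMessage on the consumer.
public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
    public Task Handle(IMessageContext context, TestMessage message)
    {
        Console.WriteLine($"Received: {message.Text}");
        return Task.CompletedTask;
    }
}

// Hypothetical service that publishes through the producer configured
// under the name "producer-name".
public class TestMessagePublisher
{
    private readonly IMessageProducer producer;

    public TestMessagePublisher(IProducerAccessor producers)
    {
        // Look up the producer by the name used in AddProducer(...).
        this.producer = producers.GetProducer("producer-name");
    }

    // Sends the message to the producer's default topic, using a random
    // partition key so messages spread across partitions.
    public Task PublishAsync(string text) =>
        this.producer.ProduceAsync(Guid.NewGuid().ToString(), new TestMessage { Text = text });
}
```

Because the handler is registered with InstanceLifetime.Singleton and the consumer runs 10 workers, a handler like this should avoid mutable instance state, or protect it, since several workers may call Handle concurrently.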
See the samples folder for more usage examples.
- Fork this repository
- Follow project guidelines
- Do your stuff
- Open a pull request following conventional commits
By sending us your contributions, you agree that they are made subject to the terms of our Contributor Ownership Statement.