KafkaFlow

KafkaFlow is a .NET framework for consuming and producing Kafka messages with multi-threading support. It is simple to use and extensible: install the packages, configure the bus, start and stop it with your application, and create a middleware or handler to process the messages.

KafkaFlow uses the Confluent Kafka client.

Packages

All packages are published on NuGet:

  • KafkaFlow
  • KafkaFlow.Abstractions
  • KafkaFlow.Serializer
  • KafkaFlow.Serializer.ProtoBuf
  • KafkaFlow.Serializer.Json
  • KafkaFlow.Serializer.NewtonsoftJson
  • KafkaFlow.Compressor
  • KafkaFlow.Compressor.Gzip
  • KafkaFlow.TypedHandler
  • KafkaFlow.Microsoft.DependencyInjection
  • KafkaFlow.Unity
  • KafkaFlow.LogHandler.Console

Features

  • Multi-threaded consumer with message order guarantee
  • Middleware support through the IMessageMiddleware interface
  • Native support for topics with many message types
  • Multiple topics in the same consumer
  • Serializer middleware (ProtoBuf, Json, and NewtonsoftJson, or a custom IMessageSerializer implementation)
  • Compressor middleware (Gzip, or a custom IMessageCompressor implementation)
  • Graceful shutdown (waits for in-flight processing to finish before shutting down)
  • Offsets are stored only when processing ends, avoiding message loss
  • Supports .NET Core and .NET Framework
  • Can be used with any dependency injection framework
  • Fluent configuration
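
The middleware feature above can be illustrated with a small custom middleware. This is a minimal sketch, not code from the project: the class name ElapsedTimeMiddleware is hypothetical, and the Invoke signature is assumed from the IMessageMiddleware interface in KafkaFlow.Abstractions, so check it against the version you install.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;
using KafkaFlow;

// Hypothetical middleware that logs how long the rest of the pipeline
// takes to process each message.
public class ElapsedTimeMiddleware : IMessageMiddleware
{
    // Assumed signature: the context carries the message, and `next`
    // invokes the following middleware (or the handler) in the chain.
    public async Task Invoke(IMessageContext context, MiddlewareDelegate next)
    {
        var stopwatch = Stopwatch.StartNew();

        try
        {
            // Hand the message to the next step in the pipeline.
            await next(context);
        }
        finally
        {
            // Runs whether the downstream steps succeed or throw.
            Console.WriteLine(
                "Message processed in {0} ms",
                stopwatch.ElapsedMilliseconds);
        }
    }
}
```

Assuming the middleware builder exposes a generic Add method, it would be registered inside AddMiddlewares with .Add<ElapsedTimeMiddleware>(). Middlewares run in the order in which they are added.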

Usage

.NET Core

Install the KafkaFlow.Microsoft.DependencyInjection package, then configure the bus in your Startup class:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddKafka(kafka => kafka
            // Requires the KafkaFlow.LogHandler.Console package; alternatively, implement the ILogHandler interface
            .UseConsoleLog()
            .AddCluster(cluster => cluster
                .WithBrokers(new[] { "localhost:9092" })
                .AddConsumer(consumer => consumer
                    .Topic("test-topic")
                    .WithGroupId("print-console-handler")
                    .WithBufferSize(100)
                    .WithWorkersCount(10)
                    .WithAutoOffsetReset(AutoOffsetReset.Latest)
                    .AddMiddlewares(middlewares => middlewares
                        // Requires the KafkaFlow.Compressor and KafkaFlow.Compressor.Gzip packages
                        .AddCompressor<GzipMessageCompressor>()
                        // Requires the KafkaFlow.Serializer and KafkaFlow.Serializer.ProtoBuf packages
                        .AddSerializer<ProtobufMessageSerializer>()
                        // Requires the KafkaFlow.TypedHandler package
                        .AddTypedHandlers(handlers => handlers
                            .WithHandlerLifetime(InstanceLifetime.Singleton)
                            .AddHandler<PrintConsoleHandler>())
                    )
                )
                .AddProducer("producer-name", producer => producer
                    .DefaultTopic("test-topic")
                    .AddMiddlewares(middlewares => middlewares
                        .AddSerializer<ProtobufMessageSerializer>()
                        .AddCompressor<GzipMessageCompressor>()
                    )
                )
            )
        );
    }

    public void Configure(
        IApplicationBuilder app,
        IHostApplicationLifetime lifetime,
        IServiceProvider serviceProvider)
    {
        var bus = serviceProvider.CreateKafkaBus();

        // Start the bus when the app starts and stop it when the app stops, allowing a graceful shutdown
        lifetime.ApplicationStarted.Register(
            a => bus.StartAsync(lifetime.ApplicationStopped).GetAwaiter().GetResult(),
            null);
    }
}
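
The configuration above registers a PrintConsoleHandler but does not show it. The following is a minimal sketch of what such a handler might look like. The TestMessage type and its Text property are hypothetical, and the Handle signature is assumed from the IMessageHandler<TMessage> interface in KafkaFlow.TypedHandler, which may differ between versions.

```csharp
using System;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using KafkaFlow;
using KafkaFlow.TypedHandler;

// Hypothetical message type. The data contract attributes give each
// member an explicit order, which ProtoBuf-based serializers rely on.
[DataContract]
public class TestMessage
{
    [DataMember(Order = 1)]
    public string Text { get; set; }
}

// Typed handler: invoked for every deserialized TestMessage.
public class PrintConsoleHandler : IMessageHandler<TestMessage>
{
    // Assumed signature: the context describes the consumed message,
    // and `message` is the already-deserialized payload.
    public Task Handle(IMessageContext context, TestMessage message)
    {
        Console.WriteLine("Message received: {0}", message.Text);
        return Task.CompletedTask;
    }
}
```

Because the configuration sets the handler lifetime to InstanceLifetime.Singleton and uses ten workers, a handler like this one should not keep mutable state unless that state is thread-safe.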

See the samples folder for more usage examples.
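
The Startup example registers a producer named "producer-name" but does not show a message being sent. A minimal sketch of one way to publish through it follows. It assumes that IProducerAccessor (from the KafkaFlow.Producers namespace) is resolvable from the container, that it can be indexed by producer name, and that ProduceAsync accepts a partition key and a message object. The OutgoingMessage and TestMessagePublisher types are hypothetical.

```csharp
using System;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using KafkaFlow.Producers;

// Hypothetical message type, annotated for a ProtoBuf-based serializer.
[DataContract]
public class OutgoingMessage
{
    [DataMember(Order = 1)]
    public string Text { get; set; }
}

// Hypothetical service that publishes through the configured producer.
public class TestMessagePublisher
{
    private readonly IProducerAccessor producers;

    // Assumption: the dependency injection container provides IProducerAccessor.
    public TestMessagePublisher(IProducerAccessor producers)
    {
        this.producers = producers;
    }

    public Task PublishAsync(string text)
    {
        // The name must match the one passed to AddProducer. Since the
        // producer has a default topic, no topic is given here.
        return this.producers["producer-name"].ProduceAsync(
            Guid.NewGuid().ToString(), // partition key (random here)
            new OutgoingMessage { Text = text });
    }
}
```

A random key spreads messages across partitions. To preserve ordering for related messages, use a stable key such as an entity identifier instead.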

Documentation

Wiki Page

Contributing

  1. Fork this repository
  2. Follow project guidelines
  3. Do your stuff
  4. Open a pull request following conventional commits

Disclaimer

By sending us your contributions, you agree that they are made subject to the terms of our Contributor Ownership Statement.

Maintainers

License

MIT