/extension-confluent-kafka-client

The project extends functionality to confluent-kafka-dotnet by adding an abstraction layer with buffering.

Primary LanguageC#MIT LicenseMIT

extension-confluent-kafka-client

build integration-tests

The project extends functionality to confluent-kafka-dotnet by adding an abstraction layer with buffering.

IMPORTANT

All features are restricted to a AtLeastOnce consumption pattern, otherwise the underlying buffering concept will cause side-effects.

Extended Functionality

Multi-Threading support via buffers feature

Consumed messages from kafka are dispatched on internal buffers/channels. Messages are consumed from those buffers by tasks. The number of tasks and distribution is defined by setting called BufferSharding. Each task triggers a callback for application integration. The implemented approach is as well recommended by https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/.

See here

Prioritized topics feature

Each defined topic has a priority. The messages are dispatched into buffers based on the defined topic priority. That allows to process messages from high prior topics faster in case low prio topics would occupy message buffer task.

See here

Backpressure feature

Messages are disptached into bounded buffers. If the buffer capacity (defined in config) is reached, the consumer will pause consumption for TopicParitions assigned to the inbound channel.

See here

Connection monitoring feature

The connection status to broker is actively checked and disruptions are notifed via callback.

[FUTURE-RELEASE, not implemented yet] Interval consumption feature

Messages from certain defined topics are consumed in perodical interval. This allows to maintain in example a deadletter topic, which is flushed periodically.

Useage

Take a look in the examples for usage.

//Note: confluent configuration see https://github.com/confluentinc/confluent-kafka-dotnet
var confluentConfig = new ConsumerConfig
{
    GroupId = "test-group",
    ClientId = Environment.MachineName,
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
    };

//Note: actual configuration for buffered consumer
var config = new BufferedConsumerConfig
{
    BufferSharding = BufferSharding.Task,
    BufferMaxTaskCount = 5,
    TopicConfigs = new[]
    {
        new BufferedTopicConfig
        {
            TopicName = "testTopic"
        }
    }
};

var consumer = new BufferedConsumerBuilder<byte[], byte[]>(config)
    .SetConsumerBuilder(new ConsumerBuilder<byte[], byte[]>(confluentConfig))
    .SetAdminBuilder(new AdminClientBuilder(confluentConfig))
    .SetCallback(this)
    .SetHealthStatusCallback(this)
    .SetMetricsCallback(this)
    .SetChannelIdFunc((p) => p.Partition)
    .SetLogger(logger)
    .Build();

In order to process received messages SetCallback needs to be set. Here is a list of callback interfaces which can be implemented:

Configuration

Configuration properties and defaults can be found here configuration.

Developer Notes

The extension-confluent-kafka-client project is MIT licensed here.

Please consider license of dependencies, [confluent-kafka-dotnet] (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/LICENSE) is not MIT licensed.