This project extends confluent-kafka-dotnet with an abstraction layer that adds buffering.
All features assume an at-least-once (AtLeastOnce) consumption pattern; with any other delivery semantics, the underlying buffering concept will cause side effects.
Messages consumed from Kafka are dispatched to internal buffers (channels). Tasks consume the messages from those buffers; the number of tasks and the distribution of messages across them are defined by the BufferSharding setting. Each task triggers a callback for application integration. This approach is also recommended by Confluent (see https://www.confluent.io/blog/kafka-consumer-multi-threaded-messaging/).
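The dispatch pattern described above can be sketched roughly as follows. This is a minimal illustration built on System.Threading.Channels, not the library's actual implementation: the class name ShardedDispatchSketch, the RunAsync method, and the "partition modulo task count" routing are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch, not part of the library.
public static class ShardedDispatchSketch
{
    // Dispatches (partition, payload) pairs onto one channel per worker task
    // and returns the payloads each worker processed, in processing order.
    public static async Task<List<string>[]> RunAsync(
        IEnumerable<(int Partition, string Payload)> messages, int taskCount)
    {
        var channels = Enumerable.Range(0, taskCount)
            .Select(_ => Channel.CreateUnbounded<string>())
            .ToArray();
        var processed = Enumerable.Range(0, taskCount)
            .Select(_ => new List<string>())
            .ToArray();

        // One worker task per channel; the Add call stands in for the
        // application callback that a real consumer would invoke.
        var workers = channels.Select((channel, index) => Task.Run(async () =>
        {
            await foreach (var payload in channel.Reader.ReadAllAsync())
            {
                processed[index].Add(payload);
            }
        })).ToArray();

        // Dispatcher: a given partition always maps to the same channel,
        // so ordering within a partition is preserved.
        foreach (var (partition, payload) in messages)
        {
            await channels[partition % taskCount].Writer.WriteAsync(payload);
        }

        // Signal completion so the workers leave their read loops.
        foreach (var channel in channels)
        {
            channel.Writer.Complete();
        }
        await Task.WhenAll(workers);
        return processed;
    }
}
```

Routing by partition keeps messages of one partition on a single task, which is what makes parallel processing compatible with per-partition ordering.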
Each configured topic has a priority, and messages are dispatched into buffers according to that priority. This allows messages from high-priority topics to be processed faster when low-priority topics would otherwise occupy the buffer tasks.
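One simplified way to picture priority-based dispatching is a buffer per priority level, with the worker always draining the most important non-empty buffer first. The sketch below is an illustration only and may differ from how the library schedules its tasks; PriorityBuffersSketch, Dispatch, and TryTakeNext are hypothetical names, and treating a lower number as a higher priority is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical sketch, not part of the library.
public sealed class PriorityBuffersSketch
{
    private readonly IReadOnlyDictionary<string, int> _topicPriority;

    // Sorted by priority value; assumption: lower value = higher priority.
    private readonly SortedDictionary<int, Queue<string>> _buffers =
        new SortedDictionary<int, Queue<string>>();

    public PriorityBuffersSketch(IReadOnlyDictionary<string, int> topicPriority)
    {
        _topicPriority = topicPriority;
        foreach (var priority in topicPriority.Values.Distinct())
        {
            _buffers[priority] = new Queue<string>();
        }
    }

    // Routes a message to the buffer that belongs to its topic's priority.
    public void Dispatch(string topic, string payload) =>
        _buffers[_topicPriority[topic]].Enqueue(payload);

    // Takes the next message from the highest-priority non-empty buffer.
    public bool TryTakeNext(out string payload)
    {
        foreach (var buffer in _buffers.Values)
        {
            if (buffer.TryDequeue(out payload))
            {
                return true;
            }
        }
        payload = string.Empty;
        return false;
    }
}
```

With this layout, a backlog on a low-priority topic cannot delay a message that arrives later on a high-priority topic.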
Messages are dispatched into bounded buffers. When a buffer reaches its capacity (defined in the configuration), the consumer pauses consumption for the TopicPartitions assigned to that inbound channel.
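The backpressure idea can be sketched with a bounded channel whose TryWrite call fails once the capacity is reached. This is a minimal sketch under stated assumptions, not the library's code: BoundedBufferSketch and its members are hypothetical, and the IsPaused flag merely stands in for the Pause and Resume calls that a real consumer would make on the affected TopicPartitions.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical sketch, not part of the library.
public sealed class BoundedBufferSketch
{
    private readonly Channel<string> _buffer;

    // Stands in for the paused state of the assigned TopicPartitions.
    public bool IsPaused { get; private set; }

    public BoundedBufferSketch(int capacity)
    {
        _buffer = Channel.CreateBounded<string>(capacity);
    }

    // Tries to buffer a message; when the buffer is full the message is
    // rejected and consumption is marked as paused.
    public bool TryDispatch(string message)
    {
        if (_buffer.Writer.TryWrite(message))
        {
            return true;
        }
        IsPaused = true; // a real consumer would pause the partitions here
        return false;
    }

    // Takes one message out of the buffer; freeing a slot lifts the pause.
    public bool TryProcessOne(out string message)
    {
        if (!_buffer.Reader.TryRead(out message))
        {
            return false;
        }
        IsPaused = false; // a real consumer would resume the partitions here
        return true;
    }
}
```

Pausing instead of blocking lets the poll loop keep running, so the consumer stays in its group while the buffer drains.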
The connection status to the broker is actively checked, and disruptions are reported via a callback.
Messages from specifically configured topics are consumed at a periodic interval. This makes it possible, for example, to maintain a dead-letter topic that is flushed periodically.
See the examples for usage.
// Note: for the Confluent configuration, see https://github.com/confluentinc/confluent-kafka-dotnet
var confluentConfig = new ConsumerConfig
{
    GroupId = "test-group",
    ClientId = Environment.MachineName,
    BootstrapServers = "localhost:9092",
    AutoOffsetReset = AutoOffsetReset.Earliest,
    EnableAutoCommit = false
};

// Note: actual configuration for the buffered consumer
var config = new BufferedConsumerConfig
{
    BufferSharding = BufferSharding.Task,
    BufferMaxTaskCount = 5,
    TopicConfigs = new[]
    {
        new BufferedTopicConfig
        {
            TopicName = "testTopic"
        }
    }
};

var consumer = new BufferedConsumerBuilder<byte[], byte[]>(config)
    .SetConsumerBuilder(new ConsumerBuilder<byte[], byte[]>(confluentConfig))
    .SetAdminBuilder(new AdminClientBuilder(confluentConfig))
    .SetCallback(this)
    .SetHealthStatusCallback(this)
    .SetMetricsCallback(this)
    .SetChannelIdFunc((p) => p.Partition)
    .SetLogger(logger)
    .Build();
To process received messages, a callback must be registered with SetCallback. Here is a list of callback interfaces which can be implemented:
Configuration properties and their defaults are described in the configuration documentation.
The extension-confluent-kafka-client project is MIT licensed.
Please also consider the licenses of the dependencies: confluent-kafka-dotnet (https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/LICENSE) is not MIT licensed.