F# friendly wrapper for Confluent.Kafka
, with minimal dependencies or additional abstractions (but see related repos).
FsKafka
wraps Confluent.Kafka
to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on Confluent.Kafka [1.6.2]
, librdkafka [1.6.1]
(pinned to ensure we use a tested pairing), Serilog
(but no specific Serilog sinks, i.e. you configure to emit to NLog
etc) and Newtonsoft.Json
(used internally to parse Broker-provided Statistics for logging purposes).
FsKafka is delivered as a Nuget package targeting netstandard2.0
and F# >= 4.5.
Install-Package FsKafka
or for paket
, use:
paket add FsKafka
- See the Propulsion repo for extended Producers and Consumers.
- See the Jet
dotnet new
templates repo'sproProjector
template (in-k
mode) for example producer logic using theBatchedProducer
and theproConsumer
template for examples of using theBatchedConsumer
fromFsKafka
, alongside the extended modes inPropulsion
. - See the Equinox QuickStart for examples of using this library to project to Kafka from
Equinox.Cosmos
and/orEquinox.EventStore
.
Contributions of all sizes are warmly welcomed. See our contribution guide
The best place to start, sample-wise is from the dotnet new
templates stored in a dedicated repo.
The templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.
NB The tests are reliant on a TEST_KAFKA_BROKER
environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)
export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n
-
The
BatchedConsumer
implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the typeFsKafka.Core.InFlightMessageCounter
, and as such can be silenced by including the following in one'sLoggerConfiguration()
:.MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.
Equinox places some key constraints on all components and dependencies:-
- batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean consistent wiring
- pick a well-established base library, try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
let key = Guid.NewGuid().ToString()
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
for m in messages do
printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
use consumer = BatchedConsumer.Start(log, cfg, handler)
return! consumer.AwaitShutdown()
} |> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
for m in messages do
printfn "Received: %s" m.Message.Value
}
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
use consumer = BatchedConsumer.Start(log, cfg, handler)
use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
return! consumer.AwaitShutdown()
} |> Async.RunSynchronously
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string> []) = async {
for m in messages do
printfn "Received: %s" m.Message.Value
}
let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)
let cfg1, cfg2 = config "MyTopicA", config "MyTopicB"
async {
use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
return! Async.Parallel [consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]
} |> Async.RunSynchronously