F# friendly wrapper for `Confluent.Kafka`, with minimal dependencies or additional abstractions (but see related repos). Includes a variant based on `Confluent.Kafka` v `0.11.3` as a way to manage migration from `0.x` to `1.x`.
The components within this repository are delivered as a multi-targeted NuGet package targeting `net461` (F# 3.1+) and `netstandard2.0` (F# 4.5+) profiles:

- `FsKafka`: Wraps `Confluent.Kafka` to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on `Confluent.Kafka [1.5.3]`, `librdkafka [1.5.3]` (pinned to ensure we use a tested pairing), `Serilog` (but no specific Serilog sinks, i.e. you configure it to emit to `NLog` etc.) and `Newtonsoft.Json` (used internally to parse Broker-provided Statistics for logging purposes).
- `FsKafka0`: As per `FsKafka`; depends on `Confluent.Kafka [0.11.3]`, `librdkafka [0.11.4]`, `Serilog` and `Newtonsoft.Json`.
- See the Propulsion repo for extended Producers and Consumers.
- See the Jet `dotnet new` templates repo's `proProjector` template (in `-k` mode) for example producer logic using the `BatchedProducer`, and the `proConsumer` template for examples of using the `BatchedConsumer` from `FsKafka`, alongside the extended modes in `Propulsion`.
- See the Equinox QuickStart for examples of using this library to project to Kafka from `Equinox.Cosmos` and/or `Equinox.EventStore`.
See CONTRIBUTING.md.

The best place to start, sample-wise, is the `dotnet new` templates stored in a dedicated repo. The templates are the best way to see how to consume the library; these instructions are intended mainly for people looking to make changes.

NB the tests are reliant on a `TEST_KAFKA_BROKER` environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic):

```sh
export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n
```
- The `BatchedConsumer` implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type `FsKafka.Core.InFlightMessageCounter`, and as such can be silenced by including the following in one's `LoggerConfiguration()`:

  ```fsharp
  .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)
  ```
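For instance, the override can be wired into a complete logger as follows (a minimal sketch; the `Serilog.Sinks.Console` reference is an assumption for illustration — substitute whichever Serilog sink you actually use):

```fsharp
#r "nuget:FsKafka"
#r "nuget:Serilog.Sinks.Console" // assumed sink for this sketch; any Serilog sink works

open Serilog
open Serilog.Events

let log =
    LoggerConfiguration()
        // drop the InFlightMessageCounter diagnostics below Warning level
        .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext,
                               LogEventLevel.Warning)
        .WriteTo.Console()
        .CreateLogger()
```

The resulting `log` can then be passed to `KafkaProducer.Create` / `BatchedConsumer.Start` as in the examples below.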
This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.

Equinox places some key constraints on all components and dependencies:

- batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean, consistent wiring
- pick a well-established base library; try not to add new concepts
- low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
- aim to add any resilience features as patches to upstream repos
- thorough test coverage: integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library
You can see the development process history here. The base code originates from Jet's Inventory Management System (at the point it was taken, it worked against `Confluent.Kafka` v `0.11.4`).

The `v0` branch continues to house the original code as previously borne by the `master` of this repo:

- It will continue to address the need to provide an easy migration from the Kafunk API.
- There are significant non-trivial changes in lifetime management in the `librdkafka` drivers accompanying `0.11.5` and `0.11.6` (with potential behavioral changes implied too). While upgrading may be achievable without API changes, it does bring into play a series of changes related to how the `rdkafka` driver closes connections (which can result in long days chasing `AccessViolationException` and friends).
- NB experience of the changes necessary to accommodate the sweeping changes that the `Confluent.Kafka` `v1.0.0` API brings when compared to the `0.11.x` codebase suggests it's likely to be a significant undertaking to adjust the `v0` branch to target `Confluent.Kafka` `v >= 1.0.0` without significant surface API changes (TL;DR there are no more events; you need to wire everything up in the Builder). There is absolutely no plan or resourcing to introduce such changes on the `v0` branch; the suggested upgrade path is using the shims in `Propulsion.Kafka0` in order to do an incremental switch-over to `v1`.
Kafunk was an end-to-end F# implementation of a Kafka Client; it's no longer in active use at Jet.com.

- The producer and consumer API wrappers provide different semantics to the `v0` branch. It's recommended to validate that they make sense for your use case.
- Upgrading to a new version of `Confluent.Kafka` typically implies a knock-on effect from an associated increment in the underlying `rdkafka` driver version (TODO: explain key behavior and perf changes between what `1.0.0` implies vs `0.11.4`).
- You'll need to wire the (`Serilog`-based) logging through to your log sink (it's easy to connect it to an NLog Target etc.). (The `v0` branch exposes a logging scheme which requires more direct integration.)
- There's an (internal) transitive dependency on `Newtonsoft.Json` `v11.0.2` (which should generally not be a problem to accommodate in most codebases).
The single recommended way to move off a `0.x` dependency that uses the older-style API is to:

- retarget your code to use `FsKafka0`
- release, test, validate
- retarget your code to use `FsKafka`
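Because `FsKafka0` deliberately mirrors the `FsKafka` surface area, the first retarget is typically just a package-reference swap; the same calling code should then compile against either package. The sketch below illustrates the idea in F# script form — it assumes your code stays within the wrapped `BatchedProducer`/`BatchedConsumer` surface, so validate the exact parity for the members you actually use:

```fsharp
// Step 1: reference FsKafka0 (Confluent.Kafka 0.11.3) instead of FsKafka.
#r "nuget:FsKafka0"
// Step 3 later becomes just:  #r "nuget:FsKafka"  - the code below is unchanged.
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
```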
```fsharp
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka
open System // for Guid

let log = Serilog.LoggerConfiguration().CreateLogger()
let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
let key = Guid.NewGuid().ToString()
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously
```
```fsharp
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string>[]) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value }
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    return! consumer.AwaitCompletion()
} |> Async.RunSynchronously
```
```fsharp
#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()
let handler (messages : ConsumeResult<string,string>[]) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value }
let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)
async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
    return! consumer.AwaitCompletion()
} |> Async.RunSynchronously
```