/FsKafka

Minimal F# wrappers for Confluent.Kafka+librdkafka.redist 0.x and 1.x

Primary LanguageF#Apache License 2.0Apache-2.0

FsKafka Build Status release NuGet license code size

F# friendly wrapper for Confluent.Kafka, with minimal dependencies or additional abstractions (but see related repos).

FsKafka wraps Confluent.Kafka to provide efficient batched Kafka Producer and Consumer configurations with basic logging instrumentation. Depends on Confluent.Kafka [1.6.2], librdkafka [1.6.1] (pinned to ensure we use a tested pairing), Serilog (but no specific Serilog sinks, i.e. you configure to emit to NLog etc) and Newtonsoft.Json (used internally to parse Broker-provided Statistics for logging purposes).

Usage

FsKafka is delivered as a Nuget package targeting netstandard2.0 and F# >= 4.5.

Install-Package FsKafka

or for paket, use:

paket add FsKafka

Related repos

  • See the Propulsion repo for extended Producers and Consumers.
  • See the Jet dotnet new templates repo's proProjector template (in -k mode) for example producer logic using the BatchedProducer and the proConsumer template for examples of using the BatchedConsumer from FsKafka, alongside the extended modes in Propulsion.
  • See the Equinox QuickStart for examples of using this library to project to Kafka from Equinox.Cosmos and/or Equinox.EventStore.

CONTRIBUTING

Contributions of all sizes are warmly welcomed. See our contribution guide

TEMPLATES

The best place to start, sample-wise is from the dotnet new templates stored in a dedicated repo.

BUILDING

The templates are the best way to see how to consume it; these instructions are intended mainly for people looking to make changes.

NB The tests are reliant on a TEST_KAFKA_BROKER environment variable pointing to a Broker that has been configured to auto-create ephemeral Kafka Topics as required by the tests (each test run writes to a guid-named topic)

build, including tests on netcoreapp3.1

export TEST_KAFKA_BROKER="<server>:9092"
dotnet build build.proj -v n

FAQ

How do I get rid of all the breaking off polling ... resuming polling spam?

  • The BatchedConsumer implementation tries to give clear feedback as to when reading is not keeping up, for diagnostic purposes. As of #32, such messages are tagged with the type FsKafka.Core.InFlightMessageCounter, and as such can be silenced by including the following in one's LoggerConfiguration():

    .MinimumLevel.Override(FsKafka.Core.Constants.messageCounterSourceContext, Serilog.Events.LogEventLevel.Warning)

What is this, why does it exist, where did it come from, is anyone using it ?

This code results from building out an end-to-end batteries-included set of libraries and templates as part of the Equinox project.

Equinox places some key constraints on all components and dependencies:-

  • batteries-included examples of end-to-end functionality within the Equinox remit; samples should have clean consistent wiring
  • pick a well-established base library, try not to add new concepts
  • low dependencies, so it can work in lots of contexts without egregiously forcing you to upgrade things
  • aim to add any resilience features as patches to upstream repos
  • thorough test coverage; integration coverage for core wrapped functionality, unit tests for any non-trivial logic in the wrapper library

Minimal producer example

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let batching = Batching.Linger (System.TimeSpan.FromMilliseconds 10.)
let producerConfig = KafkaProducerConfig.Create("MyClientId", "kafka:9092", Acks.All, batching)
let producer = KafkaProducer.Create(log, producerConfig, "MyTopic")
   
let key = Guid.NewGuid().ToString()
let deliveryResult = producer.ProduceAsync(key, "Hello World!") |> Async.RunSynchronously

Minimal batched consumer example

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously

Minimal batched consumer example with monitor

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let cfg = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", ["MyTopic"], "MyGroupId", AutoOffsetReset.Earliest)

async {
    use consumer = BatchedConsumer.Start(log, cfg, handler)
    use _ = KafkaMonitor(log).Start(consumer.Inner, cfg.Inner.GroupId)
    return! consumer.AwaitShutdown()
} |> Async.RunSynchronously

Running (and awaiting) a pair of consumers until either throws

#r "nuget:FsKafka"
open Confluent.Kafka
open FsKafka

let log = Serilog.LoggerConfiguration().CreateLogger()

let handler (messages : ConsumeResult<string,string> []) = async {
    for m in messages do
        printfn "Received: %s" m.Message.Value
} 

let config topic = KafkaConsumerConfig.Create("MyClientId", "kafka:9092", [topic], "MyGroupId", AutoOffsetReset.Earliest)

let cfg1, cfg2 = config "MyTopicA", config "MyTopicB"

async {
    use consumer1 = BatchedConsumer.Start(log, cfg1, handler)
    use consumer2 = BatchedConsumer.Start(log, cfg2, handler)
    use _ = KafkaMonitor(log).Start(consumer1.Inner, cfg1.Inner.GroupId)
    use _ = KafkaMonitor(log).Start(consumer2.Inner, cfg2.Inner.GroupId)
    return! Async.Parallel [consumer1.AwaitWithStopOnCancellation(); consumer2.AwaitWithStopOnCancellation()]
} |> Async.RunSynchronously