This is a tiny fs2 wrapper around the Kafka Java client.
To get started with SBT, simply add the following lines to your build.sbt file.
resolvers += "Ovotech" at ""
libraryDependencies += "com.ovoenergy" %% "fs2-kafka-client" % "<latest version>"
To consume records without committing or with auto-commit:
import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO
val settings = ConsumerSettings(
pollTimeout = 250.milliseconds,
maxParallelism = 4,
nativeSettings = Map(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
val consumedRecords = consume[IO](
new StringDeserializer,
new StringDeserializer,
To consume records, apply a function to each record, and commit the offsets:
import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO
import scala.concurrent.ExecutionContext
implicit val ec =
val settings = ConsumerSettings(
pollTimeout = 250.milliseconds,
maxParallelism = 4,
nativeSettings = Map(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
def processRecord(r: ConsumerRecord[String, String]): IO[Int] = IO {
// Apply some side-effect
val ints = consumeProcessAndCommit[IO](
new StringDeserializer,
new StringDeserializer,
Record processing order is guaranteed within a single partition, while records from different partitions are processed
in parallel, up to the maxParallelism configured in the ConsumerSettings.
To consume batches of records, process them with an fs2.Pipe, and then commit:
import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO
import fs2.Pipe
import scala.concurrent.ExecutionContext
implicit val ec =
val settings = ConsumerSettings(
pollTimeout = 250.milliseconds,
maxParallelism = 4,
nativeSettings = Map(
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
def pipe[K, V]: Pipe[IO, ConsumerRecord[K, V], Long] = {
_.evalMap(c => IO(c.offset()))
.filter(_ % 10 == 0)
.evalMap(o => IO(o * 3))
new StringDeserializer,
new StringDeserializer,
The producer is available as an effectful function:
import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.producer._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import java.util.Properties
implicit val ec =
val topic = "my-topic"
val key = "my-key"
val value = "my-value"
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
val producer: Producer[String, String] = new KafkaProducer[String, String](props)
produceRecord[IO](producer, new ProducerRecord[String, String](topic, key, value))
The producer itself is available through a Stream:
import com.ovoenergy.fs2.kafka._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._
import cats.effect.IO
import fs2._
val producerSettings = ProducerSettings(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
ProducerConfig.ACKS_CONFIG -> "all",
val topic = "my-topic"
new StringSerializer,
new StringSerializer
).flatMap { producer =>
val key = "my-key"
val value = "my-value"
Stream.eval {
produceRecord[IO](producer, new ProducerRecord[String, String](topic, key, value))