/fs2-kafka-client

Simple Kafka Client for fs2

Primary LanguageScalaApache License 2.0Apache-2.0

Simple Kafka client for functional streams for scala

This is a tiny fs2 wrapper around the Kafka java client.

CircleCI Codacy Badge Download

Installation

To get started with SBT, simply add the following lines to your build.sbt file.

resolvers += "Ovotech" at "https://dl.bintray.com/ovotech/maven"

libraryDependencies += "com.ovoenergy" %% "fs2-kafka-client" % "<latest version>"

Consuming

To consume records without committing or with auto-commit:

import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO

val settings = ConsumerSettings(
    pollTimeout = 250.milliseconds,
    maxParallelism = 4,
    nativeSettings = Map(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
)

val consumedRecords = consume[IO](
  TopicSubscription(Set("my-topic")),
  new StringDeserializer,
  new StringDeserializer,
  settings
).take(1000).compile.toVector

To consume records, apply a function for each record and commit:

import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO
import scala.concurrent.ExecutionContext

implicit val ec = ExecutionContext.global

val settings = ConsumerSettings(
    pollTimeout = 250.milliseconds,
    maxParallelism = 4,
    nativeSettings = Map(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
)

def processRecord(r: ConsumerRecord[String, String]): IO[Int] = IO {
  // Apply some side-effect
  1
}

val ints = consumeProcessAndCommit[IO](
    TopicSubscription(Set("my-topic")),
    new StringDeserializer,
    new StringDeserializer,
    settings
)(processRecord).take(1000).compile.toVector

The record processing order is guaranteed within the same partition, while records from different partitions are processed in parallel up to the parallelism set into the ConsumerSettings.

To consume batch of records, and process records with fs2.Pipe then commit:

import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer._
import cats.effect.IO
import fs2.Pipe
import scala.concurrent.ExecutionContext

implicit val ec = ExecutionContext.global

val settings = ConsumerSettings(
    pollTimeout = 250.milliseconds,
    maxParallelism = 4,
    nativeSettings = Map(
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> "false",
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-group-id",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest"
    )
)

def pipe[K, V]: Pipe[IO, ConsumerRecord[K, V], Long] = {
  _.evalMap(c => IO(c.offset()))
    .filter(_ % 10 == 0)
    .evalMap(o => IO(o * 3))
}

consumeProcessBatchWithPipeAndCommit[IO](
    TopicSubscription(Set("my-topic")),
    new StringDeserializer,
    new StringDeserializer,
    settings
)(pipe).take(1).compile.drain

Producing

The producer is available as effectfull function:

import com.ovoenergy.fs2.kafka._
import scala.concurrent.duration._
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.producer._
import cats.effect.IO
import scala.concurrent.ExecutionContext
import java.util.Properties

implicit val ec = ExecutionContext.global

val topic = "my-topic"
val key = "my-key"
val value = "my-value"

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.CLIENT_ID_CONFIG, "KafkaExampleProducer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer: Producer[String, String] = new KafkaProducer[String, String](props)

produceRecord[IO](producer, new ProducerRecord[String, String](topic, key, value))

The producer itself is available trough a Stream:

import com.ovoenergy.fs2.kafka._
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.serialization._
import cats.effect.IO
import fs2._

val producerSettings = ProducerSettings(
  Map(
    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:9092",
    ProducerConfig.ACKS_CONFIG -> "all",
    ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION -> "1"
  )
)

val topic = "my-topic"

producerStream[IO](
  producerSettings,
  new StringSerializer,
  new StringSerializer
).flatMap { producer =>

  val key = "my-key"
  val value = "my-value"

  Stream.eval {
    produceRecord[IO](producer, new ProducerRecord[String, String](topic, key, value))
  }
}