A thin functional wrapper for Apache Kafka with Cats and FS2.
All operations are pure and resource-safe. The API surface is minimal; if you need more control, you should probably use a different library.
Catska lets you subscribe to a Kafka topic and get its records as a stream:
import java.util.Properties
import java.util.UUID
import org.apache.kafka.common.serialization._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.amitayh.Catska
import cats.effect.IO
val topic = Catska.Topic("some-topic")
implicit val stringDeserializer: Deserializer[String] = new StringDeserializer
val props = new Properties
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "some-consumer-group")
// Get records as a stream of key-value tuples - `Stream[IO, (String, String)]`
val records = Catska.subscribe[IO, String, String](topic, props)
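Once you have the stream, consuming it is ordinary FS2. The following is a minimal sketch, not part of Catska itself: it assumes `records` has the type shown above, and replaces the real subscription with a finite in-memory stream so it can run without a broker. The `PrintRecords` object and the `render` helper are hypothetical names introduced for illustration.

```scala
import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream

object PrintRecords extends IOApp {
  // Stand-in for `Catska.subscribe[IO, String, String](topic, props)`;
  // an assumption made so the sketch runs without a Kafka broker.
  val records: Stream[IO, (String, String)] =
    Stream.emits(List("key1" -> "value1", "key2" -> "value2")).covary[IO]

  // Pure formatting step, kept separate from the effectful printing.
  def render(key: String, value: String): String = s"$key -> $value"

  def run(args: List[String]): IO[ExitCode] =
    records
      .evalMap { case (key, value) => IO(println(render(key, value))) } // one effect per record
      .compile
      .drain // run the stream for its effects only
      .map(_ => ExitCode.Success)
}
```

With a real subscription the stream would typically not terminate on its own, so `drain` would keep running until the application is stopped.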
Creating a producer is just as simple:
import ...
val topic = ...
val props = ...
// `Catska.producer` returns a producer as a `Resource[F, Producer[F, K, V]]`
Catska.producer[IO, String, String](topic, props).use { producer =>
  producer.send("key1", "value1") *>
    producer.send("key2", "value2")
}
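Chaining sends with `*>` gets unwieldy for more than a few records. The sketch below shows one way to send a whole list with Cats' `traverse_`. It does not use Catska's own types: `SketchProducer` is a hypothetical stand-in that mirrors the `send(key, value)` call above, and its `F[Unit]` return type is an assumption. `SendAll`, `sendAll` and `Recording` are likewise illustrative names.

```scala
import cats.{Applicative, Eval}
import cats.implicits._
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a producer; the F[Unit] result type is an assumption.
trait SketchProducer[F[_], K, V] {
  def send(key: K, value: V): F[Unit]
}

object SendAll {
  // Sends every record in list order and discards the individual results.
  def sendAll[F[_]: Applicative, K, V](
      producer: SketchProducer[F, K, V],
      records: List[(K, V)]
  ): F[Unit] =
    records.traverse_ { case (key, value) => producer.send(key, value) }

  // In-memory producer for trying the helper without a broker.
  final class Recording extends SketchProducer[Eval, String, String] {
    val sent: ListBuffer[(String, String)] = ListBuffer.empty
    def send(key: String, value: String): Eval[Unit] =
      Eval.always { sent += (key -> value); () }
  }
}
```

Because `sendAll` only requires an `Applicative`, the same helper would work with `IO` inside the `use` block, provided the real producer's `send` fits the assumed shape.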
Copyright © 2019 Amitay Horwitz
Distributed under the MIT License