/reactive-kafka

Reactive Streams API for Apache Kafka

Primary LanguageScalaApache License 2.0Apache-2.0

Reactive Streams for Kafka

Maven Central
If you have questions or are working on a pull request or just curious, please feel welcome to join the chat room: Join the chat at https://gitter.im/softwaremill/reactive-kafka

Reactive Streams wrapper for Apache Kafka.

Initiated by SoftwareMill

Supports Kafka 0.8.2.2

Available at Maven Central for Scala 2.10 and 2.11:

libraryDependencies += "com.softwaremill.reactivekafka" %% "reactive-kafka-core" % "0.8.3"

Example usage

Scala

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import kafka.serializer.{StringDecoder, StringEncoder}
import org.reactivestreams.{Publisher, Subscriber}
import com.softwaremill.react.kafka.{ReactiveKafka, ProducerProperties, ConsumerProperties}

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val publisher: Publisher[StringKafkaMessage] = kafka.consume(ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "lowercaseStrings",
  groupId = "groupName",
  decoder = new StringDecoder()
))
val subscriber: Subscriber[String] = kafka.publish(ProducerProperties(
  brokerList = "localhost:9092",
  topic = "uppercaseStrings",
  encoder = new StringEncoder()
))

Source(publisher).map(_.message().toUpperCase).to(Sink(subscriber)).run()

Java

String zooKeeperHost = "localhost:2181";
String brokerList = "localhost:9092";

ReactiveKafka kafka = new ReactiveKafka();
ActorSystem system = ActorSystem.create("ReactiveKafka");
ActorMaterializer materializer = ActorMaterializer.create(system);

ConsumerProperties<String> cp =
   new PropertiesBuilder.Consumer(brokerList, zooKeeperHost, "topic", "groupId", new StringDecoder(null))
      .build();

Publisher<MessageAndMetadata<byte[], String>> publisher = kafka.consume(cp, system);

ProducerProperties<String> pp = new PropertiesBuilder.Producer(
   brokerList,
   zooKeeperHost,
   "topic",
   new StringEncoder(null)).build();

Subscriber<String> subscriber = kafka.publish(pp, system);

Source.from(publisher).map(msg -> msg.message()).to(Sink.create(subscriber)).run(materializer);

Passing configuration properties to Kafka

In order to set your own custom Kafka parameters, you can construct ConsumerProperties and ProducerProperties using some of their provided methods in a builder-pattern-style DSL, for example:

val consumerProperties = ConsumerProperties(
  "localhost:9092",
  "localhost:2181",
  "topic",
  "groupId",
  new StringDecoder()
)
  .consumerTimeoutMs(timeInMs = 100)
  .kafkaOffsetsStorage(dualCommit = true)
  .setProperty("some.kafka.property", "value") 

The ProducerProperties class offers a similar API.

Controlling consumer start offset

By default a new consumer will start reading from the beginning of a topic. If you want to start reading from the end, you can specify this on your ConsumerProperties:

  val consumerProperties = ConsumerProperties(...).readFromEndOfStream()

Working with actors

Since we are based upon akka-stream, the best way to handle errors is to leverage Akka's error handling and lifecycle management capabilities. Producers and consumers are in fact actors.

Obtaining actor references

ReactiveKafka comes with a few methods allowing working on the actor level. You can let it create Props to let your own supervisor create these actor as children, or you can directly create actors at the top level of supervision. Here are a some examples:

import akka.actor.{Props, ActorRef, Actor, ActorSystem}
import akka.stream.ActorMaterializer
import kafka.serializer.{StringEncoder, StringDecoder}
import com.softwaremill.react.kafka.{ReactiveKafka, ProducerProperties, ConsumerProperties}

// inside an Actor:
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
// publisher
val publisherProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "lowercaseStrings",
  groupId = "groupName",
  decoder = new StringDecoder()
)
val publisherActorProps: Props = kafka.consumerActorProps(publisherProperties)
val publisherActor: ActorRef = context.actorOf(publisherActorProps)
// or:
val topLevelPublisherActor: ActorRef = kafka.consumerActor(publisherProperties)

// subscriber
val subscriberProperties = ProducerProperties(
  brokerList = "localhost:9092",
  topic = "uppercaseStrings",
  encoder = new StringEncoder()
)
val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
val subscriberActor: ActorRef = context.actorOf(subscriberActorProps)
// or:
val topLevelSubscriberActor: ActorRef = kafka.producerActor(subscriberProperties)

Handling errors

When a publisher (consumer) fails to load more elements from Kafka, it calls onError() on all of its subscribers. The error will be handled depending on subscriber implementation.
When a subscriber (producer) fails to get more elements from upstream due to an error, it is no longer usable. It will throw an exception and close all underlying resource (effectively: the Kafka connection). If there's a problem with putting elements into Kafka, only an exception will be thrown. You can create the subscriber actor as a child of another actor and react to errors using supervision strategy. Example of custom error handling for a Kafka Sink:

import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import com.softwaremill.react.kafka.{ConsumerProperties, ProducerProperties, ReactiveKafka}

class Handler extends Actor {
  implicit val materializer = ActorMaterializer()

  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case exception => Resume // Your custom error handling
  }

  def createSupervisedSubscriberActor() = {
    val kafka = new ReactiveKafka()

    // subscriber
    val subscriberProperties = ProducerProperties(
      brokerList = "localhost:9092",
      topic = "uppercaseStrings",
      encoder = new StringEncoder()
    )
    val subscriberActorProps: Props = kafka.producerActorProps(subscriberProperties)
    context.actorOf(subscriberActorProps)
  }
  // Rest of the Actor's body
}

Cleaning up

If you want to manually stop a publisher or a subscriber, you have to send an appropriate message to the underlying actor. KafkaActorPublisher must receive a ActorPublisherMessage.Cancel, where KafkaActorSubscriber must receive a ActorSubscriberMessage.OnComplete.
If you're using a PublisherWithCommitSink returned from ReactiveKafka.consumeWithOffsetSink(), you must call its cancel() method in order to gracefully close all underlying resources.

Manual Commit (version 0.8 and above)

In order to be able to achieve "at-least-once" delivery, you can use following API to obtain an additional Sink, when you can stream back messages that you processed. An underlying actor will periodically flush offsets of these messages as committed. Reactive Kafka supports manual commit both to Zookeeper (legacy) and Kafka storage. Dual commit is not supported. In order to commit manually to zookeeper, you have to add an optional module to your dependencies:

libraryDependencies += "com.softwaremill.reactivekafka" %% "zookeeper-committer" % reactiveKafkaVersion

Example of a consumer with manual commit for processed messages:

import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}

implicit val actorSystem = ActorSystem("ReactiveKafka")
implicit val materializer = ActorMaterializer()

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  brokerList = "localhost:9092",
  zooKeeperHost = "localhost:2181",
  topic = "lowercaseStrings",
  groupId = "groupName",
  decoder = new StringDecoder())
.commitInterval(5 seconds) // flush interval
    
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(consumerProperties)
Source(consumerWithOffsetSink.publisher)
  .map(processMessage(_)) // your message processing
  .to(consumerWithOffsetSink.offsetCommitSink) // stream back for commit
  .run()

Tuning

KafkaActorSubscriber and KafkaActorPublisher have their own thread pools, configured in reference.conf. You can tune them by overriding kafka-publisher-dispatcher.thread-pool-executor and kafka-subscriber-dispatcher.thread-pool-executor in your application.conf file.
Alternatively you can provide your own dispatcher name. It can be passed to appropriate variants of factory methods in ReactiveKafka: publish(), producerActor(), producerActorProps() or consume(), consumerActor(), consumerActorProps().

Testing

Tests require Apache Kafka and Zookeeper to be available on localhost:9092 and localhost:2181