Monix integration with Kafka
Work in progress!
- Getting Started with Kafka 1.0.x or above
- Getting Started with Kafka 0.11.x
- Getting Started with Kafka 0.10.x
- Getting Started with Kafka 0.9.x
- Getting Started with Kafka 0.8.x (no longer supported)
- Usage
- How can I contribute to Monix-Kafka?
- Maintainers
- License
In SBT:
libraryDependencies += "io.monix" %% "monix-kafka-1x" % "1.0.0-RC5"
For Kafka versions higher than 1.0.x, also add a dependency override:
dependencyOverrides += "org.apache.kafka" % "kafka" % "2.1.0"
If you want to run this project's tests, note that it now supports embedded Kafka for integration testing. Simply run:
sbt kafka1x/test
In SBT:
libraryDependencies += "io.monix" %% "monix-kafka-11" % "1.0.0-RC5"
If you want to run this project's tests, note that it now supports embedded Kafka for integration testing. Simply run:
sbt kafka11/test
In SBT:
libraryDependencies += "io.monix" %% "monix-kafka-10" % "1.0.0-RC5"
If you want to run this project's tests, note that it now supports embedded Kafka for integration testing. Simply run:
sbt kafka10/test
Please note that EmbeddedKafka is not supported for Kafka 0.9.x.
In SBT:
libraryDependencies += "io.monix" %% "monix-kafka-9" % "1.0.0-RC5"
If you want to run this project's tests, first download the Kafka server, version 0.9.x, from the Kafka download page (note that 0.10.x or higher does not work with 0.9). Then, as the quick start section says, open a terminal window and start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Then start Kafka:
bin/kafka-server-start.sh config/server.properties
Create the topic we need for our tests:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-tests
And run the tests:
sbt kafka9/test
Please note that support for Kafka 0.8.x has been dropped; the last available version with this dependency is 0.14.
In SBT:
libraryDependencies += "io.monix" %% "monix-kafka-8" % "0.14"
If you want to run this project's tests, first download the Kafka server, version 0.8.x, from the Kafka download page (note that 0.9.x or higher does not work with 0.8). Then, as the quick start section says, open a terminal window and start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
Then start Kafka:
bin/kafka-server-start.sh config/server.properties
Create the topics we need for our tests:
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-tests
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
--replication-factor 1 --partitions 1 \
--topic monix-kafka-manual-commit-tests
And run the tests:
sbt kafka8/test
import monix.kafka._
import monix.execution.Scheduler
implicit val scheduler: Scheduler = monix.execution.Scheduler.global
// Init
val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092")
)
val producer = KafkaProducer[String,String](producerCfg, scheduler)
// For sending one message
val recordMetadataF = producer.send("my-topic", "my-message").runToFuture
// For closing the producer connection
val closeF = producer.close().runToFuture
Calling producer.send returns a Task of Option[RecordMetadata], which can then be run and transformed into a Future.
If the Task completes with None, it means that producer.send was called after the producer was closed and that the message wasn't successfully acknowledged by the Kafka broker. If the underlying Kafka client fails, the producer bubbles up the exception and fails the Task. All successfully delivered messages complete with Some[RecordMetadata].
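The three outcomes described above (Some, None, and a failed Task) can be folded into a single message. This is only a minimal sketch: describeSend is a hypothetical helper that is not part of monix-kafka, and it assumes Monix 3.x and kafka-clients are on the classpath.

```scala
import monix.eval.Task
import org.apache.kafka.clients.producer.RecordMetadata

// Hypothetical helper (not part of monix-kafka): turns the result of a
// send into a log-friendly message covering all three outcomes.
def describeSend(send: Task[Option[RecordMetadata]]): Task[String] =
  send
    .map {
      case Some(meta) =>
        // The broker acknowledged the message
        s"delivered to ${meta.topic()}-${meta.partition()} at offset ${meta.offset()}"
      case None =>
        // send was called after the producer had been closed
        "not delivered: the producer was already closed"
    }
    // Failures of the underlying Kafka client surface as a failed Task
    .onErrorHandle(ex => s"send failed: ${ex.getMessage}")
```

With the producer from the example above, it could be used as describeSend(producer.send("my-topic", "my-message")).runToFuture.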
For pushing an entire Observable to Apache Kafka:
import monix.kafka._
import monix.execution.Scheduler
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord
implicit val scheduler: Scheduler = monix.execution.Scheduler.global
// Initializing the producer
val producerCfg = KafkaProducerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092")
)
val producer = KafkaProducerSink[String,String](producerCfg, scheduler)
// Let's pretend we have this observable of records
val observable: Observable[ProducerRecord[String,String]] = ???
observable
// on overflow, start dropping incoming events
.whileBusyDrop
// buffers into batches if the consumer is busy, up to a max size
.bufferIntrospective(1024)
// consume everything by pushing into Apache Kafka
.consumeWith(producer)
// ready, set, go!
.runToFuture
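The observable of records is left as ??? above. As a rough illustration, one way to build such a stream for experimenting is sketched below; the topic name, keys, and message contents are made up for the example.

```scala
import monix.reactive.Observable
import org.apache.kafka.clients.producer.ProducerRecord

// Hypothetical topic name, used only for illustration
val topic = "my-topic"

// Builds one keyed record per number in the range [0, 100)
val records: Observable[ProducerRecord[String, String]] =
  Observable.range(0, 100).map { i =>
    new ProducerRecord[String, String](topic, s"key-$i", s"message-$i")
  }
```

An observable like this could stand in for the placeholder and be piped into the sink with consumeWith.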
There are several ways to consume from Apache Kafka (version 0.11.x and above):
Consumer which commits offsets itself:
import monix.kafka._
val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
// you can use this setting for At Most Once semantics:
// observableCommitOrder = ObservableCommitOrder.BeforeAck
)
val observable =
KafkaConsumerObservable[String,String](consumerCfg, List("my-topic"))
.take(10000)
.map(_.value())
Consumer which allows you to commit offsets manually:
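import monix.kafka._
import monix.eval.Task
// needed for the `1.second` syntax below
import scala.concurrent.duration._
// Placeholder for your own processing step (not part of monix-kafka)
def performBusinessLogic(value: String): Task[Unit] = ???
val consumerCfg = KafkaConsumerConfig.default.copy(
bootstrapServers = List("127.0.0.1:9092"),
groupId = "kafka-tests"
)
val observable =
KafkaConsumerObservable.manualCommit[String,String](consumerCfg, List("my-topic"))
// pair each record's value with its committable offset
.map(message => message.record.value() -> message.committableOffset)
// process the value, then pass the offset downstream
.mapEval { case (value, offset) => performBusinessLogic(value).map(_ => offset) }
// commit offsets in batches: every second or every 1000 offsets
.bufferTimedAndCounted(1.second, 1000)
.mapEval(offsets => CommittableOffsetBatch(offsets).commitSync())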
Enjoy!
Issue#101
Starting from Kafka 0.10.1.0, there is a max.poll.interval.ms setting:
The maximum delay between invocations of poll() when using consumer group management.
This places an upper bound on the amount of time that the consumer can be idle before
fetching more records. If poll() is not called before expiration of this timeout,
then the consumer is considered failed and the group will rebalance in order
to reassign the partitions to another member.
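Since monix-kafka backpressures until all records have been processed, poll() is not called again in the meantime.
This can be a problem if processing takes a long time, because the consumer may exceed max.poll.interval.ms and trigger a rebalance.
You can reduce max.poll.records if you are experiencing this issue.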
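To get a feel for how far max.poll.records might need to be reduced, the arithmetic can be sketched as follows. safeMaxPollRecords and its safety factor are hypothetical and not part of monix-kafka; the 5 minute default mirrors Kafka's documented default for max.poll.interval.ms in recent versions, and the per-record processing time is something you would have to measure yourself.

```scala
import scala.concurrent.duration._

// Hypothetical helper: largest batch that can be processed within a
// fraction (safetyFactor) of max.poll.interval.ms.
def safeMaxPollRecords(
  perRecord: FiniteDuration,
  maxPollInterval: FiniteDuration = 5.minutes,
  safetyFactor: Double = 0.5
): Int = {
  val budgetMillis = maxPollInterval.toMillis * safetyFactor
  // Guard against sub-millisecond durations to avoid dividing by zero
  val perRecordMillis = math.max(1L, perRecord.toMillis)
  // Always allow at least one record per poll
  math.max(1, (budgetMillis / perRecordMillis).toInt)
}

// Assumed workload: roughly 2 seconds of processing per record
val maxPollRecords = safeMaxPollRecords(perRecord = 2.seconds)

// Standard Kafka consumer property key with the computed value
val overrides: Map[String, String] =
  Map("max.poll.records" -> maxPollRecords.toString)
```

Under these assumptions the result is 75 records per poll, well below Kafka's usual default of 500. How the override is passed to the consumer depends on the monix-kafka version in use and is not shown here.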
We welcome contributions to all projects in the Monix organization and would love for you to help build Monix-Kafka. See our contributor guide for more information about how you can get involved.
The current maintainers (people who can merge pull requests) are:
- Alexandru Nedelcu (alexandru)
- Alex Gryzlov (clayrat)
- Piotr Gawryś (Avasil)
- Leandro Bolivar (leandrob13)
All code in this repository is licensed under the Apache License, Version 2.0. See LICENSE.txt.