A Clojure library for Apache Kafka, a distributed stream-processing platform.
It uses the Kafka protocol and does not rely on ZooKeeper.
Note
v0.4.0 is well tested with Kafka v2.2.x and 2.3.0 with native drivers on Scala 2.11 and 2.12. It still contains ZooKeeper as a dependency.
It tries to be as lightweight as possible and thus depends only on:
- org.apache.kafka/kafka_2.12 "2.3.0"
- org.apache.kafka/kafka-clients "2.3.0"
- org.apache.zookeeper/zookeeper "3.5.5"
with jms, jmx*, and logging excluded.
Caution
v0.4.0 may be partially (or even fully!) incompatible with some versions of other libraries that use NIO. If you experience build problems or your application crashes on startup, try updating your dependencies by adding [io.netty/netty-all "4.1.45.Final"] or newer.
Add the following to your Leiningen project.clj:
[net.tbt-post/clj-kafka-x "0.4.0"]
or, with an explicit NIO (Netty) dependency:
[net.tbt-post/clj-kafka-x "0.4.0"]
[io.netty/netty-all "4.1.45.Final"]
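For orientation, here is a minimal sketch of where these vectors go in a project.clj. The project name, version, and Clojure version are placeholders chosen for illustration, not taken from this library's documentation.

```clojure
;; project.clj (sketch; "my-app" and the Clojure version are assumptions)
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [net.tbt-post/clj-kafka-x "0.4.0"]
                 ;; optional: pin Netty explicitly if you hit NIO conflicts
                 [io.netty/netty-all "4.1.45.Final"]])
```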
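(require '[clj-kafka-x.producer :as kp])

;; Open a producer with string serializers, send one record,
;; and dereference the result to wait for the send to complete.
(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)
                           (kp/string-serializer))]
  @(kp/send p (kp/record "topic-a" "Hi there!")))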
(require '[clj-kafka-x.consumers.simple :as kc])

;; Open a consumer with string deserializers, subscribe to a topic,
;; and poll for messages.
(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)
                           (kc/string-deserializer))]
  (kc/subscribe c "topic-a")
  (kc/messages c))
Note
When a topic has multiple partitions, you must specify them explicitly when subscribing, i.e.
(kc/subscribe
 c [{:topic "topic-a" :partitions #{0 1}}
    {:topic "topic-b" :partitions #{0 1 2}}])
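Writing partition sets out by hand gets tedious with many topics. The following is a small sketch of a helper that builds the same structure from a map of topic name to partition count. The name `partition-specs` is hypothetical and not part of clj-kafka-x; it assumes partitions are numbered contiguously from 0, as Kafka does.

```clojure
;; Hypothetical helper, not part of clj-kafka-x.
(defn partition-specs
  "Builds [{:topic t :partitions #{0 .. n-1}} ...] from a map of
  topic name to partition count, ordered by topic name."
  [topic->count]
  (mapv (fn [[topic n]]
          {:topic topic :partitions (set (range n))})
        (sort-by key topic->count)))

(def specs (partition-specs {"topic-a" 2 "topic-b" 3}))

;; The result has the same shape as the literal vector in the note above,
;; so it could be passed as: (kc/subscribe c specs)
```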
Real-life (almost) example
(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

;; Placeholder processor: returns the message value unchanged.
(defn processor [msg schema] msg)
(def schema nil)

(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

;; Topic to subscribe to (placeholder value; replace with your own).
(def topics "topic-a")

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor) (processor value schema) value)))

(defn consume []
  (with-open [c (kc/consumer config
                             (kc/byte-array-deserializer)
                             (kc/byte-array-deserializer))]
    (kc/subscribe c topics)
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))
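The "choose one by topic name" comments in the example leave the lookup open. One possible way to do it, sketched here with plain maps keyed by topic name, is shown below. The handler `upcase-processor` and the `processors` and `schemas` maps are hypothetical, and the sketch uses string values for readability, whereas the example above consumes byte arrays.

```clojure
(require '[clojure.string :as str])

;; Hypothetical handler, for illustration only.
(defn upcase-processor [value _schema]
  (str/upper-case value))

;; Topic name -> processor function, and topic name -> schema.
(def processors {"topic-a" upcase-processor})
(def schemas    {"topic-a" nil})

(defn process-message [msg]
  (let [{:keys [value topic]} msg
        processor (get processors topic)
        schema    (get schemas topic)]
    ;; Fall back to the raw value when no processor is registered.
    (if (fn? processor) (processor value schema) value)))

(def handled   (process-message {:topic "topic-a" :value "hi there"}))
(def untouched (process-message {:topic "topic-b" :value "as is"}))
```

Messages from topics without a registered processor pass through unchanged, which matches the `(if (fn? processor) ...)` guard in the original example.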
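You can also specify an explicit poll timeout:
(defn- consume [instance process-message]
  ;; `instance` is not used in this snippet.
  (with-open [co (kc/consumer config
                              (kc/byte-array-deserializer)
                              (kc/byte-array-deserializer))]
    ;; The timeout is read from the :request-timeout-ms entry of config.
    (let [messages (kc/messages
                    co
                    :timeout (:request-timeout-ms config))]
      ;; doall realizes the result before the consumer is closed.
      (doall (map process-message messages)))))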
The maximum number of messages returned per poll can be set with the max.poll.records configuration field.
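As a minimal sketch, the setting can be added to the same kind of config map used in the examples above. `max.poll.records` is a standard Kafka consumer property; the helper name `with-max-poll-records` and the value 100 are illustrative assumptions, and the value is passed as a string on the assumption that the map is handed to the Kafka client, which parses string values for numeric settings.

```clojure
(def base-config {"bootstrap.servers" "localhost:9092"
                  "group.id"          "consumer-id"})

;; Hypothetical helper: returns config with a cap of n records per poll.
(defn with-max-poll-records [config n]
  (assoc config "max.poll.records" (str n)))

(def capped-config (with-max-poll-records base-config 100))

;; capped-config can then be used in place of config, e.g.
;; (kc/consumer capped-config (kc/string-deserializer) (kc/string-deserializer))
```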
Copyright © 2016-2020
Distributed under the Apache License v 2.0