
Clojure client for Apache Kafka

A Clojure library for Apache Kafka, the distributed stream-processing platform.

Uses the Kafka protocol directly and does not rely on ZooKeeper.

Version 0.4.0 is well tested with Kafka 2.2.x and 2.3.0, using the native drivers built for Scala 2.11 and 2.12. It still includes ZooKeeper as a dependency.

It tries to be as lightweight as possible and therefore depends only on

  • org.apache.kafka/kafka_2.12 "2.3.0"

  • org.apache.kafka/kafka-clients "2.3.0"

  • org.apache.zookeeper/zookeeper "3.5.5"

with jms, jmx* and logging excluded.

Version 0.4.0 may be partially (or even fully) incompatible with some versions of other libraries that use NIO. If you experience build problems, or your application crashes on startup, try updating your dependencies by adding [io.netty/netty-all "4.1.45.Final"] or newer.

Add the following to your Leiningen project.clj:

[net.tbt-post/clj-kafka-x "0.4.0"]

or, with an explicit NIO dependency:

[net.tbt-post/clj-kafka-x "0.4.0"]
[io.netty/netty-all "4.1.45.Final"]
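
For context, these vectors belong under the :dependencies key of project.clj. A minimal sketch follows; the project name, project version, and Clojure version are placeholders chosen for illustration, not values taken from this library.

```clojure
;; project.clj (sketch; my-app, 0.1.0-SNAPSHOT and the Clojure version are placeholders)
(defproject my-app "0.1.0-SNAPSHOT"
  :dependencies [[org.clojure/clojure "1.10.1"]
                 [net.tbt-post/clj-kafka-x "0.4.0"]
                 ;; optional: pin Netty explicitly if other libraries pull in an older one
                 [io.netty/netty-all "4.1.45.Final"]])
```

Running lein deps :tree prints the resolved dependency tree, which can help spot a conflicting Netty version.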



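(require '[clj-kafka-x.producer :as kp])

(with-open [p (kp/producer {"bootstrap.servers" "localhost:9092"}
                           (kp/string-serializer)   ;; key serializer
                           (kp/string-serializer))] ;; value serializer
  ;; dereference the result to wait until the send has completed
  @(kp/send p (kp/record "topic-a" "Hi there!")))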


(require '[clj-kafka-x.consumers.simple :as kc])

(with-open [c (kc/consumer {"bootstrap.servers" "localhost:9092"
                            "group.id" "consumer-id"}
                           (kc/string-deserializer)   ;; key deserializer
                           (kc/string-deserializer))] ;; value deserializer
  (kc/subscribe c "topic-a")
  (kc/messages c))

When a topic has multiple partitions, you must specify them explicitly when subscribing, for example:

(kc/subscribe c [{:topic "topic-a" :partitions #{0 1}}
                 {:topic "topic-b" :partitions #{0 1 2}}])
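
Writing these topic and partition maps by hand gets tedious with many topics. A small helper could build them instead. This is only a sketch: topic-partitions is a hypothetical name, not part of clj-kafka-x, and it assumes you already know each topic's partition count and that partitions are numbered from 0.

```clojure
;; Hypothetical helper (not part of clj-kafka-x): builds the map shape
;; shown above for partitions 0 .. n-1 of a topic.
(defn topic-partitions
  [topic n]
  {:topic topic :partitions (set (range n))})

;; Same subscription spec as the hand-written example above.
(def subscription
  [(topic-partitions "topic-a" 2)
   (topic-partitions "topic-b" 3)])
```

The resulting vector can be passed to kc/subscribe in place of the literal maps.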

Real-life (almost) example

(ns buzz.consumer.kafka
  (:require [clj-kafka-x.consumers.simple :as kc]
            [clojure.tools.logging :as log]))

(defn processor [msg schema] msg)
(def schema nil)
(def config {"bootstrap.servers" "localhost:9092"
             "group.id" "consumer-id"})

(defn process-message [msg]
  (let [{:keys [value topic partition offset]} msg
        processor processor ;; choose one by topic name
        schema schema]      ;; choose one by topic name
    (if (fn? processor) (processor value schema) value)))

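;; Topics to subscribe to; replace with your application's own list
(def topics ["topic-a"])

(defn consume []
  (with-open [c (kc/consumer config
                             (kc/string-deserializer)
                             (kc/string-deserializer))]
    (kc/subscribe c topics)
    (let [pool (kc/messages c)]
      (doseq [message pool]
        (log/warn (process-message message))))))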
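
The "choose one by topic name" comments in process-message could be realized with a plain lookup map. The following is a minimal sketch under assumptions: the topic names, the two processor functions, and the handlers table are all hypothetical and exist only to illustrate the dispatch; only the :topic and :value message keys come from the example above.

```clojure
(require '[clojure.string :as str])

;; Hypothetical processors; each takes a message value and a schema.
(defn upper-processor [value _schema] (str/upper-case value))
(defn passthrough-processor [value _schema] value)

;; Hypothetical routing table: topic name -> processor and schema.
(def handlers
  {"topic-a" {:processor upper-processor :schema nil}
   "topic-b" {:processor passthrough-processor :schema nil}})

(defn process-message-by-topic
  "Looks up the processor registered for the message's topic and applies it.
  Returns the raw value when no processor is registered."
  [{:keys [value topic]}]
  (let [{:keys [processor schema]} (get handlers topic)]
    (if (fn? processor)
      (processor value schema)
      value)))
```

Keeping the routing in data means a new topic only needs a new map entry, with no change to the consume loop.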

You may also use the form with an explicit timeout:

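;; `instance` is assumed to be an already created and subscribed consumer;
;; `config` is assumed to be an application map containing :request-timeout-ms.
(defn- consume [instance process-message]
  (when-let [messages (kc/messages instance
                                   :timeout (:request-timeout-ms config))]
    ;; doall forces the lazy seq so every message is processed right away
    (doall (map process-message messages))))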

The number of messages returned per poll can be set with the max.poll.records configuration field.
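
For instance, to cap each poll at 100 records, the consumer configuration might look like the sketch below. The value 100 is purely illustrative; max.poll.records itself is a standard Kafka consumer setting.

```clojure
(def config {"bootstrap.servers" "localhost:9092"
             "group.id"          "consumer-id"
             ;; upper bound on the number of records returned by a single poll
             "max.poll.records"  "100"})
```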

Manual Build

$ lein install


Copyright © 2016-2020

Distributed under the Apache License v 2.0