Gregor
Lightweight Clojure wrapper for Apache Kafka Consumer + Producer APIs.
:dependencies [[org.clojure/clojure "1.10.0"]
[org.apache.kafka/kafka_2.12 "2.1.1"]]
Gregor wraps most of the Java API for the Kafka Producer and New Consumer The goal of this project is to stay very close to the Kafka API instead of adding more advanced features.
Example
Here's an example of at-least-once processing (using mount
):
(ns gregor-sample-app.core
(:gen-class)
(:require [clojure.repl :as repl]
[gregor.core :as gregor]
[mount.core :as mount :refer [defstate]]))
(def run (atom true))
(defstate consumer
:start (gregor/consumer "localhost:9092"
"testgroup"
["test-topic"]
{"auto.offset.reset" "earliest"
"enable.auto.commit" "false"})
:stop (gregor/close consumer))
(defstate producer
:start (gregor/producer "localhost:9092")
:stop (gregor/close producer))
(defn -main
[& args]
(mount/start)
(repl/set-break-handler! (fn [sig] (reset! run false)))
(while @run
(let [consumer-records (gregor/poll consumer)
values (process-records consumer-records)]
(doseq [v values]
(gregor/send producer "other-topic" v))
(gregor/commit-offsets! consumer)))
(mount/stop))
Transformations over consumer records are applied in process-records
. Each record in
the seq
returned by poll
is a map. Here's an example with a JSON object as the
:value
:
{:value "{\"foo\":42}"
:key nil
:partition 0
:topic "test-topic"
:offset 939}
Producing
Gregor provides the send
function for asynchronously sending a record to a topic. There
are multiple arities which correspond to those of the ProducerRecord
Java constructor. If
you'd like to provide a callback to be invoked when the send has been acknowledged use
send-then
instead.
Topic Management
Create an admin client
(def admin-client (admin "localhost:9092"))
Create a topic:
(create-topic admin-client "some-topic" {})
That empty map can be used to specify configuration for number of topic partitions, replication factor,
Delete a topic:
(delete-topic admin-client "some-topic")
Query about a topic's existence:
(topic-exists? admin-client "some-topic")
List existing topics:
(topics admin-client)
License
Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.