kafka-clj

Kafka Clojure client

Primary language: Clojure. License: Apache License 2.0.

Announcement

Copied and modified from the legacy kafka-clj (https://github.com/kafka-dev/kafka/tree/master/clients/clojure)

kafka-clj

kafka-clj provides a producer and a consumer that support a basic fetch API as well as a managed sequence interface.

  • Better concurrency than the Java API.

  • Multifetch is not supported yet.

  • The consumer should be refactored.

Quick Start

Download and start Kafka.

Pull dependencies and build the jar with Leiningen:

$ lein deps
$ lein jar

Usage

Sending messages

(with-open [p (producer {:broker.list "0:localhost:9092"})]
  (.produce p "topic1" "Message 1")
  (.produce p "topic1" ["Message 2" "Message 3"])
  (.produce p "topic1" "partition-key" ["Message 2" "Message 3"]))

Sending multiple messages

;; Each entry is a vector of [topic partition-key message]; nil means no partition key.
(with-open [p (producer {:zk.connect "localhost:2082"})]
  (.produce p [["topic1" nil "Message 1"]
               ["topic2" "partition-key" "Message 2"]]))

Message Partitioning

(with-open [p (producer {:zk.connect "localhost:2082"})]
  (.produce p "test" "Partition-key1" "Message 1")
  (.produce p "test" "Partition-key2" "Message 2"))

The following options are supported:

  • :broker.list string Comma-separated list of broker connection strings, e.g. "0:localhost:9092".
  • :zk.connect string ZooKeeper connection string.
  • :zk.connectiontimeout.ms int ZooKeeper connection timeout in milliseconds.
  • :zk.sessiontimeout.ms int ZooKeeper session timeout in milliseconds.
  • :broker.type string One of sync, async, or batch. Default: sync.
  • :partitioner function Partitioner function which accepts two arguments (partition-key num-partition). The default picks a random partition.
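
As a rough illustration of the :partitioner option, here is a minimal sketch of a hash-based partitioner. The name hash-partitioner is made up for this example, and it assumes that the function is expected to return a partition index between 0 and num-partitions - 1; the README does not state the expected return value, so check the source before relying on it.

```clojure
;; Hypothetical partitioner: the same key always maps to the same partition.
;; Assumption: the return value must be an index in [0, num-partitions).
(defn hash-partitioner
  [partition-key num-partitions]
  ;; `hash` may be negative; `mod` with a positive divisor is never negative.
  (mod (hash partition-key) num-partitions))

;; Possible usage, assuming the option is passed like the others above:
;; (with-open [p (producer {:zk.connect "localhost:2082"
;;                          :partitioner hash-partitioner})]
;;   (.produce p "test" "Partition-key1" "Message 1"))
```

Compared with the default random partitioning, a deterministic function like this keeps all messages that share a key on one partition.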

Simple consumer (will be deprecated)

(with-open [c (consumer "localhost" 9092)]
  (let [offs (offsets c "test" 0 -1 10)]
    (.consume c "test" 0 (last offs) 1000000)))

Consumer sequence (will be deprecated)

(with-open [c (consumer {:broker.list "localhost:9092"})]
  (doseq [m (.consume-seq c "test" 0 {:blocking true})]
    (println m)))

The following options are supported:

  • :blocking boolean Default false: the sequence returns nil the first time a fetch does not return new messages. If set to true, the sequence tries to fetch new messages :repeat-count times, once every :repeat-timeout milliseconds.
  • :repeat-count int Number of attempts to fetch new messages before terminating. Default: 10.
  • :repeat-timeout int Wait time in milliseconds between fetch attempts. Default: 1000.
  • :offset long Initialized to the highest offset if not provided.
  • :max-size int Maximum result message size. Default: 1000000.
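
The interplay of :blocking, :repeat-count and :repeat-timeout can be sketched in plain Clojure. This is not the library's implementation, only one reading of the option descriptions above; polling-seq and fetch-fn are hypothetical names, and fetch-fn stands in for whatever performs a single fetch and returns a (possibly empty) collection of messages.

```clojure
;; Sketch only: a lazy sequence that ends when a fetch comes back empty,
;; or, in blocking mode, after repeat-count consecutive empty fetches.
(defn polling-seq
  [fetch-fn {:keys [blocking repeat-count repeat-timeout]
             :or   {blocking false repeat-count 10 repeat-timeout 1000}}]
  (letfn [(fetch-with-retry []
            ;; Non-blocking mode gets exactly one attempt per step.
            (loop [attempts-left (if blocking repeat-count 1)]
              (let [batch (fetch-fn)]
                (cond
                  (seq batch)          batch
                  (<= attempts-left 1) nil
                  :else (do (Thread/sleep (long repeat-timeout))
                            (recur (dec attempts-left)))))))
          (step []
            (lazy-seq
              (when-let [batch (fetch-with-retry)]
                (concat batch (step)))))]
    (step)))

;; Stand-in for a real fetch: hands out canned batches, then empty ones.
(defn canned-fetch [batches]
  (let [remaining (atom batches)]
    (fn []
      (let [batch (first @remaining)]
        (swap! remaining rest)
        (or batch [])))))
```

With canned batches of ["a" "b"], [], ["c"], the non-blocking sequence stops at the first empty fetch, while a blocking one with :repeat-count 2 retries once and also delivers "c".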

Java Interop

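import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Configure and create the producer
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("multithread", "true");
kafka.types.Producer producer = kafka.kafka.newProducer(props);

// Send a batch of messages to one topic
List<String> messages = new ArrayList<String>();
messages.add("hello");
messages.add("world");
producer.produce("topic", messages);

producer.close();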