Copied and modified from the legacy kafka-clj (https://github.com/kafka-dev/kafka/tree/master/clients/clojure).
This kafka-clj provides a producer and a consumer that support a basic fetch API as well as a managed sequence interface.
- Better concurrency than the Java API.
- Multifetch is not supported yet.
- The consumer should be refactored.
Download and start Kafka.
Pull dependencies with Leiningen:
$ lein deps
$ lein jar
(with-open [p (producer {:broker.list "0:localhost:9092"})]
  (.produce p "topic1" "Message 1")
  (.produce p "topic1" ["Message 2" "Message 3"])
  (.produce p "topic1" "partition-key" ["Message 2" "Message 3"]))
(with-open [p (producer {:zk.connect "localhost:2082"})]
  (.produce p [["topic1" nil "Messages 1"]
               ["topic2" "partition-key" "Message 2"]]))
(with-open [p (producer {:zk.connect "localhost:2082"})]
  (.produce p "test" "Partition-key1" "Message 1")
  (.produce p "test" "Partition-key2" "Message 2"))
The following options are supported:
- :broker.list string Comma-separated broker connection string.
- :zk.connect string ZooKeeper connection string.
- :zk.connectiontimeout.ms int ZooKeeper connection timeout in milliseconds.
- :zk.sessiontimeout.ms int ZooKeeper session timeout in milliseconds.
- :broker.type string One of sync, async, or batch. Default: sync.
- :partitioner function Partitioner function which accepts two arguments (partition-key num-partition). Default is random partitioning.
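As an illustration of the :partitioner option, here is a minimal sketch of a key-hashing partitioner. The name hash-partitioner is hypothetical, and the sketch assumes the library expects the function to return an integer partition index in the range 0 to num-partition - 1; the option list above does not state the return contract.

```clojure
;; Hypothetical partitioner, not part of kafka-clj.
;; Assumption: the return value is a partition index in [0, num-partition).
(defn hash-partitioner
  [partition-key num-partition]
  ;; `mod` with a positive divisor never returns a negative number,
  ;; so negative hash codes are handled as well.
  (mod (hash partition-key) num-partition))

;; The same key always maps to the same partition.
(= (hash-partitioner "partition-key" 4)
   (hash-partitioner "partition-key" 4))
```

Under that assumption it would be supplied next to the connection options, for example {:zk.connect "localhost:2082" :partitioner hash-partitioner}. Unlike the default random partitioning, this keeps all messages with the same key on one partition.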
(with-open [c (consumer "localhost" 9092)]
  (let [offs (offsets c "test" 0 -1 10)]
    (.consume c "test" 0 (last offs) 1000000)))
(with-open [c (consumer {:broker.list "localhost:9092"})]
  (doseq [m (.consume-seq c "test" 0 {:blocking true})]
    (println m)))
The following options are supported:
- :blocking boolean Default false: the sequence returns nil the first time a fetch returns no new messages. If set to true, the sequence retries the fetch up to :repeat-count times, waiting :repeat-timeout milliseconds between attempts.
- :repeat-count int Number of attempts to fetch new messages before terminating. Default 10.
- :repeat-timeout int Wait time in milliseconds between fetch attempts. Default 1000.
- :offset long Initialized to the highest offset if not provided.
- :max-size int Maximum result message size. Default 1000000.
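The interplay of :blocking, :repeat-count and :repeat-timeout can be sketched with a plain lazy sequence. This is only an illustration of the semantics described above, not the library's implementation: polling-seq and fetch-fn are hypothetical names, and the sketch assumes that the retry budget resets after every successful fetch and that the first empty fetch counts as one attempt.

```clojure
;; Sketch of the retry semantics, not kafka-clj code.
;; `fetch-fn` is a hypothetical zero-argument function that returns a
;; (possibly empty) collection of new messages.
(defn polling-seq
  [fetch-fn {:keys [blocking repeat-count repeat-timeout]
             :or   {blocking false repeat-count 10 repeat-timeout 1000}}]
  (letfn [(step [attempts-left]
            (lazy-seq
              (let [msgs (seq (fetch-fn))]
                (cond
                  ;; New messages: emit them and reset the retry budget.
                  msgs
                  (concat msgs (step repeat-count))

                  ;; Blocking mode with attempts left: wait, then retry.
                  (and blocking (> attempts-left 1))
                  (do (Thread/sleep (long repeat-timeout))
                      (step (dec attempts-left)))

                  ;; Non-blocking mode or budget exhausted: end the sequence.
                  :else nil))))]
    (step repeat-count)))

;; Non-blocking use with a stubbed fetch: stops at the first empty batch.
(let [batches (atom [["m1" "m2"] [] ["m3"]])
      fetch   #(let [b (first @batches)] (swap! batches rest) b)]
  (doall (polling-seq fetch {})))
;; => ("m1" "m2")
```

With {:blocking true} the same stub would also yield "m3", because the empty batch triggers a wait and a retry instead of ending the sequence.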
// Requires: import java.util.List; import java.util.Properties;
Properties props = new Properties();
props.put("zk.connect", "localhost:2181");
props.put("multithread", "true");
kafka.types.Producer producer = kafka.kafka.newProducer(props);

List messages = new java.util.ArrayList();
messages.add("hello");
messages.add("world");
producer.produce("topic", messages);
producer.close();