pingles/clj-kafka

I cannot get messages using clj-kafka.consumer.zk

Closed this issue · 2 comments

I tried to get kafka messages using clj-kafka.consumer.zk but I cannot do it yet.
The producer code is as follows:

(ns chapter04.kafka
  (:require [clj-kafka.producer :as p]
            [clj-kafka.core :as core]
            [clj-kafka.consumer.zk :as zk-consumer]
            )
  (:import (kafka.consumer Consumer ConsumerConfig KafkaStream)
           (kafka.producer KeyedMessage ProducerConfig)
           (kafka.javaapi.producer Producer)
           (java.util Properties)
           (java.util.concurrent Executors))
  )

(def p (p/producer {"metadata.broker.list" "localhost:9092"
                  "serializer.class" "kafka.serializer.DefaultEncoder"
                  "partitioner.class" "kafka.producer.DefaultPartitioner"}))

(p/send-message p (p/message "test" (.getBytes "this is my message")))
(p/send-message p (p/message "test" (.getBytes "this is my message")))

The below is the consumer code:

(def consumer-config {"zookeeper.connect" "localhost:2181"
             "group.id" "clj-kafka.consumer"
             "auto.offset.reset" "smallest"
             "auto.commit.enable" "false"})

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test"))
  )

The last consumer expression returns (). I am expecting the result is ("this is my message" "this is my message").

Using kafka bundled kafka-console-consumer.sh can retrieve messages correctly.

$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
this is my message
this is my message

I am using kafka_2.10-0.8.2.1. Is there something wrong in my environment ? Or is there any way to solve this problem ?

I think the docs are just sucky here.

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (take 2 (zk-consumer/messages c "test")))

What happens is that this code will open a consumer, create a lazy seq of it's messages, and then immediately close the consumer and return the seq.

However since the seq is lazy, once you start reading, the consumer has already been closed and thus has no new messages for it.

To return the actual messages you have to force the seq's evaluation within the with-resource.

(core/with-resource [c (zk-consumer/consumer consumer-config)]
  zk-consumer/shutdown
  (doall (take 2 (zk-consumer/messages c "test"))))

@j-pb Oh great ! Thank you !
It worked as follows:

(def x
     (core/with-resource [c (zk/consumer config)]
       zk/shutdown
       (doall (take 1 (zk/messages c "test"))))
  )
;; #'clj-kafka-example.core/x
(realized? x)
;; true
x
;;(#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x6bbcc481 "[B@6bbcc481"]})

(def x
     (core/with-resource [c (zk/consumer config)]
       zk/shutdown
       (take 1 (zk/messages c "test"))
       )
  )
;; #'clj-kafka-example.core/x
(realized? x)
;; false
x
;;()

I have another question. When I called a consumer 3 times, it returned the same message.
I would like to know how to get different consequent messages.

(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x6d1531d "[B@6d1531d"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x2cdb87c2 "[B@2cdb87c2"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 1 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x1b90d67c "[B@1b90d67c"]})

Another problem is it worked well to take messages less than messages in a topic, however it hanged
to try to take messages more than messages in a topic.

Assume there are 10 messages and it worked to take 3 messages from the topic but it hanged to try to
take 11 messages.

(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 3 (zk/messages c "test"))))
;; (#clj_kafka.core.KafkaMessage{:topic "test", :offset 0, :partition 0, :key nil, :value #object["[B" 0x45df4b8d "[B@45df4b8d"]}
;; #clj_kafka.core.KafkaMessage{:topic "test", :offset 1, :partition 0, :key nil, :value #object["[B" 0x645c0a87 "[B@645c0a87"]}
;; #clj_kafka.core.KafkaMessage{:topic "test", :offset 2, :partition 0, :key nil, :value #object["[B" 0x4b45519a "[B@4b45519a"]})
(core/with-resource [c (zk/consumer config)]
  zk/shutdown
  (doall (take 11 (zk/messages c "test"))))

The above case, it seemed waiting for a next message but it didn't exit after produce a new message on the topic(looked hanged).

Thanks,
MH