AppsFlyer/ketu

Not able to commit an offset or seek to an offset manually

Opened this issue · 3 comments

I am trying to commit an offset manually using the commit-async! function from the ketu.clients.consumer namespace, but I am getting this error:

Execution error (ConcurrentModificationException) at org.apache.kafka.clients.consumer.KafkaConsumer/acquire (KafkaConsumer.java:2421).
KafkaConsumer is not safe for multi-threaded access

Sample code

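;; Namespace name is illustrative; the aliases match the calls below.
(ns example.core
  (:require [clojure.core.async :refer [chan >!!]]
            [ketu.async.source :as source]
            [ketu.clients.consumer :as c])
  (:import (org.apache.kafka.clients.consumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)))

(defn consume-topic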
  "Consume message from Kafka"
  [src-chan topic]
  (let [source-opts {:name "topic-consumer"
                     :brokers "localhost:9092"
                     :topic topic
                     :group-id "group-1"
                     :key-type :string
                     :auto-offset-reset "earliest"
                     :value-type :string
                     :internal-config {"enable.auto.commit" false}
                     :shape [:map :key :value :topic :offset]}]
    (source/source src-chan source-opts)))


(defn commit-offset
  "Commit offset"
  [consumer topic offset]
  (c/commit-async! consumer
                   {(TopicPartition. topic 0)
                    (OffsetAndMetadata. offset)}
                   (c/commit-callback (fn [x e]
                                        (prn "committed" x)))))


(def channel (chan 100))

(def src (consume-topic channel "topic"))

(commit-offset (:ketu.source/consumer src)
               "topic"
               5)

I don't know what is wrong here. Any pointers would be great. Thanks

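Only one thread at a time can acquire the lock on the KafkaConsumer instance, and in practice that is whichever thread calls .poll(). Every other call has to come from that same thread, and that includes .seek() and the commit methods.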

We've had some clojure code wrapping the consumer instance for a while (there being no suitable wrapper libraries around at the time); seek is one of the things we had to get right, and this was the first thing that bit me when I wrote that.

I ended up using channels to interact with a loop in a clojure.core.async/thread which wraps access to the consumer instance, and I pass it messages like [:seek "topic" partition offset] for example.
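
The approach described above could look roughly like the sketch below. It is not the commenter's actual code and not part of ketu. The namespace, `handle-command!`, `drain-commands!`, `start-consumer-loop!`, the `[:commit ...]` and `[:stop]` commands, and the 100 ms poll timeout are all assumptions made for illustration. It uses plain Java interop on a consumer that you create and subscribe yourself.

```clojure
(ns example.consumer-loop
  (:require [clojure.core.async :as async])
  (:import (java.time Duration)
           (java.util Map)
           (org.apache.kafka.clients.consumer Consumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)))

(defn- handle-command!
  "Apply one command vector to the consumer. Only ever called on the
  polling thread, so it never races with .poll()."
  [^Consumer consumer [op topic partition offset :as cmd]]
  (try
    (let [tp (TopicPartition. topic (int partition))]
      (case op
        ;; seek throws if the partition is not currently assigned
        :seek   (.seek consumer tp (long offset))
        :commit (let [^Map offsets {tp (OffsetAndMetadata. (long offset))}]
                  (.commitSync consumer offsets))))
    (catch Exception e
      ;; Report and carry on, so one bad command does not kill the loop.
      (println "command failed:" cmd (.getMessage e)))))

(defn- drain-commands!
  "Apply every queued command without blocking.
  Returns false once [:stop] is seen, true otherwise."
  [consumer commands]
  (loop []
    (if-let [cmd (async/poll! commands)]
      (if (= :stop (first cmd))
        false
        (do (handle-command! consumer cmd)
            (recur)))
      true)))

(defn start-consumer-loop!
  "Run polling and all commands on one dedicated thread.
  Returns a channel that closes when the loop has exited."
  [^Consumer consumer commands records-out]
  (async/thread
    (try
      (while (drain-commands! consumer commands)
        ;; >!! blocks when records-out is full, which also pauses polling.
        (doseq [record (.poll consumer (Duration/ofMillis 100))]
          (async/>!! records-out record)))
      (finally
        (.close consumer)
        (async/close! records-out)))))
```

With a loop like this running, other threads never touch the consumer directly. They put a message such as `[:seek "topic" 0 7]` on the `commands` channel and the polling thread applies it between polls.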

Thanks, @blak3mill3r

(>!! (:ketu.source/consumer-thread src) [:seek "topic-1" 0 7])

I tried executing the line above, but nothing happens. Do I need to write my own wrapper around clojure.core.async/thread, or does ketu support this?

Hi @ckshekhar

  • A consumer control channel where you can put commit commands is on our roadmap, but it is not currently supported.
  • Also, as @blak3mill3r mentioned, you can't safely commit while another thread is continuously polling in a loop, which is what the ketu source does.

For now, unfortunately, you'll have to implement the polling loop yourself, which gives you total control over committing offsets.
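
A hand-written polling loop with manual commits might look something like the sketch below. It bypasses ketu and talks to the Kafka Java client directly. The namespace and the function names `make-consumer`, `next-offsets`, and `run-loop!` are hypothetical, as are the 500 ms poll timeout and the choice to commit synchronously after every batch. The config values mirror the ones in the question.

```clojure
(ns example.manual-commit
  (:import (java.time Duration)
           (java.util Collection Map)
           (org.apache.kafka.clients.consumer Consumer ConsumerRecord
                                              KafkaConsumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)
           (org.apache.kafka.common.serialization StringDeserializer)))

(defn make-consumer
  "Build a string-keyed, string-valued consumer with auto-commit disabled."
  [brokers group-id]
  (let [^Map config {"bootstrap.servers"  brokers
                     "group.id"           group-id
                     "enable.auto.commit" "false"
                     "auto.offset.reset"  "earliest"}]
    (KafkaConsumer. config (StringDeserializer.) (StringDeserializer.))))

(defn next-offsets
  "Map each partition seen in `records` to the offset to commit, which is
  the highest processed offset plus one (the next record to read)."
  [records]
  (let [highest (reduce (fn [acc ^ConsumerRecord r]
                          (update acc
                                  (TopicPartition. (.topic r) (.partition r))
                                  (fnil max 0)
                                  (inc (.offset r))))
                        {}
                        records)]
    (into {}
          (map (fn [[tp offset]] [tp (OffsetAndMetadata. (long offset))]))
          highest)))

(defn run-loop!
  "Poll, process each record with `handle!`, then commit what was processed.
  Runs on the calling thread until the atom `running?` is set to false."
  [^Consumer consumer topic handle! running?]
  (let [^Collection topics [topic]]
    (.subscribe consumer topics))
  (try
    (while @running?
      (let [records (.poll consumer (Duration/ofMillis 500))]
        (when-not (.isEmpty records)
          (run! handle! records)
          ;; Commit only after the whole batch has been handled.
          (let [^Map offsets (next-offsets records)]
            (.commitSync consumer offsets)))))
    (finally
      (.close consumer))))
```

Because polling and committing happen on the same thread here, the ConcurrentModificationException from the question cannot occur. One way to use it is `(future (run-loop! (make-consumer "localhost:9092" "group-1") "topic" prn running?))`, where `running?` is an `(atom true)` that you reset to `false` to stop. Committing after processing gives at-least-once behaviour: a crash between `handle!` and the commit means that batch is read again on restart.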