Not able to commit an offset or seek to an offset manually
I am trying to commit an offset manually using the commit-async! function from the ketu.clients.consumer namespace, but I am getting this error:
Execution error (ConcurrentModificationException) at org.apache.kafka.clients.consumer.KafkaConsumer/acquire (KafkaConsumer.java:2421).
KafkaConsumer is not safe for multi-threaded access
Sample code
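;; Namespace aliases are inferred from how the snippet uses them.
(ns example.core
  (:require [clojure.core.async :refer [chan]]
            [ketu.async.source :as source]
            [ketu.clients.consumer :as c])
  (:import (org.apache.kafka.clients.consumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)))

(defn consume-topic
  "Consume messages from Kafka"
  [src-chan topic]
  (let [source-opts {:name "topic-consumer"
                     :brokers "localhost:9092"
                     :topic topic
                     :group-id "group-1"
                     :key-type :string
                     :auto-offset-reset "earliest"
                     :value-type :string
                     ;; auto-commit is disabled so offsets can be committed manually
                     :internal-config {"enable.auto.commit" false}
                     :shape [:map :key :value :topic :offset]}]
    (source/source src-chan source-opts)))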

(defn commit-offset
  "Commit offset"
  [consumer topic offset]
  (c/commit-async! consumer
                   {(TopicPartition. topic 0)
                    (OffsetAndMetadata. offset)}
                   (c/commit-callback (fn [x e]
                                        (prn "committed" x)))))

(def channel (chan 100))
(def src (consume-topic channel "topic"))

;; This call runs on the caller's thread, not on the source's polling thread.
(commit-offset (:ketu.source/consumer src)
               "topic1"
               5)
I don't know what is wrong here. Any pointers would be great. Thanks.
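Only one thread at a time can acquire the lock on the KafkaConsumer instance, and in practice that is whichever thread calls .poll() in a loop. The restriction covers every call on the consumer, including .seek() and the commit methods.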
We've had some clojure code wrapping the consumer instance for a while (there being no suitable wrapper libraries around at the time); seek is one of the things we had to get right, and this was the first thing that bit me when I wrote that.
I ended up using channels to interact with a loop in a clojure.core.async/thread which wraps access to the consumer instance, and I pass it messages like [:seek "topic" partition offset], for example.
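A minimal sketch of that pattern, assuming a plain Kafka Consumer that has already been subscribed or assigned. The names handle-command!, start-consumer-loop, control-chan and out-chan are hypothetical and are not part of ketu; only the [:seek topic partition offset] message shape comes from the description above.

```clojure
(ns example.consumer-loop
  (:require [clojure.core.async :as async])
  (:import (java.time Duration)
           (org.apache.kafka.clients.consumer Consumer)
           (org.apache.kafka.common TopicPartition)))

(defn handle-command!
  "Applies one control message. Must be called on the thread that polls."
  [^Consumer consumer [op topic partition offset]]
  (case op
    ;; seek throws IllegalStateException if the partition is not assigned
    :seek (.seek consumer (TopicPartition. topic (int partition)) (long offset))
    ;; unknown commands are ignored in this sketch
    nil))

(defn start-consumer-loop
  "Starts a thread that is the only code touching `consumer`.
  Records go to out-chan; returns the control channel for commands.
  The loop stops and closes the consumer once out-chan is closed."
  [^Consumer consumer out-chan]
  (let [control-chan (async/chan 16)]
    (async/thread
      (try
        (loop []
          ;; Apply queued commands between polls, without blocking.
          (loop []
            (when-some [cmd (async/poll! control-chan)]
              (handle-command! consumer cmd)
              (recur)))
          (let [records (.poll consumer (Duration/ofMillis 100))]
            ;; >!! returns false when out-chan is closed, which ends the loop.
            (when (every? #(async/>!! out-chan %) records)
              (recur))))
        (finally
          (.close consumer))))
    control-chan))
```

With this in place, any thread can request a seek with (async/>!! control-chan [:seek "topic" 0 7]), and the seek itself still runs on the polling thread, so the consumer never sees concurrent access.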
Thanks, @blak3mill3r
(>!! (:ketu.source/consumer-thread src) [:seek "topic-1" 0 7])
I tried to execute the above line but nothing happens. Do I need to create a wrapper to interact with clojure.core.async/thread, or is it supported by ketu?
Hi @ckshekhar
- A consumer control channel where you can put commit commands is on our roadmap, but it is not currently supported.
- Also, as @blak3mill3r mentioned, you can't safely commit while another thread is continuously polling in a loop, which is what the ketu source does.
For now, unfortunately, you'll have to implement the polling loop yourself, which gives you total control over committing offsets.
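A rough sketch of what such a hand-rolled loop could look like, using the Kafka Java client directly through interop rather than ketu. The function names make-consumer and consume-and-commit!, the handle-record! callback and the running? atom are hypothetical; the config values mirror the ones in the original post. Polling and committing happen on the same thread, which is what avoids the ConcurrentModificationException.

```clojure
(ns example.manual-commit
  (:import (java.time Duration)
           (org.apache.kafka.clients.consumer Consumer ConsumerRecord
                                              KafkaConsumer OffsetAndMetadata)
           (org.apache.kafka.common TopicPartition)))

(defn make-consumer
  "Builds a string-keyed, string-valued consumer with auto-commit disabled."
  [brokers group-id]
  (KafkaConsumer.
    ^java.util.Map
    {"bootstrap.servers"  brokers
     "group.id"           group-id
     "enable.auto.commit" "false"
     "auto.offset.reset"  "earliest"
     "key.deserializer"   "org.apache.kafka.common.serialization.StringDeserializer"
     "value.deserializer" "org.apache.kafka.common.serialization.StringDeserializer"}))

(defn consume-and-commit!
  "Polls on the calling thread until running? becomes false.
  Each record is passed to handle-record! and then committed from the
  same thread. The consumer must already be subscribed or assigned."
  [^Consumer consumer handle-record! running?]
  (try
    (while @running?
      (doseq [^ConsumerRecord record (.poll consumer (Duration/ofMillis 500))]
        (handle-record! record)
        ;; The committed offset is the next offset to read, hence inc.
        ;; Committing per record is simple but slow; batching is an option.
        (.commitSync consumer
                     {(TopicPartition. (.topic record) (.partition record))
                      (OffsetAndMetadata. (inc (.offset record)))})))
    (finally
      (.close consumer))))

(comment
  ;; Example wiring, assuming a broker on localhost:9092.
  (def running? (atom true))
  (def consumer (make-consumer "localhost:9092" "group-1"))
  (.subscribe consumer ["topic"])
  (future (consume-and-commit! consumer prn running?))
  (reset! running? false))
```

Records can still be handed to a core.async channel from inside handle-record!, as long as every call on the consumer itself stays on the polling thread.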