Time
acron0 opened this issue · 10 comments
I'm receiving an error whilst using kafka-clj for a producer. I can confirm the messages are being sent, as my console consumer picks them up. I'm actually using the KafkaClientService component but I've tried a more manual approach and the same thing occurs.
Created with:
(kafka/create-client-service
[{:host "localhost" :port 9092}]
{:flush-on-write true})
Sent using:
(kafka/send-msg (-> system :mq :client) "test" (.getBytes "hello"))
Error:
11:12:23.356 INFO k.client - Creating producer 172.17.0.4 9092
11:12:23.362 INFO k.produce - Creating client instance 172.17.0.4 : 9092
11:12:23.363 INFO k.produce - Creating client instance 172.17.0.4 : 9092
11:12:53.411 ERROR k.tcp - #error {
:cause Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
:via
[{:type java.util.concurrent.TimeoutException
:message Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
:at [kafka_clj.util.IOUtil readBytes IOUtil.java 57]}]
:trace
[[kafka_clj.util.IOUtil readBytes IOUtil.java 57]
[kafka_clj.util.IOUtil readInt IOUtil.java 22]
[kafka_clj.tcp$read_int invokePrim tcp.clj 56]
[kafka_clj.tcp$read_response invokePrim tcp.clj 68]
[kafka_clj.tcp$read_response invoke tcp.clj 66]
[kafka_clj.tcp$read_async_loop_BANG_$fn__51261$fn__51262 invoke tcp.clj 86]
[kafka_clj.tcp$read_async_loop_BANG_$fn__51261 invoke tcp.clj 85]
[clojure.core$binding_conveyor_fn$fn__4444 invoke core.clj 1916]
[clojure.lang.AFn call AFn.java 18]
[java.util.concurrent.FutureTask run FutureTask.java 262]
[java.util.concurrent.ThreadPoolExecutor runWorker ThreadPoolExecutor.java 1145]
[java.util.concurrent.ThreadPoolExecutor$Worker run ThreadPoolExecutor.java 615]
[java.lang.Thread run Thread.java 745]]}
java.util.concurrent.TimeoutException: Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
at kafka_clj.util.IOUtil.readBytes(IOUtil.java:57) ~[kafka-clj-3.5.9.jar:na]
at kafka_clj.util.IOUtil.readInt(IOUtil.java:22) ~[kafka-clj-3.5.9.jar:na]
at kafka_clj.tcp$read_int.invokePrim(tcp.clj:56) ~[na:na]
at kafka_clj.tcp$read_response.invokePrim(tcp.clj:68) ~[na:na]
at kafka_clj.tcp$read_response.invoke(tcp.clj:66) ~[na:na]
at kafka_clj.tcp$read_async_loop_BANG_$fn__51261$fn__51262.invoke(tcp.clj:86) ~[na:na]
at kafka_clj.tcp$read_async_loop_BANG_$fn__51261.invoke(tcp.clj:85) [na:na]
at clojure.core$binding_conveyor_fn$fn__4444.invoke(core.clj:1916) [clojure-1.7.0.jar:na]
at clojure.lang.AFn.call(AFn.java:18) [clojure-1.7.0.jar:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_95]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_95]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_95]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_95]
I've run into this issue with Kafka sometimes; try using ack=0 and see if it works. The issue definitely seems to be that kafka-fast is waiting for a response from Kafka after sending, but times out because no ack is sent.
Also, I would note that I've found ack=1 to be too slow in most production cases (with any Kafka client), so I have always run with ack=0. This is something I need to look into and try to improve, but if Kafka doesn't respond fast enough there isn't much to do other than set ack=0.
Also, to explain how this works: with ack=1 the message send is still async, so it will go through, but the response is read in a background thread and the message is kept on a temporary queue in the meantime.
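A minimal sketch of that send/ack pattern (assuming a blocking queue for the pending messages; all names here are illustrative, not kafka-clj's actual internals):

```clojure
(ns ack-sketch
  "Hypothetical sketch of an async send with a background ack reader.
   Names and structure are illustrative, not kafka-clj internals."
  (:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))

(defn send-async!
  "Write the message to the wire immediately and park it on a
   temporary queue until the broker acks it (ack=1 semantics)."
  [^LinkedBlockingQueue pending write-fn msg]
  (write-fn msg)       ;; the send itself never blocks on the ack
  (.put pending msg))  ;; keep the message until an ack arrives

(defn ack-loop!
  "Background thread: for each pending message, try to read an ack;
   on timeout hand the message to on-timeout (retry/error reporting)."
  [^LinkedBlockingQueue pending read-ack-fn on-timeout]
  (future
    (loop []
      (when-let [msg (.poll pending 100 TimeUnit/MILLISECONDS)]
        (when-not (read-ack-fn)  ;; blocks up to the socket timeout
          (on-timeout msg)))
      (recur))))
```

With ack=0 the `ack-loop!` half would simply never be started, which is why the broker response (and its timeout) should not matter in that mode.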
The timeout error recurs every 30 seconds - does that suggest it's never receiving an ack? Also, this is a local, Dockerized Kafka, so there really shouldn't be a reason for it to fail. Are acks sent on the same port?
It should be the same port. 30 seconds is Kafka's default socket timeout, so the background thread in kafka-fast will retry reading a response, and if it doesn't get any from the broker within 30 seconds the socket times out.
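The timeout in the stack trace comes from a bounded read like the following (a simplified sketch of what a method such as `IOUtil.readBytes` might do, assuming a plain `InputStream`; this is not the actual kafka-clj implementation):

```clojure
(ns read-timeout-sketch
  "Sketch of reading exactly n bytes with a deadline, throwing a
   TimeoutException like the one in the trace if data never arrives."
  (:import [java.io InputStream]
           [java.util.concurrent TimeoutException]))

(defn read-n-bytes
  "Read exactly n bytes from in, polling until timeout-ms elapses.
   Returns the filled byte array, or throws TimeoutException."
  [^InputStream in ^long n ^long timeout-ms]
  (let [buf      (byte-array n)
        deadline (+ (System/currentTimeMillis) timeout-ms)]
    (loop [read 0]
      (cond
        (= read n) buf
        (> (System/currentTimeMillis) deadline)
        (throw (TimeoutException.
                (str "Timeout while reading data from the input stream: "
                     "got only " read " bytes of " n)))
        :else
        (let [avail (.available in)]
          (if (pos? avail)
            (recur (+ read (.read in buf read (min avail (- n read)))))
            (do (Thread/sleep 10) (recur read))))))))
```

In the error above, the reader was waiting for the 4-byte response-size prefix ("got only 0 bytes of 4") and the broker never wrote anything, so the deadline fired after the 30-second socket timeout.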
Are you running one or three instances of Kafka?
Just one.
Also, I changed the conf to this, but it didn't work either (same errors):
(kafka/create-client-service
[{:host host :port port}]
{:flush-on-write true
:acks 0})
Was this not what you meant?
Yup. OK, strange - that should bypass the response read thread. Give me a sec and I'll trace this down.
OK, the output you're seeing is due to:
(catch Exception e
;;only print out exceptions during debug
(debug "Timeout while reading response from producer broker " e)
(when (enabled? :debug)
(error e e)))
The exception printout is only for debugging timeouts, but with ack=0 this shouldn't matter.
Here is why I try to read anyway (sorry, my mind now remembers what I did back then :) ):
I've found that in some strange situations the broker sends ack responses anyway, even with ack=0. I don't know if it was a bug, but I decided to always read here, to keep the channel clean and avoid open channels lingering after close.
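That "always read to keep the channel clean" idea could be sketched like this (an illustrative drain helper, assuming a plain `InputStream`; not kafka-clj's actual cleanup code):

```clojure
(ns drain-sketch
  "Hypothetical sketch: discard any stray response bytes (e.g. acks
   the broker sent despite acks=0) before closing a connection."
  (:import [java.io InputStream]))

(defn drain-stream!
  "Read and discard every immediately available byte on in.
   Returns the number of bytes discarded."
  [^InputStream in]
  (loop [n 0]
    (if (pos? (.available in))
      (do (.read in) (recur (inc n)))
      n)))
```

Draining like this keeps unread broker bytes from sitting in the socket buffer when the connection is torn down.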
I've removed the (error e e) print and left the debug only.
You don't need to specify :acks 0 because that's the default - I rechecked it :).
Please try the release: 3.6.0-SNAPSHOT and if it works for you I'll make a release 3.6.0
That seems to have done the trick, thanks
Great! Thanks for reporting the issue.
I've made a release 3.6.0.