gerritjvv/kafka-fast

Time

acron0 opened this issue · 10 comments

I'm receiving an error whilst using kafka-clj for a producer. I can confirm the messages are being sent, as my console consumer picks them up. I'm actually using the KafkaClientService component but I've tried a more manual approach and the same thing occurs.

Created with:

(kafka/create-client-service
   [{:host "localhost" :port 9092}]
   {:flush-on-write true})

Sent using:

(kafka/send-msg (-> system :mq :client) "test" (.getBytes "hello"))

Error:

11:12:23.356 INFO  k.client  - Creating producer  172.17.0.4   9092
11:12:23.362 INFO  k.produce  - Creating client instance  172.17.0.4 : 9092
11:12:23.363 INFO  k.produce  - Creating client instance  172.17.0.4 : 9092
11:12:53.411 ERROR k.tcp  - #error {
 :cause Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
 :via
 [{:type java.util.concurrent.TimeoutException
   :message Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
   :at [kafka_clj.util.IOUtil readBytes IOUtil.java 57]}]
 :trace
 [[kafka_clj.util.IOUtil readBytes IOUtil.java 57]
  [kafka_clj.util.IOUtil readInt IOUtil.java 22]
  [kafka_clj.tcp$read_int invokePrim tcp.clj 56]
  [kafka_clj.tcp$read_response invokePrim tcp.clj 68]
  [kafka_clj.tcp$read_response invoke tcp.clj 66]
  [kafka_clj.tcp$read_async_loop_BANG_$fn__51261$fn__51262 invoke tcp.clj 86]
  [kafka_clj.tcp$read_async_loop_BANG_$fn__51261 invoke tcp.clj 85]
  [clojure.core$binding_conveyor_fn$fn__4444 invoke core.clj 1916]
  [clojure.lang.AFn call AFn.java 18]
  [java.util.concurrent.FutureTask run FutureTask.java 262]
  [java.util.concurrent.ThreadPoolExecutor runWorker ThreadPoolExecutor.java 1145]
  [java.util.concurrent.ThreadPoolExecutor$Worker run ThreadPoolExecutor.java 615]
  [java.lang.Thread run Thread.java 745]]}
java.util.concurrent.TimeoutException: Timeout while reading data from the input stream: got only 0 bytes of 4 last seen 1454497943366 diff 30031
    at kafka_clj.util.IOUtil.readBytes(IOUtil.java:57) ~[kafka-clj-3.5.9.jar:na]
    at kafka_clj.util.IOUtil.readInt(IOUtil.java:22) ~[kafka-clj-3.5.9.jar:na]
    at kafka_clj.tcp$read_int.invokePrim(tcp.clj:56) ~[na:na]
    at kafka_clj.tcp$read_response.invokePrim(tcp.clj:68) ~[na:na]
    at kafka_clj.tcp$read_response.invoke(tcp.clj:66) ~[na:na]
    at kafka_clj.tcp$read_async_loop_BANG_$fn__51261$fn__51262.invoke(tcp.clj:86) ~[na:na]
    at kafka_clj.tcp$read_async_loop_BANG_$fn__51261.invoke(tcp.clj:85) [na:na]
    at clojure.core$binding_conveyor_fn$fn__4444.invoke(core.clj:1916) [clojure-1.7.0.jar:na]
    at clojure.lang.AFn.call(AFn.java:18) [clojure-1.7.0.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:262) [na:1.7.0_95]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_95]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_95]
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_95]

I've found this issue with Kafka sometimes. Try using ack=0 and see if it works; the issue definitely seems to be that kafka-fast is waiting for a response from Kafka after sending, but times out because no ack is sent.

Also, I would note that I've found ack=1 to be too slow in most production cases (with any Kafka client), so I have always run with ack=0. That is something I need to look into and try to improve, but if Kafka doesn't respond fast enough there isn't much to do other than set ack=0.

Also, to explain how this works: if ack=1 the message send is still async, so it will go through, but the reading of the response is done in a background thread and the message is kept on a temporary queue.
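To make that flow concrete, here is a minimal sketch of the pattern described above (async send plus a background thread draining a temporary queue and waiting for acks). It is illustrative only and not kafka-clj's actual implementation; write! and read-ack! are hypothetical stand-ins for the transport write and the broker-response read.

(import '[java.util.concurrent LinkedBlockingQueue TimeUnit TimeoutException])

(defn start-ack-reader!
  "Background thread: takes sent messages off the pending queue and tries to
   read an ack for each; read-ack! is a hypothetical stand-in that may throw
   TimeoutException, mirroring the timeouts in the log above."
  [^LinkedBlockingQueue pending read-ack!]
  (future
    (loop []
      (when-let [msg (.poll pending 100 TimeUnit/MILLISECONDS)]
        (try
          (read-ack! msg)
          (catch TimeoutException e
            (println "ack timed out for" msg))))
      (recur))))

(defn send-async!
  "Writes the message and parks it on the pending queue; the caller never
   blocks waiting for the broker's response."
  [^LinkedBlockingQueue pending write! msg]
  (write! msg)
  (.put pending msg))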

The timeout error recurs every 30 seconds - does that suggest it's never receiving an ack? Also, this is a local, Dockerized Kafka, so there really shouldn't be a reason for it to fail. Are acks on the same port?

It should be the same port. 30 seconds is Kafka's default socket timeout, so the background thread in kafka-fast will retry reading a response, and if it doesn't get anything from the broker within 30 seconds the socket times out.
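For illustration, the read-with-timeout behaviour behind those errors looks roughly like the sketch below. This is not kafka-clj's actual IOUtil code, just the general idea of reading a 4-byte response size with a deadline:

(defn read-int-with-timeout
  "Reads a 4-byte big-endian int (e.g. a response size) from in, throwing a
   TimeoutException if the bytes don't arrive within timeout-ms - roughly what
   produces the 'got only 0 bytes of 4' message above."
  [^java.io.InputStream in ^long timeout-ms]
  (let [deadline (+ (System/currentTimeMillis) timeout-ms)
        buf (byte-array 4)]
    (loop [n-read 0]
      (cond
        (= n-read 4)
        (.getInt (java.nio.ByteBuffer/wrap buf))

        (> (System/currentTimeMillis) deadline)
        (throw (java.util.concurrent.TimeoutException.
                 (str "got only " n-read " bytes of 4")))

        :else
        (let [n (if (pos? (.available in))
                  (.read in buf n-read (- 4 n-read))
                  0)]
          (when-not (pos? n) (Thread/sleep 10))
          (recur (+ n-read (max n 0))))))))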

Are you running one or three instances of Kafka?

Just one.

Also, I changed the conf to this, but it didn't work either (same errors):

(kafka/create-client-service
   [{:host host :port port}]
   {:flush-on-write true
    :acks 0})

Was this not what you meant?

Yup, OK, strange - that should bypass the response read thread. Give me a sec and I'll trace this down.

OK, the output you're seeing is due to:

(catch Exception e
  ;;only print out exceptions during debug
  (debug "Timeout while reading response from producer broker " e)
  (when (enabled? :debug)
    (error e e)))

The exception printout is only for debugging timeouts, but with ack=0 this shouldn't matter.
Here is why I try to read anyway (sorry, my mind only now remembers what I did back then :) ):
I've found that in some strange situations the broker sends ack responses anyway, even with ack=0. I don't know if it was a bug, but I decided to always read here, to keep the channel clean and avoid open channels lingering after closing.

I've removed the (error e e) print and left only the debug logging.
You also don't need to specify :acks 0 because that's the default - I rechecked it :).
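For reference, the debug-only handling described here would look roughly like the sketch below. It is only the intent, not necessarily the exact committed code; read-response! is a hypothetical stand-in for the library's internal read, and the debug macro is assumed to come from clojure.tools.logging.

(require '[clojure.tools.logging :refer [debug]])

(defn read-response-safely
  "Wraps a response read so timeouts are only logged at debug level;
   with :acks 0 a missing response is expected and harmless."
  [read-response! conn]
  (try
    (read-response! conn)
    (catch Exception e
      (debug "Timeout while reading response from producer broker " e))))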

Please try the release 3.6.0-SNAPSHOT, and if it works for you I'll make a 3.6.0 release.

That seems to have done the trick, thanks

Great! Thanks for reporting the issue.

I've made a release 3.6.0.