gerritjvv/kafka-fast

Producer disconnects automatically after a period with no data to send, then throws an exception

Closed this issue · 13 comments

;; Assumes create-connector and send-msg are referred from kafka-clj.client.
(def c (atom (create-connector [{:host "localhost" :port 9999}]
                               {:flush-on-write true :retry-cache-file
                                                (clojure.java.io/file (str (System/getProperty "java.io.tmpdir") "/kafka-retry-cache"))})))
(defn send-kafka
  "Sends data to topic; on any exception, recreates the connector and retries once."
  [topic data]
  (try
    (send-msg @c topic (.getBytes (str data)))
    (catch Exception e
      (prn "Exception occurred, retrying")
      (.printStackTrace e)
      ;; NOTE: this reconnects on port 9090, while the initial connector above uses 9999.
      (reset! c (create-connector [{:host "localhost" :port 9090}]
                                  {:flush-on-write true :retry-cache-file
                                                   (clojure.java.io/file (str (System/getProperty "java.io.tmpdir") "/kafka-retry-cache"))}))
      (send-msg @c topic (.getBytes (str data))))))

After a period with no data to send, the connection is dropped automatically. The next send throws this exception:

java.net.SocketException: Broken pipe (Write failed)
        at java.net.SocketOutputStream.socketWrite0(Native Method)
        at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
        at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
        at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
        at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
        at io.netty.buffer.UnpooledHeapByteBuf.getBytes(UnpooledHeapByteBuf.java:190)
        at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:751)
        at kafka_clj.tcp$eval20193$fn__20194.invoke(tcp.clj:200)
        at kafka_clj.tcp_api$eval19956$fn__19957$G__19947__19964.invoke(tcp_api.clj:7)
        at kafka_clj.tcp_api$write_BANG_.invokeStatic(tcp_api.clj:12)
        at kafka_clj.tcp_api$write_BANG_.doInvoke(tcp_api.clj:10)
        at clojure.lang.RestFn.invoke(RestFn.java:464)
        at kafka_clj.produce$send_messages.invokeStatic(produce.clj:189)
        at kafka_clj.produce$send_messages.invoke(produce.clj:172)
        at kafka_clj.produce$send_messages.invokeStatic(produce.clj:178)
        at kafka_clj.produce$send_messages.invoke(produce.clj:172)
        at kafka_clj.client$send_data.invokeStatic(client.clj:274)
        at kafka_clj.client$send_data.invoke(client.clj:267)
        at kafka_clj.client$handle_async_topic_messages.invokeStatic(client.clj:298)
        at kafka_clj.client$handle_async_topic_messages.invoke(client.clj:294)
        at kafka_clj.client$async_handler$fn__21121$fn__21130.invoke(client.clj:312)
        at clojure.lang.AFn.run(AFn.java:22)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

@XingZheFeng thanks for reporting this. The broken pipe might be the Kafka broker closing the connection due to inactivity, or something else entirely; it could even be a proxy or router closing it. If you can, please have a look at the broker logs and see whether there are any errors that might give more insight.

In any case, the producer should handle any kind of IO failure and retry. What I'm going to do is make the producer keep a pool of connections and, before each send, check that the connection is live; if it is not, reconnect and then send (this should work).
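
The check-before-send idea can be sketched roughly as follows. This is only an illustration and not kafka-clj's actual code: `ensure-live!`, `checked-send!`, and the `alive?`, `connect-fn`, and `send-fn` arguments are hypothetical names, and it manages a single connection in an atom rather than a real pool. It is also not safe for concurrent senders, since the check and the replacement are separate steps.

```clojure
;; Hypothetical sketch: none of these names come from kafka-clj.
(defn ensure-live!
  "Returns the connection held in conn-atom if (alive? conn) is truthy;
  otherwise stores the result of (connect-fn) in conn-atom and returns it."
  [conn-atom alive? connect-fn]
  (let [conn @conn-atom]
    (if (and conn (alive? conn))
      conn
      (reset! conn-atom (connect-fn)))))

(defn checked-send!
  "Checks the connection before sending, reconnecting first if needed.
  send-fn is called as (send-fn conn msg)."
  [conn-atom alive? connect-fn send-fn msg]
  (send-fn (ensure-live! conn-atom alive? connect-fn) msg))

(comment
  ;; Usage with fake connections represented as maps:
  (def conn (atom {:id 1 :open? false}))
  (checked-send! conn
                 :open?                          ; liveness check
                 (fn [] {:id 2 :open? true})     ; reconnect
                 (fn [c msg] (println "sending" msg "on" (:id c)))
                 "hello"))
```

Passing the liveness check and the reconnect function in as arguments keeps the sketch independent of any particular socket type.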

How urgent is this fix for you?

Very urgent. I have been studying this for several days and still cannot solve the error.

OK, my work has kept me busy, but I'll make some time for it.

Thanks.
For now I'm going to work around it like this, with a timed heartbeat:

;; Assumes cronj.core is required with the alias cronj.
(defn ceshi
  "Heartbeat handler: sends a dummy message so the connection does not sit idle."
  [t opts]
  (send-kafka "data" "zhixingle22")
  (prn "cronj Timed heartbeat" c @c))
(defonce mmmm
         (cronj/cronj
           :entries
           [{:id       "session-cleanup"
             :handler  ceshi
             ;:schedule "* /30 * * * * *"
             :schedule "/60 * * * * * *"
             :opts     {}}]))
(cronj/start! mmmm)
;(reset! c {})

Sadly, the workaround above is not good enough.
Now I use:
[org.apache.kafka/kafka_2.11 "0.10.1.1"]
[org.apache.kafka/kafka-clients "0.10.1.1"]

Hi, I'm sorry to hear it didn't work for you.

I've tried a quick fix of adding pools to the producers, but I'm afraid the code needs a redo because the producer API was not designed for it. I'm not getting much time for this at the moment. I'll try fixing it as time permits, but I understand this might be too late for your purposes.

Thank you for your hard work.
I will be watching and testing.

Today's exception is:

java.net.SocketException: Connection reset by peer: socket write error
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:109)
	at java.net.SocketOutputStream.write(SocketOutputStream.java:153)
	at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
	at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
	at kafka_clj.tcp_api$write_BANG_.invokeStatic(tcp_api.clj:14)
	at kafka_clj.tcp_api$write_BANG_.doInvoke(tcp_api.clj:10)
	at clojure.lang.RestFn.invoke(RestFn.java:464)
	at kafka_clj.produce$send_messages.invokeStatic(produce.clj:192)
	at kafka_clj.produce$send_messages.invoke(produce.clj:175)
	at kafka_clj.produce$send_messages.invokeStatic(produce.clj:181)
	at kafka_clj.produce$send_messages.invoke(produce.clj:175)
	at kafka_clj.client$send_messages.invokeStatic(client.clj:287)
	at kafka_clj.client$send_messages.invoke(client.clj:282)
	at kafka_clj.client$send_data.invokeStatic(client.clj:298)
	at kafka_clj.client$send_data.invoke(client.clj:291)
	at kafka_clj.client$handle_async_topic_messages.invokeStatic(client.clj:322)
	at kafka_clj.client$handle_async_topic_messages.invoke(client.clj:318)
	at kafka_clj.client$async_handler$fn__10902$fn__10911.invoke(client.clj:336)
	at clojure.lang.AFn.run(AFn.java:22)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Currently the error is seen because the connection is closed, removed, or reset by either the broker or some component on the network. What the producer should do is detect this and reconnect. The consumers already do this, but the logic was never propagated to the producers :(.
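
A detect-and-reconnect wrapper might look roughly like the sketch below. It is an assumption-laden illustration, not the code that kafka-clj uses for consumers or producers: `send-with-reconnect!`, `connect-fn`, `send-fn`, and `max-retries` are hypothetical names. Both exceptions in this thread are `java.net.SocketException`, which is a subclass of `java.io.IOException`, so catching `IOException` covers them.

```clojure
(import 'java.io.IOException)

;; Hypothetical sketch: none of these names come from kafka-clj.
(defn send-with-reconnect!
  "Calls (send-fn @conn-atom msg). On an IOException, replaces the connection
  with (connect-fn) and tries again, up to max-retries reconnects.
  Rethrows the last exception once the retries are used up."
  [conn-atom connect-fn send-fn msg max-retries]
  (loop [attempt 0]
    (let [result (try
                   {:ok (send-fn @conn-atom msg)}
                   (catch IOException e
                     (if (< attempt max-retries)
                       (do (reset! conn-atom (connect-fn))
                           ::retry)
                       (throw e))))]
      ;; recur is not allowed inside try/catch, so the retry decision
      ;; is returned as a value and acted on here.
      (if (= result ::retry)
        (recur (inc attempt))
        (:ok result)))))
```

Reacting to the failed write, rather than only checking beforehand, also covers the case where the peer closes the connection between a check and the write. One caveat: if the first write was partly delivered before failing, a blind retry may send the message twice.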

Hi, I've made an initial release as 4.0.1-SNAPSHOT; the specific upload is kafka-clj-4.0.1-20170216.112054-3.jar.

The normal tests pass, and it should work for the broken pipe issue, but I still need to test all the failure scenarios, so it's still beta. If you want to try it out and see whether it works for your use case, that would help a lot.

Actually, ignore this; I've found some issues in this release.

OK, I've finally made the latest 4.0.2 snapshot version, which should fix the broken pipe issue.

This is a proper rewrite of the TCP code.

Closing this ticket for the moment; if the error is seen again, please reopen it.