mpenet/jet

"Broken pipe" exception with long-running (SSE) request handlers

Closed this issue · 8 comments

Hi,

I have a request handler that looks like this:

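;; Assumed requires (not shown in the original post): clojure.core.async
;; (chan, pub, sub, alts!!, timeout, put!, close!), compojure.core (routes, GET),
;; cheshire.core (generate-string), and a logging library providing info/error.

(defn yield-msg
  "Blocks until a message arrives on chan or 5 seconds elapse."
  [chan]
  (alts!! [chan (timeout 5000)]))

(defn api-routes
  []
  (let [publisher (chan)
        ;; pub (not sub) creates the publication, keyed on :topic
        publication (pub publisher #(:topic %))]
    (routes
      (GET "/events" request
        (let [channel (chan 10)
              resp (chan 10)]
          (sub publication :events channel)

          (future
            (try
              (doseq [[msg source] (repeatedly #(yield-msg channel))
                      ;; stop once the subscription channel itself is closed
                      :while (or (not= channel source) msg)
                      ;; a timeout yields nil, which becomes a keepalive event
                      :let [msg (or msg {:type "keepalive"})
                            msg (dissoc msg "topic" :topic)
                            data (format "data: %s\n\n" (generate-string msg))]]
                (info msg)
                (put! resp data))
              (catch Exception e
                (error e "cannot handle incoming message"))
              (finally
                (close! channel)
                (close! resp))))

          {:status 200
           :headers {"Content-Type" "text/event-stream"
                     "X-Accel-Buffering" "no"
                     "Cache-Control" "no-cache"}
           :body resp})))))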

The idea is to have a long-running event stream that the client subscribes to when running the client-side app.

On a browser refresh, the request handler tries to send data to a response that was closed by the client and this raises the following exception:

Exception in thread "async-dispatch-25" java.lang.Error: org.eclipse.jetty.io.EofException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.eclipse.jetty.io.EofException
    at org.eclipse.jetty.io.ChannelEndPoint.flush(ChannelEndPoint.java:192)
    at org.eclipse.jetty.io.WriteFlusher.write(WriteFlusher.java:337)
    at org.eclipse.jetty.io.AbstractEndPoint.write(AbstractEndPoint.java:128)
    at org.eclipse.jetty.server.HttpConnection$SendCallback.process(HttpConnection.java:646)
    at org.eclipse.jetty.util.IteratingCallback.processIterations(IteratingCallback.java:233)
    at org.eclipse.jetty.util.IteratingCallback.iterate(IteratingCallback.java:193)
    at org.eclipse.jetty.server.HttpConnection.send(HttpConnection.java:457)
    at org.eclipse.jetty.server.HttpChannel.sendResponse(HttpChannel.java:769)
    at org.eclipse.jetty.server.HttpChannel.write(HttpChannel.java:802)
    at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:139)
    at org.eclipse.jetty.server.HttpOutput.write(HttpOutput.java:132)
    at org.eclipse.jetty.server.HttpOutput.flush(HttpOutput.java:234)
    at sun.nio.cs.StreamEncoder.implFlush(StreamEncoder.java:297)
    at sun.nio.cs.StreamEncoder.flush(StreamEncoder.java:141)
    at java.io.OutputStreamWriter.flush(OutputStreamWriter.java:229)
    at qbits.jet.servlet$fn__8430.invoke(servlet.clj:114)
    at qbits.jet.servlet$fn__8355$G__8350__8362.invoke(servlet.clj:97)
    at qbits.jet.servlet$fn__8374$fn__8390$state_machine__3935__auto____8391$fn__8393.invoke(servlet.clj:143)
    at qbits.jet.servlet$fn__8374$fn__8390$state_machine__3935__auto____8391.invoke(servlet.clj:140)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at clojure.core.async.impl.ioc_macros$take_BANG_$fn__3951.invoke(ioc_macros.clj:953)
    at clojure.core.async.impl.channels.ManyToManyChannel$fn__456.invoke(channels.clj:102)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    ... 2 more
Caused by: java.io.IOException: Broken pipe
    at sun.nio.ch.FileDispatcherImpl.writev0(Native Method)
    at sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)
    at sun.nio.ch.IOUtil.write(IOUtil.java:148)
    at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:524)
    at org.eclipse.jetty.io.ChannelEndPoint.flush(ChannelEndPoint.java:172)
    ... 26 more

The exception is thrown on a core.async dispatch thread inside jet, so it never propagates to my (try ...) block, meaning I can never close the response channel properly.

Is there a more idiomatic way to handle this use case with jet and properly close responses when clients disconnect?

I don't think there's a way to handle this case right now from Clojure, but I could add a :ctrl channel, just like we have for websockets, so that you can catch these cases. I'll try to look into this later today.

Or I could simply close the body channel when this occurs, and you could then check the results of your put! calls, but this is a bit more anonymous...

Could you try 0.5.0-SNAPSHOT? The body channel should now close on disconnects (5-minute patch, no guarantees; got to run now).

Thanks! No exceptions anymore on disconnects, and the result of put! lets me close channels appropriately. That seems good enough for long-running requests.
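For readers who land here later, the approach confirmed above can be sketched roughly as follows. This is only an illustration, assuming (as stated for 0.5.0-SNAPSHOT) that jet closes the body channel when the client disconnects; core.async's put! returns false once its target channel is closed, which is the signal used to stop. The names pump! and keepalive, and the return keywords, are hypothetical and not part of jet.

```clojure
(ns sse-sketch
  (:require [clojure.core.async :as a]))

(defn pump!
  "Copies messages from `source` to `resp` until either side closes.
  Sends `keepalive` when nothing arrives within `timeout-ms`.
  Returns :source-closed or :client-gone (hypothetical names)."
  [source resp timeout-ms keepalive]
  (loop []
    (let [[msg port] (a/alts!! [source (a/timeout timeout-ms)])]
      (cond
        ;; a nil taken from `source` itself means `source` was closed
        (and (= port source) (nil? msg)) :source-closed
        ;; put! returns true unless `resp` is already closed
        (a/put! resp (or msg keepalive)) (recur)
        ;; `resp` was closed, presumably because the client went away
        :else :client-gone))))
```

In a handler like the one in the original post, this could run inside the future, with the subscription channel and resp closed in a finally once pump! returns.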

Good to hear! I'll cut a proper release then. You can grab 0.5.0-beta3 in 5 minutes.

✨ awesome, thanks!

beta2 actually

👍 great! Thanks!