http4s/http4s-jdk-http-client

Java IOException

Closed this issue · 14 comments

I'm getting a Java IOException when the websocket is closed from the server side, and I presume the client is then sending a close message. I haven't read the RFC to ascertain the correct behaviour (whether this is a misbehaving websocket server, or whether this can legitimately happen).

java.io.IOException: closed output
        at java.net.http/jdk.internal.net.http.RawChannelTube.write(RawChannelTube.java:340)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl.write(TransportImpl.java:114)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl$SendTask.tryCompleteWrite(TransportImpl.java:597)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl$SendTask.run(TransportImpl.java:547)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:147)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:198)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:271)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:224)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl.sendClose(TransportImpl.java:271)
        at java.net.http/jdk.internal.net.http.websocket.WebSocketImpl.sendClose0(WebSocketImpl.java:337)
        at java.net.http/jdk.internal.net.http.websocket.WebSocketImpl.sendClose(WebSocketImpl.java:307)
        at org.http4s.client.jdkhttpclient.JdkWSClient$.$anonfun$apply$17(JdkWSClient.scala:91)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:136)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:355)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:376)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:316)
        at cats.effect.internals.Callback$AsyncIdempotentCallback.run(Callback.scala:131)
        at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:70)
        at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:36)

It would be good to log a message on the error under handleReceive - at the moment it simply ignores the error and terminates the queue, the same as for a WSFrame.Close (which I suspect is what's happening in this particular case).

case Left(_) | Right(_: WSFrame.Close) => queue.enqueue1(none)
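
For illustration, logging before terminating the queue could look roughly like this (just a sketch; the log effect and the generic A standing in for WSFrame are my own placeholders, not the library's code):

import cats.effect.IO
import cats.syntax.all._
import fs2.concurrent.Queue

// Hypothetical sketch: log the listener error before terminating the frame queue.
// `A` stands in for the WSFrame type; `log` is an assumed logging effect.
def logAndTerminate[A](
    queue: Queue[IO, Option[Either[Throwable, A]]],
    error: Throwable,
    log: Throwable => IO[Unit]
): IO[Unit] =
  log(error) *> queue.enqueue1(None)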

Reading through the spec, I can only think of one particular case in which I'd expect this particular error (unless the java implementation isn't correct according to the spec).

According to the spec, the onError of the listener should only be called in one of two cases:

  • one of the other handler methods threw an error (since we're just enqueuing, I think this is unlikely; see the sketch after this list)
  • it's a fatal exception. In that case the websocket should already be closed by the time onError is called.
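
For reference, here's a minimal sketch of the listener shape I mean (not the actual JdkWSClient listener; the record callback is just a placeholder):

import java.net.http.WebSocket

// Minimal sketch of a JDK WebSocket.Listener that only records errors;
// all other callbacks keep their default implementations.
def errorRecordingListener(record: Throwable => Unit): WebSocket.Listener =
  new WebSocket.Listener {
    override def onError(webSocket: WebSocket, error: Throwable): Unit =
      record(error) // per the spec, the websocket is already closed (or closing) here
  }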

So the only scenario in which I can see this happening is that I'm receiving a close frame from the server, which then abruptly closes the connection, causing the websocket to error and terminate.

Ideally the error would be propagated back to the user, but this is a little tricky: since onError is guaranteed to be called only once and there's a race condition either way, I propose that we have a Ref to at least hold the error, and on the last send we do an attempt and check whether there's an error in the Ref cell.
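
Something along these lines is what I have in mind (just a rough sketch with made-up names, using the cats-effect 2 API):

import cats.effect.IO
import cats.effect.concurrent.Ref

// Rough sketch of the Ref idea: onError stores the Throwable in `errorRef`,
// and the last send re-raises it if one was recorded.
def sendCheckingStoredError(
    errorRef: Ref[IO, Option[Throwable]],
    send: IO[Unit]
): IO[Unit] =
  send.attempt.flatMap { result =>
    errorRef.get.flatMap {
      case Some(e) => IO.raiseError(e)      // surface the error stored by onError
      case None    => IO.fromEither(result) // otherwise keep the send's own outcome
    }
  }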

Does this seem sane?

/cc @amesgen

If I am not mistaken, no exceptions are silently dropped (see here, in particular line 47; lines 48-52 exist only for closing the queue).

I am not entirely sure if I understand what you are describing. It would be extremely helpful if you could provide a minimal example which reproduces the issue. Otherwise, please describe in more detail how you are interacting with the server. Maybe a test case is similar to what you are doing.

One more thought: Can you rule out that the server is misbehaving? @rossabaker found a bug (involving weird stuff with close frames, see here and here) in the blaze WS server during creation of the test suite, but it was not easy to find.

My mistake - you're right the exceptions aren't being dropped.

I think what's happening is this:

  1. server sends a close frame
  2. resource attempts to close
  3. isOutputClosed returns false
  4. sendClose throws the above IOException

I think that between steps 1 and 2 the server closes the connection without waiting for the client to send anything. Since the spec says that the listener will close the websocket when it detects a fatal exception, something is closing the websocket between the isOutputClosed check and the sendClose call.

(Note that this means the real exception is stuck in the queue.)

I am just doing connectHighLevel and a receiveStream. The error takes ~8-9 hours of a constant connection to manifest, so I think something is going on with the network. I've overridden the default Java DNS TTL because I think it might be a DNS issue.
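
(For context, the knob I mean is the JVM's DNS cache TTL; one way to adjust it is the standard networkaddress.cache.ttl security property - just an example value, not necessarily the exact setting I'm using:)

// Example only: adjusting the JVM's DNS cache TTL via the standard
// networkaddress.cache.ttl security property (30 seconds is an arbitrary value).
java.security.Security.setProperty("networkaddress.cache.ttl", "30")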

Ok, I am not sure how to prevent this exact scenario. The isOutputClosed check can only be, as you describe, a "best effort" to prevent sendClose from throwing in the resource cleanup handler, as it is certainly possible that the output is closed in between by e.g. a fatal exception.
One "solution" is to drop exceptions of the sendClose call.

If you just use connectHighLevel + receiveStream and no complicated protocol logic: Can you do the same with another WS client (e.g. websocat or sttp) to see if the issue is in fact network-caused?

My personal experience with long WS sessions (which essentially consist of receiveStream + occasional send(WSFrame.Text("....")) responses): They just die sometimes (once every 1-2 days) for no apparent reason (tested with multiple WS clients in at least 3 languages) - fiddling with TCP settings (esp. keepalive) helps a bit.

By the way: You can still inspect the queue by starting a new fiber reading the WS stream and intentionally leaking it (ofc only to be used for debugging).
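
Something like this, as a debug-only sketch with assumed names:

import cats.effect.{ContextShift, IO}
import cats.implicits._
import fs2.Stream

// Debug-only sketch: start a fiber that drains the receive stream and prints
// whatever is still sitting in the queue, and intentionally never join it.
def leakStreamForDebugging[A](
    frames: Stream[IO, A]
)(implicit cs: ContextShift[IO]): IO[Unit] =
  frames
    .evalMap(frame => IO(println(s"leaked frame: $frame")))
    .compile
    .drain
    .start
    .void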

Yes, I will try that. I suspect the exception happens at the network/server level, because I'm having issues reconnecting without a restart of the process (simply doing a stream.handleErrorWith(e => stream)) - hence why I think it might be a DNS TTL issue, which is what I'm debugging at the moment.

Could you share some of the options which you're using please?

I'll take a look to see if there's anything else in the queue if this doesn't work.

EDIT.

Regarding whether to drop the errors, I'm not sure what behaviour would be better tbh. I think it's better to flag these errors up - it's just a little frustrating that the actual fatal exception doesn't get thrown.

In my particular situation setting the keepalive to 60 (seconds) increased stability (but I am currently using tungstenite-rs and not http4s-jdk-http-client for this).

Why do you not use stream.handleErrorWith(e => stream)? It would be my first approach when implementing "reconnect" behaviour.
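
A rough sketch of what I mean (the 5 second delay is an arbitrary choice):

import cats.effect.{IO, Timer}
import fs2.Stream
import scala.concurrent.duration._

// Rough sketch of the naive reconnect: on any error, wait a bit and reconnect.
// A real implementation would probably back off instead of a fixed delay.
def reconnecting[A](connect: Stream[IO, A])(implicit timer: Timer[IO]): Stream[IO, A] =
  connect.handleErrorWith(_ => Stream.sleep_[IO](5.seconds) ++ reconnecting(connect))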

Concerning the error situation: If sendClose errors, we could use tryDequeue1 to look for an exception and throw a CompositeFailure. What about that?

I am currently reconnecting like that - it just throws the same error each time which is why I think it's a network level error.

Ah, I misread that. Yes, I agree that this smells like a network level error.

Regarding the error: I think that looking in the queue if sendClose errors would be a good idea - would it make more sense to tryDequeueChunk and then see whether any errors appear in the queue?

What about this?

diff --git a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
index cfdc57f..db1bdf8 100644
--- a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
+++ b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
@@ -1,14 +1,18 @@
 package org.http4s.client.jdkhttpclient
 
+import java.io.IOException
 import java.net.URI
 import java.net.http.{HttpClient, WebSocket => JWebSocket}
 import java.nio.ByteBuffer
 import java.util.concurrent.{CompletableFuture, CompletionStage}
 
 import cats._
+import cats.data.NonEmptyList
 import cats.effect._
 import cats.effect.concurrent.Semaphore
+import cats.effect.util.CompositeException
 import cats.implicits._
+import fs2.Chunk
 import fs2.concurrent.Queue
 import org.http4s.headers.`Sec-WebSocket-Protocol`
 import org.http4s.internal.fromCompletableFuture
@@ -83,13 +87,23 @@ object JdkWSClient {
               sendSem <- Semaphore[F](1L)
             } yield (webSocket, queue, sendSem)
           } {
-            case (webSocket, _, _) =>
+            case (webSocket, queue, _) =>
               for {
                 isOutputOpen <- F.delay(!webSocket.isOutputClosed)
                 closeOutput = fromCompletableFuture(
                   F.delay(webSocket.sendClose(JWebSocket.NORMAL_CLOSURE, ""))
                 )
-                _ <- closeOutput.whenA(isOutputOpen)
+                _ <- closeOutput.whenA(isOutputOpen).onError {
+                  case e: IOException =>
+                    for {
+                      chunk <- queue.tryDequeueChunk1(10)
+                      errs = Chunk(chunk.flatten.toSeq: _*).flatten.collect { case Left(e) => e }
+                      _ <- F.raiseError[Unit](NonEmptyList.fromFoldable(errs) match {
+                        case Some(nel) => new CompositeException(e, nel)
+                        case None => e
+                      })
+                    } yield ()
+                }
               } yield ()
           }
           .map {

The 10 is subject to debate! (2 is probably enough)

Yes that looks good - I suspect that it won't help in my particular case (since we've already enqueued a none), but it should help in some other cases.

Shall we say 8 since magic numbers which are 2^n inexplicably look nicer?

Any new findings on this one?

Closing, because I think there's an issue on their server.