http4s/http4s-jdk-http-client

JdkWSClient will occasionally try to send a CLOSED frame on an already closed connection


This is odd to me because it looks like there is logic to make sure that this doesn't happen.

http4s-jdk-http-client version 0.3.0
openjdk version "14" 2020-03-17
OpenJDK Runtime Environment (build 14+36-1461)
OpenJDK 64-Bit Server VM (build 14+36-1461, mixed mode, sharing)

Here's the stack trace I'm getting:

java.io.IOException: closed output
        at java.net.http/jdk.internal.net.http.RawChannelTube.write(RawChannelTube.java:340)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl.write(TransportImpl.java:114)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl$SendTask.tryCompleteWrite(TransportImpl.java:597)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl$SendTask.run(TransportImpl.java:547)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:147)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:198)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:271)
        at java.net.http/jdk.internal.net.http.common.SequentialScheduler.runOrSchedule(SequentialScheduler.java:224)
        at java.net.http/jdk.internal.net.http.websocket.TransportImpl.sendClose(TransportImpl.java:271)
        at java.net.http/jdk.internal.net.http.websocket.WebSocketImpl.sendClose0(WebSocketImpl.java:337)
        at java.net.http/jdk.internal.net.http.websocket.WebSocketImpl.sendClose(WebSocketImpl.java:307)
        at org.http4s.client.jdkhttpclient.JdkWSClient$.$anonfun$apply$17(JdkWSClient.scala:94)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:359)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:380)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:323)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:139)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:359)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:380)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:323)
        at cats.effect.internals.Callback$AsyncIdempotentCallback.run(Callback.scala:130)
        at cats.effect.internals.Trampoline.cats$effect$internals$Trampoline$$immediateLoop(Trampoline.scala:67)
        at cats.effect.internals.Trampoline.startLoop(Trampoline.scala:35)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.super$startLoop(TrampolineEC.scala:89)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.$anonfun$startLoop$1(TrampolineEC.scala:89)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:94)
        at cats.effect.internals.TrampolineEC$JVMTrampoline.startLoop(TrampolineEC.scala:89)
        at cats.effect.internals.Trampoline.execute(Trampoline.scala:43)
        at cats.effect.internals.TrampolineEC.execute(TrampolineEC.scala:42)
        at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:136)
        at cats.effect.internals.Callback$AsyncIdempotentCallback.apply(Callback.scala:125)
        at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1(Deferred.scala:201)
        at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$unsafeRegister$1$adapted(Deferred.scala:201)
        at cats.effect.concurrent.Deferred$ConcurrentDeferred.$anonfun$notifyReadersLoop$1(Deferred.scala:236)
        at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
        at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
        at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:359)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:380)
        at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:323)
        at cats.effect.internals.IOShift$Tick.run(IOShift.scala:35)
        at cats.effect.internals.PoolUtils$$anon$2$$anon$3.run(PoolUtils.scala:52)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)

Yes, this is expected, and it has come up before, e.g. here. The isOutputClosed check can only be a "best effort" attempt to keep sendClose from throwing in the resource cleanup handler, because the output can still be closed in between the check and the call, e.g. by a fatal exception. One "solution" is to drop exceptions thrown by the sendClose call.

What do you think about ignoring IOExceptions with message = "closed output"? This should fix your specific problem.

diff --git a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
index 2b6efd1..6a5e7a0 100644
--- a/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
+++ b/core/src/main/scala/org/http4s/client/jdkhttpclient/JdkWSClient.scala
@@ -93,17 +93,20 @@ object JdkWSClient {
                 closeOutput = fromCompletionStage(
                   F.delay(webSocket.sendClose(JWebSocket.NORMAL_CLOSURE, ""))
                 )
-                _ <- closeOutput.whenA(isOutputOpen).onError {
-                  case e: IOException =>
-                    for {
-                      chunk <- queue.tryDequeueChunk1(10)
-                      errs = Chunk(chunk.flatten.toSeq: _*).flatten.collect { case Left(e) => e }
-                      _ <- F.raiseError[Unit](NonEmptyList.fromFoldable(errs) match {
-                        case Some(nel) => new CompositeException(e, nel)
-                        case None => e
-                      })
-                    } yield ()
-                }
+                _ <- closeOutput
+                  .whenA(isOutputOpen)
+                  .recover { case e: IOException if e.getMessage == "closed output" => () }
+                  .onError {
+                    case e: IOException =>
+                      for {
+                        chunk <- queue.tryDequeueChunk1(10)
+                        errs = Chunk(chunk.flatten.toSeq: _*).flatten.collect { case Left(e) => e }
+                        _ <- F.raiseError[Unit](NonEmptyList.fromFoldable(errs) match {
+                          case Some(nel) => new CompositeException(e, nel)
+                          case None => e
+                        })
+                      } yield ()
+                  }
               } yield ()
           }
           .map {

One more thing: do you manually send a close frame or initiate a server-side close? Doing so greatly increases the chance of a race condition.

That solution doesn't sound super great but I guess it's the best you can do. No, I don't manually send a close frame. If there's an error talking to the service I let it propagate up until the connection resource is closed and then I attempt to reacquire the resource to reconnect:

Stream
  .resource(wsClient.connectHighLevel(WSRequest(uri, Headers.of(headers(token)))))
  .flatMap(connection => events(connection, sequenceNumber, acks, sessionId))
  .handleErrorWith(e => Stream.eval_(IO(println(e))))
  .repeat

Yes, ideally there would be something like a sendWhenOpen operation that rules out the TOCTOU (time-of-check to time-of-use) problem. But as things stand, we can't do better than the diff above.
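To make the check-then-act race concrete, here is a minimal stdlib-only sketch. FakeSocket, closeBestEffort, closeIgnoringClosedOutput and the `between` hook are all hypothetical names invented for illustration; they are not part of the JDK or http4s-jdk-http-client API. The `between` callback stands in for whatever may close the output after the check but before the send.

```scala
import java.io.IOException
import scala.util.Try

// Hypothetical stand-in for a WebSocket whose output side can be closed
// by someone else at any time. Not the JDK or http4s API.
final class FakeSocket {
  @volatile private var outputClosed = false
  def isOutputClosed: Boolean = outputClosed
  def closeOutputExternally(): Unit = { outputClosed = true }
  def sendClose(): Unit = {
    if (outputClosed) throw new IOException("closed output")
    outputClosed = true
  }
}

object ToctouSketch {
  // Check-then-act cleanup. `between` models an event that happens after
  // the check but before the send (e.g. a network failure).
  def closeBestEffort(socket: FakeSocket, between: () => Unit): Try[Unit] =
    Try {
      if (!socket.isOutputClosed) { // time of check
        between()
        socket.sendClose()          // time of use
      }
    }

  // Same cleanup, but the one benign failure is swallowed, mirroring the
  // idea of the diff above. Any other exception still surfaces.
  def closeIgnoringClosedOutput(socket: FakeSocket, between: () => Unit): Try[Unit] =
    closeBestEffort(socket, between).recover {
      case e: IOException if e.getMessage == "closed output" => ()
    }

  def main(args: Array[String]): Unit = {
    val s1 = new FakeSocket
    println(closeBestEffort(s1, () => s1.closeOutputExternally()))
    val s2 = new FakeSocket
    println(closeIgnoringClosedOutput(s2, () => s2.closeOutputExternally()))
  }
}
```

In this sketch the first call ends in a Failure even though the check passed, while the second call succeeds because only the "closed output" case is recovered.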

Concerning your case: your reconnect code looks fine. How can I reproduce your situation? I am interested in why the race condition occurs in your case.

I'm not sure how to reproduce my situation easily. Similar to the old issue you linked above, this only happens to me after the application has been running for several hours. Both times I've encountered it were after leaving it running overnight. I can probably try and set something up to monitor the connection overnight and see what eventually causes the issue.

Ok, so I guess it is related to spurious network failures, which are difficult to debug. If you are sure that the issue is caused by http4s-jdk-http-client (for example if it does not happen with another websocket client), please create an issue for that!

Sorry I haven't been able to work on this in a while. I'm still getting this problem (I know you haven't released the fix yet) and I'm wondering if it's happening to me frequently because I'm interrupting the receiveStream when I get the close frame. I don't need to do this right? And maybe having it is causing some sort of race condition?

connection.receiveStream
  .collect {
    case Text(data, _) => data
  }
  .map(decode[Event])
  .rethrow
  .map(handleEvents)
  .parJoinUnbounded
  .interruptWhen(connection.closeFrame.get.map(handleConnectionClose))  

"I don't need to do this right?"

Right, you don't have to do this: receiveStream (which is just Stream.repeatEval(receive).unNoneTerminate) simply ends when the receiving side of the websocket connection is closed.
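The termination behaviour described above can be illustrated without fs2. The following is only a stdlib analogy, assuming that `receive` yields None once the receiving side is closed; FakeReceiver and receiveAll are hypothetical names, not library API.

```scala
// Hypothetical stand-in for the connection's `receive`: yields each frame
// wrapped in Some, then None once the receiving side is closed.
final class FakeReceiver(frames: List[String]) {
  private var remaining = frames
  def receive(): Option[String] = remaining match {
    case head :: tail =>
      remaining = tail
      Some(head)
    case Nil => None
  }
}

object ReceiveLoopSketch {
  // Stdlib analogue of `Stream.repeatEval(receive).unNoneTerminate`:
  // keep calling `receive` and stop at the first None.
  def receiveAll(r: FakeReceiver): List[String] =
    Iterator
      .continually(r.receive())
      .takeWhile(_.isDefined)
      .collect { case Some(frame) => frame }
      .toList

  def main(args: Array[String]): Unit =
    println(receiveAll(new FakeReceiver(List("a", "b"))))
}
```

Because the loop stops by itself at the first None, no extra interruption is needed to end it.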

"And maybe having it is causing some sort of race condition?"

Yes, that might be the reason. Do you use connectHighLevel? In that case, close frames are automatically answered with another close frame, and because you interrupt the stream when you receive a close frame, the finalizer of the connection resource runs immediately and also tries to close the connection.

Yes, I'm using connectHighLevel so I'll just remove that interrupt. Thanks 👍