Ktor: Flow/Subscription websocket stops working (Ping Timeout)
milgner opened this issue · 0 comments
Library Version
7.0.2
Describe the bug
It's possible for the websocket to become defunct at some point, preventing new requests from being processed at all; once this triggers, even the first line of my GraphQL subscription code isn't hit anymore in the debugger.
It seems to be related to active cancellation of the subscription flow by the client, but unfortunately I have not been able to discern a pattern. Even quickly starting and stopping the subscription does not reliably trigger the issue.
Stacktrace:
2024-01-31 13:36:19,824 [eventLoopGroupProxy-3-3 @raw-ws-handler#180] ERROR: Websocket handler failed
java.io.IOException: Ping timeout
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invokeSuspend(DefaultWebSocketSession.kt:301)
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invoke(DefaultWebSocketSession.kt)
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invoke(DefaultWebSocketSession.kt)
at io.ktor.websocket.PingPongKt$pinger$1.invokeSuspend(PingPong.kt:95)
at _COROUTINE._BOUNDARY._(CoroutineDebugging.kt:46)
at com.expediagroup.graphql.server.ktor.subscriptions.DefaultKtorGraphQLSubscriptionRequestParser$parseRequestFlow$2.invokeSuspend(KtorGraphQLSubscriptionRequestParser.kt:46)
at kotlinx.coroutines.flow.AbstractFlow.collect(Flow.kt:230)
at kotlinx.coroutines.flow.internal.ChannelFlow$collectToFun$1.invokeSuspend(ChannelFlow.kt:60)
at _COROUTINE._CREATION._(CoroutineDebugging.kt:34)
at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:161)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
at kotlinx.coroutines.flow.FlowKt__CollectKt.collect(Collect.kt:30)
at kotlinx.coroutines.flow.FlowKt.collect(Unknown Source)
at com.expediagroup.graphql.server.ktor.GraphQLRoutesKt$graphQLSubscriptionsRoute$1.invokeSuspend(GraphQLRoutes.kt:89)
at com.expediagroup.graphql.server.ktor.GraphQLRoutesKt$graphQLSubscriptionsRoute$1.invoke(GraphQLRoutes.kt)
at com.expediagroup.graphql.server.ktor.GraphQLRoutesKt$graphQLSubscriptionsRoute$1.invoke(GraphQLRoutes.kt)
at io.ktor.server.websocket.RoutingKt.handleServerSession(Routing.kt:253)
at io.ktor.server.websocket.RoutingKt.proceedWebSocket(Routing.kt:238)
at io.ktor.server.websocket.RoutingKt.access$proceedWebSocket(Routing.kt:1)
at io.ktor.server.websocket.RoutingKt$webSocket$2.invokeSuspend(Routing.kt:202)
at io.ktor.server.websocket.RoutingKt$webSocket$2.invoke(Routing.kt)
at io.ktor.server.websocket.RoutingKt$webSocket$2.invoke(Routing.kt)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1$1.invokeSuspend(Routing.kt:106)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1$1.invoke(Routing.kt)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1$1.invoke(Routing.kt)
at io.ktor.server.websocket.WebSocketUpgrade$upgrade$2.invokeSuspend(WebSocketUpgrade.kt:98)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
at com.expediagroup.graphql.server.ktor.GraphQLRoutesKt$graphQLSubscriptionsRoute$1.invokeSuspend(GraphQLRoutes.kt:89)
at io.ktor.server.websocket.RoutingKt.handleServerSession(Routing.kt:253)
at io.ktor.server.websocket.RoutingKt.proceedWebSocket(Routing.kt:238)
at io.ktor.server.websocket.RoutingKt$webSocket$2.invokeSuspend(Routing.kt:202)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1$1.invokeSuspend(Routing.kt:106)
at io.ktor.server.websocket.WebSocketUpgrade$upgrade$2.invokeSuspend(WebSocketUpgrade.kt:98)
at _COROUTINE._CREATION._(CoroutineDebugging.kt:34)
at kotlin.coroutines.intrinsics.IntrinsicsKt__IntrinsicsJvmKt.createCoroutineUnintercepted(IntrinsicsJvm.kt:161)
at kotlinx.coroutines.intrinsics.CancellableKt.startCoroutineCancellable(Cancellable.kt:30)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch$default(Builders.common.kt:47)
at kotlinx.coroutines.BuildersKt.launch$default(Unknown Source)
at io.ktor.server.websocket.WebSocketUpgrade.upgrade(WebSocketUpgrade.kt:96)
at io.ktor.server.netty.http1.NettyHttp1ApplicationResponse.respondUpgrade(NettyHttp1ApplicationResponse.kt:100)
at io.ktor.server.engine.BaseApplicationResponse.respondOutgoingContent$suspendImpl(BaseApplicationResponse.kt:115)
at io.ktor.server.engine.BaseApplicationResponse.respondOutgoingContent(BaseApplicationResponse.kt)
at io.ktor.server.netty.NettyApplicationResponse.respondOutgoingContent$suspendImpl(NettyApplicationResponse.kt:37)
at io.ktor.server.netty.NettyApplicationResponse.respondOutgoingContent(NettyApplicationResponse.kt)
at io.ktor.server.engine.BaseApplicationResponse$Companion$setupSendPipeline$1.invokeSuspend(BaseApplicationResponse.kt:317)
at io.ktor.server.engine.BaseApplicationResponse$Companion$setupSendPipeline$1.invoke(BaseApplicationResponse.kt)
at io.ktor.server.engine.BaseApplicationResponse$Companion$setupSendPipeline$1.invoke(BaseApplicationResponse.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$4.invokeSuspend(KtorServerTracing.kt:167)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$4.invoke(KtorServerTracing.kt)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$4.invoke(KtorServerTracing.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.util.pipeline.SuspendFunctionGun.proceedWith(SuspendFunctionGun.kt:88)
at io.ktor.server.engine.DefaultTransformKt$installDefaultTransformations$1.invokeSuspend(DefaultTransform.kt:29)
at io.ktor.server.engine.DefaultTransformKt$installDefaultTransformations$1.invoke(DefaultTransform.kt)
at io.ktor.server.engine.DefaultTransformKt$installDefaultTransformations$1.invoke(DefaultTransform.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
at io.ktor.server.websocket.RoutingKt.respondWebSocketRaw(Routing.kt:293)
at io.ktor.server.websocket.RoutingKt.access$respondWebSocketRaw(Routing.kt:1)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1.invokeSuspend(Routing.kt:105)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1.invoke(Routing.kt)
at io.ktor.server.websocket.RoutingKt$webSocketRaw$2$1$1$1.invoke(Routing.kt)
at io.ktor.server.routing.Route$buildPipeline$1$1.invokeSuspend(Route.kt:116)
at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
at io.ktor.server.routing.Route$buildPipeline$1$1.invoke(Route.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.server.routing.Routing$executeResult$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invokeSuspend(ContextUtils.kt:20)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:167)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:20)
at io.ktor.server.routing.Routing.executeResult(Routing.kt:190)
at io.ktor.server.routing.Routing.interceptor(Routing.kt:64)
at io.ktor.server.routing.Routing$Plugin$install$1.invokeSuspend(Routing.kt:140)
at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
at io.ktor.server.routing.Routing$Plugin$install$1.invoke(Routing.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invokeSuspend(BaseApplicationEngine.kt:124)
at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
at io.ktor.server.engine.BaseApplicationEngineKt$installDefaultTransformationChecker$1.invoke(BaseApplicationEngine.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3$1.invokeSuspend(KtorServerTracing.kt:148)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3$1.invoke(KtorServerTracing.kt)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3$1.invoke(KtorServerTracing.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:167)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3.invokeSuspend(KtorServerTracing.kt:146)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3.invoke(KtorServerTracing.kt)
at io.opentelemetry.instrumentation.ktor.v2_0.server.KtorServerTracing$Feature$install$3.invoke(KtorServerTracing.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.server.application.hooks.CallFailed$install$1$1.invokeSuspend(CommonHooks.kt:45)
at io.ktor.server.application.hooks.CallFailed$install$1$1.invoke(CommonHooks.kt)
at io.ktor.server.application.hooks.CallFailed$install$1$1.invoke(CommonHooks.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
at kotlinx.coroutines.CoroutineScopeKt.coroutineScope(CoroutineScope.kt:264)
at io.ktor.server.application.hooks.CallFailed$install$1.invokeSuspend(CommonHooks.kt:44)
at io.ktor.server.application.hooks.CallFailed$install$1.invoke(CommonHooks.kt)
at io.ktor.server.application.hooks.CallFailed$install$1.invoke(CommonHooks.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invokeSuspend(ContextUtils.kt:20)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:167)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:20)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invokeSuspend(DefaultEnginePipeline.kt:123)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
at io.ktor.server.engine.DefaultEnginePipelineKt$defaultEnginePipeline$1.invoke(DefaultEnginePipeline.kt)
at io.ktor.util.pipeline.SuspendFunctionGun.loop(SuspendFunctionGun.kt:120)
at io.ktor.util.pipeline.SuspendFunctionGun.proceed(SuspendFunctionGun.kt:78)
at io.ktor.util.pipeline.SuspendFunctionGun.execute$ktor_utils(SuspendFunctionGun.kt:98)
at io.ktor.util.pipeline.Pipeline.execute(Pipeline.kt:77)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invokeSuspend(Pipeline.kt:478)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1$invokeSuspend$$inlined$execute$1.invoke(Pipeline.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invokeSuspend(ContextUtils.kt:20)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at io.ktor.util.debug.ContextUtilsKt$initContextInDebugMode$2.invoke(ContextUtils.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startUndispatchedOrReturn(Undispatched.kt:78)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.withContext(Builders.common.kt:167)
at kotlinx.coroutines.BuildersKt.withContext(Unknown Source)
at io.ktor.util.debug.ContextUtilsKt.initContextInDebugMode(ContextUtils.kt:20)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invokeSuspend(NettyApplicationCallHandler.kt:119)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
at io.ktor.server.netty.NettyApplicationCallHandler$handleRequest$1.invoke(NettyApplicationCallHandler.kt)
at kotlinx.coroutines.intrinsics.UndispatchedKt.startCoroutineUndispatched(Undispatched.kt:44)
at kotlinx.coroutines.BuildersKt__Builders_commonKt.launch(Builders.common.kt:56)
at kotlinx.coroutines.BuildersKt.launch(Unknown Source)
at io.ktor.server.netty.NettyApplicationCallHandler.handleRequest(NettyApplicationCallHandler.kt:37)
at io.ktor.server.netty.NettyApplicationCallHandler.channelRead(NettyApplicationCallHandler.kt:29)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
at io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:425)
Caused by: java.io.IOException: Ping timeout
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invokeSuspend(DefaultWebSocketSession.kt:301)
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invoke(DefaultWebSocketSession.kt)
at io.ktor.websocket.DefaultWebSocketSessionImpl$runOrCancelPinger$newPinger$1.invoke(DefaultWebSocketSession.kt)
at io.ktor.websocket.PingPongKt$pinger$1.invokeSuspend(PingPong.kt:95)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.internal.ScopeCoroutine.afterResume(Scopes.kt:32)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:102)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.ktor.server.netty.EventLoopGroupProxy$Companion.create$lambda$1$lambda$0(NettyApplicationEngine.kt:296)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
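For context, the "Ping timeout" at the top of the trace is raised by Ktor's WebSocket pinger when no pong arrives within the configured timeout. Both the ping interval and that timeout are set on the Ktor WebSockets plugin. The following is only a minimal sketch of where those settings live, assuming a Ktor 2.x server (which the package names in the trace suggest); the module name and the duration values are illustrative assumptions, not the configuration of the affected application.

```kotlin
import io.ktor.server.application.Application
import io.ktor.server.application.install
import io.ktor.server.websocket.WebSockets
import io.ktor.server.websocket.pingPeriod
import io.ktor.server.websocket.timeout
import java.time.Duration

// Hypothetical module function; the name is an assumption for illustration.
fun Application.webSocketModule() {
    install(WebSockets) {
        // How often the server sends a ping frame (placeholder value).
        pingPeriod = Duration.ofSeconds(15)
        // How long the server waits for the matching pong before it
        // closes the session with "Ping timeout" (placeholder value).
        timeout = Duration.ofSeconds(15)
    }
}
```

Changing these values would at most alter when the timeout fires; it would not explain why the session stops answering pings in the first place.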
To Reproduce
Since the subscription code is no longer called once the bug has been triggered, I assume the bug is unrelated to this code in particular.
In any case, this is the short implementation inside my Subscription class:
data class Error(val message: String)

@GraphQLDescription("Run resource discovery and return all discovered resources")
@GraphQLUnion(
    name = "DiscoveryResponse",
    possibleTypes = [ResourceInfo::class, Error::class],
    description = "Either an error encountered during discovery or information about a found resource"
)
suspend fun discoverResources(dataSourceId: UUID): Flow<Any> =
    operation(DataSourceId(dataSourceId)).fold(
        ifLeft = { flowOf(Error(it.message!!)) },
        ifRight = { upstreamFlow ->
            upstreamFlow.map(ResourceInfo::fromPersisted)
        }
    )
I'm using GraphiQL to start the subscription and then immediately stop it again, repeating this in quick succession.
At some point, the client completely stops producing any output. Even a breakpoint on the first line of the discoverResources method isn't hit anymore. The issue persists after reloading the client, so the problem is probably on the server side.
Unfortunately, this seems to be some kind of race condition and is really hard to reproduce consistently: it occurs maybe once in every 50 attempts at quickly starting and stopping the subscription.
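To make "starting and stopping" concrete: my understanding is that stopping the subscription in the client ends up cancelling the collection of the returned Flow on the server. The sketch below shows only that plain kotlinx.coroutines behaviour in isolation, without Ktor or graphql-kotlin; `ticker` and `startAndStop` are hypothetical names made up for this example, and it does not reproduce the bug.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical stand-in for an upstream flow that keeps emitting until cancelled.
fun ticker(onCancelled: () -> Unit): Flow<Int> = flow {
    try {
        var i = 0
        while (true) {
            emit(i++)
            delay(50)
        }
    } finally {
        // Runs when the collecting coroutine is cancelled.
        onCancelled()
    }
}

// Starts collecting, then cancels shortly afterwards, as a client stopping
// a subscription would. Returns whether the upstream saw the cancellation.
suspend fun startAndStop(): Boolean = coroutineScope {
    var cancelled = false
    val job = launch { ticker { cancelled = true }.collect { } }
    delay(120)
    job.cancelAndJoin()
    cancelled
}

fun main() = runBlocking {
    println(startAndStop())
}
```

In this isolated form the cancellation is clean every time, which is consistent with the suspicion that the failure comes from the interaction with the WebSocket session rather than from the flow itself.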
Expected behavior
The subscription code should be executed regardless of previous requests.