rsocket/rsocket-java

Deadlock on RSocketRequester lock when merging output of rsocket channels into another channel

lkolisko opened this issue · 1 comment

I am running into a deadlock between close and cancel when the output of multiple RSocket channels goes through the Reactor merge operator (FluxMerge) and is written to a single RSocket channel. Is there any guidance on how to avoid the issue or investigate it further?
Thanks in advance.
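On the "investigate further" part: the same monitor-cycle check that `jstack` performs for the dumps below can be run programmatically via `ThreadMXBean.findDeadlockedThreads()`, for example from a watchdog thread in the affected service. A generic JDK sketch (the class and method names are hypothetical, not rsocket-java API) that builds a deliberate two-lock cycle and shows the detector firing:

```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch;

// Hypothetical probe: detects the kind of monitor deadlock reported below
// by polling the JVM's built-in deadlock detector.
public class DeadlockProbe {

    // Builds a deliberate two-lock deadlock on daemon threads, then polls
    // the JVM's deadlock detector. Returns true once the cycle is observed.
    static boolean detectSampleDeadlock() {
        Object lockA = new Object();
        Object lockB = new Object();
        CountDownLatch bothHeld = new CountDownLatch(2);

        Runnable grabAThenB = () -> {
            synchronized (lockA) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);   // wait until the other thread holds lockB
                synchronized (lockB) { }  // blocks forever: lock-order inversion
            }
        };
        Runnable grabBThenA = () -> {
            synchronized (lockB) {
                bothHeld.countDown();
                awaitQuietly(bothHeld);
                synchronized (lockA) { }
            }
        };
        startDaemon(grabAThenB);
        startDaemon(grabBThenA);

        // Same check jstack performs: find threads in a monitor wait cycle.
        for (int i = 0; i < 200; i++) {
            long[] ids = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
            if (ids != null) return true;
            sleepQuietly(25);
        }
        return false;
    }

    static void startDaemon(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true);  // daemon so the stuck threads do not block JVM exit
        t.start();
    }

    static void awaitQuietly(CountDownLatch l) {
        try { l.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    static void sleepQuietly(long ms) {
        try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
    }

    public static void main(String[] args) {
        System.out.println(detectSampleDeadlock() ? "deadlock detected" : "no deadlock");
    }
}
```

Running such a probe periodically makes it possible to catch and log the cycle as soon as it forms, rather than discovering it from a hung service.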

Expected Behavior

  • No deadlock

Actual Behavior

  • Thread epoll-10 is deadlocked with thread epoll-12: each holds the RSocketRequester monitor the other is waiting for
epoll-10 

stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at io.rsocket.core.RequesterResponderSupport.remove(RequesterResponderSupport.java:152)
- waiting to lock <0x00000005d37ad000> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RequestChannelRequesterFlux.tryCancel(RequestChannelRequesterFlux.java:513)
at io.rsocket.core.RequestChannelRequesterFlux.cancel(RequestChannelRequesterFlux.java:480)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
....
at io.rsocket.core.RequestChannelRequesterFlux.handleError(RequestChannelRequesterFlux.java:698)
at io.rsocket.core.RSocketRequester.terminate(RSocketRequester.java:354)
- locked <0x00000005d37ad810> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RSocketRequester.tryShutdown(RSocketRequester.java:328)
at io.rsocket.core.RSocketRequester$$Lambda$1654/0x00000008015c9948.run(Unknown Source)
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49)
at io.rsocket.transport.netty.TcpDuplexConnection$$Lambda$1641/0x00000008015c0df8.operationComplete(Unknown Source)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:733)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:826)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
Locked ownable synchronizers:
- None
epoll-12

stackTrace:
java.lang.Thread.State: BLOCKED (on object monitor)
at io.rsocket.core.RequesterResponderSupport.remove(RequesterResponderSupport.java:152)
- waiting to lock <0x00000005d37ad810> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RequestChannelRequesterFlux.tryCancel(RequestChannelRequesterFlux.java:513)
at io.rsocket.core.RequestChannelRequesterFlux.cancel(RequestChannelRequesterFlux.java:480)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.cancel(FluxPublishOn.java:277)
at reactor.core.publisher.FluxMap$MapSubscriber.cancel(FluxMap.java:169)
at reactor.core.publisher.Operators.terminate(Operators.java:1240)
....
at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onError(FluxPublishOn.java:248)
at io.rsocket.core.RequestChannelRequesterFlux.handleError(RequestChannelRequesterFlux.java:698)
at io.rsocket.core.RSocketRequester.terminate(RSocketRequester.java:354)
- locked <0x00000005d37ad000> (a io.rsocket.core.RSocketRequester)
at io.rsocket.core.RSocketRequester.tryShutdown(RSocketRequester.java:328)
at io.rsocket.core.RSocketRequester$$Lambda$1654/0x00000008015c9948.run(Unknown Source)
at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
at reactor.core.publisher.SinkEmptyMulticast$VoidInner.complete(SinkEmptyMulticast.java:238)
at reactor.core.publisher.SinkEmptyMulticast.tryEmitEmpty(SinkEmptyMulticast.java:70)
at reactor.core.publisher.SinkEmptySerialized.tryEmitEmpty(SinkEmptySerialized.java:46)
at io.rsocket.internal.BaseDuplexConnection.dispose(BaseDuplexConnection.java:51)
at io.rsocket.transport.netty.TcpDuplexConnection.lambda$new$0(TcpDuplexConnection.java:49)
at io.rsocket.transport.netty.TcpDuplexConnection$$Lambda$1641/0x00000008015c0df8.operationComplete(Unknown Source)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)
at io.netty.channel.DefaultChannelPromise.trySuccess(DefaultChannelPromise.java:84)
at io.netty.channel.AbstractChannel$CloseFuture.setClosed(AbstractChannel.java:1164)
at io.netty.channel.AbstractChannel$AbstractUnsafe.doClose0(AbstractChannel.java:755)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:731)
at io.netty.channel.AbstractChannel$AbstractUnsafe.close(AbstractChannel.java:620)
at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.shutdownInput(AbstractEpollChannel.java:522)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.handleReadException(AbstractEpollStreamChannel.java:733)
at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:826)
at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:487)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:385)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(java.base@17.0.3/Thread.java:833)
Locked ownable synchronizers:
- None
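The two traces show a classic lock-order inversion: epoll-10 holds monitor 0x00000005d37ad810 while waiting for 0x00000005d37ad000, and epoll-12 holds 0x00000005d37ad000 while waiting for 0x00000005d37ad810. As a general illustration (hypothetical names, not the actual rsocket-java fix), a minimal sketch of the standard remedy, acquiring both locks in one global order:

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: runs a task while holding two locks, always acquiring
// them in a single global order, so two threads can never each hold one lock
// and wait for the other (the cycle shown in the traces above).
public class OrderedLocking {

    static void withBoth(ReentrantLock a, ReentrantLock b, Runnable body) {
        // Identity hash gives an (almost always distinct) total order; real
        // code would use a dedicated sequence number to break rare ties.
        ReentrantLock first  = System.identityHashCode(a) <= System.identityHashCode(b) ? a : b;
        ReentrantLock second = (first == a) ? b : a;
        first.lock();
        try {
            second.lock();
            try {
                body.run();
            } finally {
                second.unlock();
            }
        } finally {
            first.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ReentrantLock requesterA = new ReentrantLock();
        ReentrantLock requesterB = new ReentrantLock();
        // Opposite argument orders, as in the two epoll threads; no deadlock.
        Thread t1 = new Thread(() -> withBoth(requesterA, requesterB, () -> {}));
        Thread t2 = new Thread(() -> withBoth(requesterB, requesterA, () -> {}));
        t1.start(); t2.start();
        t1.join(); t2.join();
        System.out.println("completed without deadlock");
    }
}
```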

Steps to Reproduce

  • The output of multiple RSocket channels is merged (via the Reactor merge operator) and written to another RSocket channel.
  • Unfortunately I do not have a simplified test to reproduce the issue at the moment.

Environment

  • OpenJDK 64-Bit Server VM Temurin-17.0.3+7 (build 17.0.3+7, mixed mode, sharing)
  • rsocket 1.1.2
  • netty 4.1.79
  • reactor-core 3.4.21

@lkolisko thanks for finding this! Really helpful. Fixed, and the fix should be released in 1.1.3 next month.