mirromutth/r2dbc-mysql

After disposing an R2DBC Flux that executes a SELECT over many rows, the underlying query still exists and keeps fetching data from the DB

gongjinglong opened this issue · 0 comments

  1. The scenario
    I use R2DBC to read many rows and need to stop reading when some condition occurs. I wanted to use Disposable::dispose() for this purpose, but I encountered a problem. Below is the example code:
	private static void testDispose_DatabaseClient(ConfigurableApplicationContext context) throws InterruptedException {
		DatabaseClient databaseClient = context.getBean(DatabaseClient.class);

		AtomicLong readCount = new AtomicLong();

		String sql = "select * from `t_trade_ext_for_update` WHERE id <= 200 * 10000";
		Disposable disposable = databaseClient
				.sql(sql)
				.map(r -> {
					int id = r.get("id", Long.class).intValue();
					TradeExt tradeExt = new TradeExt(id);
					return tradeExt;
				})
				.all()
				.log((String) null, Level.INFO, false, Stream.of(SignalType.values()).filter(t -> t != SignalType.ON_NEXT).toArray(SignalType[]::new))
				.subscribe(t -> {
					readCount.incrementAndGet();
					if (t.getId() % 10000 == 0) {
						logger.info("item: {}", t.getId());
					}
				});

		// dispose the r2dbc stream in another thread if some condition happened
		Thread disposeThread = new Thread(new Runnable() {
			@SneakyThrows
			@Override
			public void run() {
				// a mock condition on which need to stop the flux
				while (readCount.get() <= 10 * 10000) {
					Thread.sleep(500);
				}
				logger.info("################### call dispose ###########");
				disposable.dispose();
				logger.info("################### disposed ###########");
			}
		}, "dispose-thread");
		disposeThread.start();

		// sleep some time to avoid jvm exit
		Thread.sleep(600 * 1000);
	}
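
    For reference, the stop condition can also be expressed inside the pipeline instead of disposing from another thread. A minimal sketch, assuming Reactor 3.4 (stopSignal is a trigger I made up for illustration); note that this cancels the subscription the same way dispose() does, so it should not change the driver-level behavior:

	Sinks.Empty<Void> stopSignal = Sinks.empty();

	databaseClient
			.sql(sql)
			.map(r -> new TradeExt(r.get("id", Long.class).intValue()))
			.all()
			// cancels the upstream subscription once stopSignal completes
			.takeUntilOther(stopSignal.asMono())
			.subscribe(t -> {
				if (readCount.incrementAndGet() >= 10 * 10000) {
					// the mock condition: request cancellation from inside the pipeline
					stopSignal.tryEmitEmpty();
				}
			});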

  2. The problem
    After disposable.dispose() has executed, I run show processlist on the DB and the SQL is still running. How can I completely stop it?
    I also debugged the driver code from my application and found that the Flux keeps running, with the stack below:
	...
	at reactor.core.publisher.Operators.onDiscard(Operators.java:438)
	at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:72)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:668)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:746)
	at reactor.core.publisher.FluxWindowPredicate$WindowFlux.onNext(FluxWindowPredicate.java:788)
	at reactor.core.publisher.FluxWindowPredicate$WindowPredicateMain.onNext(FluxWindowPredicate.java:266)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onNext(FluxHandleFuseable.java:191)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
	at dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber.onNext(DiscardOnCancelSubscriber.java:70)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:250)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:126)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
	at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:265)
	at dev.miku.r2dbc.mysql.client.ReactorNettyClient$ResponseSink.next(ReactorNettyClient.java:340)
	at dev.miku.r2dbc.mysql.client.ReactorNettyClient.lambda$new$0(ReactorNettyClient.java:103)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:185)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:279)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:388)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:404)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.handleDecoded(MessageDuplexCodec.java:187)
	at dev.miku.r2dbc.mysql.client.MessageDuplexCodec.channelRead(MessageDuplexCodec.java:95)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1372)
	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1235)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1284)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:510)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:449)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)

I suspect the problem is related to DiscardOnCancelSubscriber, but I'm not clear on the details.
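
If I read the stack correctly, on cancel this subscriber does not stop the server-side query at all; it keeps consuming the rows the server sends and drops them via Operators.onDiscard, presumably so that the connection's protocol state stays consistent. A simplified sketch of that pattern as I understand it (illustration only, not the driver's actual code):

	import java.util.concurrent.atomic.AtomicBoolean;
	import org.reactivestreams.Subscription;
	import reactor.core.CoreSubscriber;
	import reactor.core.publisher.Operators;

	// illustration only, NOT dev.miku.r2dbc.mysql.util.DiscardOnCancelSubscriber itself
	final class DiscardOnCancel<T> implements CoreSubscriber<T>, Subscription {

		private final CoreSubscriber<T> actual;
		private final AtomicBoolean cancelled = new AtomicBoolean();
		private Subscription s;

		DiscardOnCancel(CoreSubscriber<T> actual) {
			this.actual = actual;
		}

		@Override
		public void onSubscribe(Subscription s) {
			this.s = s;
			actual.onSubscribe(this);
		}

		@Override
		public void onNext(T t) {
			if (cancelled.get()) {
				// after cancel: keep draining rows, but discard them instead of
				// delivering downstream, so the result set is fully consumed
				Operators.onDiscard(t, actual.currentContext());
			} else {
				actual.onNext(t);
			}
		}

		@Override
		public void onError(Throwable t) {
			if (cancelled.get()) {
				// downstream is already cancelled; drop the terminal signal
				Operators.onErrorDropped(t, actual.currentContext());
			} else {
				actual.onError(t);
			}
		}

		@Override
		public void onComplete() {
			if (!cancelled.get()) {
				actual.onComplete();
			}
		}

		@Override
		public void request(long n) {
			s.request(n);
		}

		@Override
		public void cancel() {
			// cancel is NOT propagated upstream; switch to discard mode and
			// request everything so the server can finish sending the result
			if (cancelled.compareAndSet(false, true)) {
				s.request(Long.MAX_VALUE);
			}
		}
	}

If that is the design, dispose() only stops delivery to my subscriber; the SELECT keeps running until the server has sent the whole result set, which would match what I see in show processlist.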

  3. Furthermore
  • I tried calling the connection's close() within doCancel and doFatally, but it did not solve the problem.
  • I tried using r2dbc-mariadb 1.0.3 as an alternative driver, and a similar phenomenon occurred.
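
    The only way I can think of to stop the statement on the server side is to issue KILL QUERY from a second connection. A sketch using the plain R2DBC SPI, under the assumption that I capture the reading connection's id via SELECT CONNECTION_ID() before running the big query (the helper names here are mine):

	import io.r2dbc.spi.Connection;
	import io.r2dbc.spi.ConnectionFactory;
	import reactor.core.publisher.Mono;

	// capture the MySQL thread id of the connection that runs the big SELECT
	static Mono<Long> connectionId(Connection conn) {
		return Mono.from(conn.createStatement("SELECT CONNECTION_ID() AS id").execute())
				.flatMap(result -> Mono.from(result.map((row, meta) -> row.get("id", Long.class))));
	}

	// from a second connection, kill the running statement (not the whole connection)
	static Mono<Void> killQuery(ConnectionFactory factory, long connectionId) {
		return Mono.usingWhen(
				factory.create(),
				conn -> Mono.from(conn.createStatement("KILL QUERY " + connectionId).execute())
						.flatMap(result -> Mono.from(result.getRowsUpdated()))
						.then(),
				Connection::close);
	}

    I have not verified this against the driver, and with a pooled DatabaseClient I'm not sure I would get the same physical connection back for both statements, so this is a workaround idea rather than a solution.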

So is this a bug in the driver, or is it more likely that I'm using it wrong?
Thanks!