r2dbc/r2dbc-mssql

Using publishOn/subscribeOn hangs up select queries

prishedko opened this issue · 2 comments

Bug Report

Versions

  • Driver: 0.9.0.RELEASE and 0.8.8.RELEASE
  • Database: SQL Server 2019 CU10
  • Java: OpenJDK 11.0.13
  • OS: Ubuntu 21.10, 5.13.0-27-generic

Current Behavior

I modified the test example from ticket 216 to build a slightly more complex reactive flow:

    private void runTest(ConnectionFactory connectionFactory) {
        var results = Mono.from(connectionFactory.create())
                .flatMapMany(c -> readMessages()
                        .flatMap(v -> query(c))
                        .flatMap(v -> query(c))
                        .collectList()
                        .flatMap(v -> Mono.from(c.close()).then(Mono.just(v)))
                        .subscribeOn(Schedulers.single()))
                .collectList()
                .timeout(Duration.ofMillis(TIMEOUT_MS))
                .blockOptional();
        assertTrue(results.isPresent() && !results.get().isEmpty());
    }

    private Flux<Integer> readMessages() {
        return Flux.range(0, MSG_COUNT).subscribeOn(messageHandlerScheduler);
    }

    private Flux<Object> query(Connection c) {
        return Flux.from(c.createStatement("select * from test.Test").execute())
                .flatMap(r -> r.map((row, m) -> row.get(0)))
                .subscribeOn(Schedulers.elastic())
                .publishOn(messageHandlerScheduler);
    }

Executing this flow hangs with high probability. In the attached tests (see below) the failure looks like this:

Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 40.128 sec <<< FAILURE!
shouldNotHangUpWithSmallPool(io.r2dbc.mssql.testcase.MsSqlTest)  Time elapsed: 30.045 sec  <<< ERROR!
reactor.core.Exceptions$ReactiveException: java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 30000ms in 'collectList' (and no fallback has been configured)
	at reactor.core.Exceptions.propagate(Exceptions.java:392)
	at reactor.core.publisher.BlockingOptionalMonoSubscriber.blockingGet(BlockingOptionalMonoSubscriber.java:121)
	at reactor.core.publisher.Mono.blockOptional(Mono.java:1752)
	at io.r2dbc.mssql.testcase.AbstractTest.runTest(AbstractTest.java:70)
	at io.r2dbc.mssql.testcase.AbstractTest.shouldNotHangUpWithSmallPool(AbstractTest.java:52)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:242)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:137)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:112)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:189)
	at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:165)
	at org.apache.maven.surefire.booter.ProviderFactory.invokeProvider(ProviderFactory.java:85)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:115)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:75)
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingOptionalMonoSubscriber.blockingGet(BlockingOptionalMonoSubscriber.java:123)
		... 39 more

Occasionally (though rarely) it fails with this instead:

19:12:03.948 [reactor-tcp-epoll-3] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
io.netty.util.IllegalReferenceCountException: refCnt: 0
	at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
	at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1440)
	at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730)
	at io.netty.buffer.AbstractByteBuf.readUnsignedByte(AbstractByteBuf.java:744)
	at io.r2dbc.mssql.message.tds.Decode.uByte(Decode.java:49)
	at io.r2dbc.mssql.message.type.Length.decode(Length.java:137)
	at io.r2dbc.mssql.codec.AbstractCodec.decode(AbstractCodec.java:99)
	at io.r2dbc.mssql.codec.DefaultCodecs.doDecode(DefaultCodecs.java:196)
	at io.r2dbc.mssql.codec.DefaultCodecs.decode(DefaultCodecs.java:191)

A full log is available for the IllegalReferenceCountException case.

I also tested with r2dbc-postgresql and with r2dbc-pool: the bug does not occur with PostgreSQL, and using the pool only decreases the probability of the bug.

Steps to reproduce

To reproduce the issue, please run mssql_tests.sh or mssql_snapshot_isolation_tests.sh from the attached Maven project.

   .flatMap(v -> query(c))
   .flatMap(v -> query(c))

introduces concurrency on a single connection. Please make sure not to run queries concurrently, and avoid nested queries on the same connection: response processing is a stream, so if your application tries to consume the next result while the previous one is not fully consumed, the process can lock up entirely.
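To illustrate the "one query at a time per connection" rule, here is a minimal JDK-only sketch that runs without a database or Reactor. `FakeConnection` and `SequentialQueries` are hypothetical names invented for this example, not R2DBC or driver types; the fake connection only counts how many queries are in flight at once. Each query is chained with `thenCompose`, so it starts only after the previous result has been fully received.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a single database connection (not an R2DBC type).
// It records the highest number of queries that were in flight at the same time.
class FakeConnection {
    private final ExecutorService io = Executors.newCachedThreadPool();
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Simulates an asynchronous query; the future completes once the whole result is read.
    CompletableFuture<List<Integer>> query(int id) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5); // pretend to stream rows from the server
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            inFlight.decrementAndGet(); // result fully consumed
            return List.of(id);
        }, io);
    }

    int maxInFlight() {
        return maxInFlight.get();
    }

    void close() {
        io.shutdown();
    }
}

class SequentialQueries {
    // Runs two queries per message, starting each one only after the previous one completed.
    // Returns {number of completed queries, highest observed number of concurrent queries}.
    static int[] runSequentially(int messageCount) throws Exception {
        FakeConnection c = new FakeConnection();
        List<Integer> results = new ArrayList<>();
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (int i = 0; i < messageCount; i++) {
            int id = i;
            chain = chain
                    .thenCompose(ignored -> c.query(id))   // first query for this message
                    .thenCompose(first -> {                // runs after the first result is complete
                        results.addAll(first);
                        return c.query(id);                // second query for this message
                    })
                    .thenAccept(results::addAll);
        }
        try {
            chain.get(10, TimeUnit.SECONDS);
            return new int[] {results.size(), c.maxInFlight()};
        } finally {
            c.close();
        }
    }

    public static void main(String[] args) throws Exception {
        int[] r = runSequentially(5);
        System.out.println("queries=" + r[0] + ", maxInFlight=" + r[1]);
    }
}
```

In the Reactor flow from the report, a comparable arrangement would likely replace `flatMap` with `concatMap` and drain the first result before issuing the second query, for example `readMessages().concatMap(v -> query(c).collectList().flatMap(first -> query(c).collectList()))`. That variant assumes the second query is needed once per message rather than once per row, so it is a sketch and not a drop-in replacement.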

Looks like it's the case. Thank you for the fast reply!