oracle/oracle-r2dbc

NullPointerException when using R2DBC Pool 0.9.0.RELEASE with Oracle R2DBC 0.4.0

sgtcortez opened this issue · 9 comments

Hi, I am facing a weird problem using:

Java Version: Oracle JDK 17

implementation 'io.r2dbc:r2dbc-pool:0.9.0.RELEASE'
implementation 'io.r2dbc:r2dbc-spi:0.9.0.RELEASE'
runtimeOnly 'com.oracle.database.r2dbc:oracle-r2dbc:0.4.0'

At first I was using version 0.1.0 with R2DBC Pool. It works when executing a single statement, but with parallel calls I hit this error: Multiple subscribers ...

So I came here and read that this is a known problem with version 0.1.0. I then upgraded to version 0.4.0, which is the latest.
But when I execute the same query (which works with version 0.1.0), I receive a NullPointerException inside the Oracle R2DBC classes.

StackTrace:

java.lang.NullPointerException: Cannot invoke "java.util.ArrayDeque.size()" because "this.implicitResultSetStatements" is null
	at oracle.jdbc.driver.OracleStatement.getMoreResults(OracleStatement.java:5851)
	at oracle.jdbc.driver.OracleStatementWrapper.getMoreResults(OracleStatementWrapper.java:298)
	at oracle.r2dbc.impl.OracleStatementImpl$JdbcStatement.lambda$getResults$4(OracleStatementImpl.java:1053)
	at oracle.r2dbc.impl.AsyncLock.lambda$get$2(AsyncLock.java:161)
	at oracle.r2dbc.impl.AsyncLock.unlock(AsyncLock.java:122)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.terminate(AsyncLock.java:510)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:496)
	at reactor.core.publisher.StrictSubscriber.onComplete(StrictSubscriber.java:123)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
	at org.reactivestreams.FlowAdapters$FlowToReactiveSubscriber.onComplete(FlowAdapters.java:228)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitComplete(CompletionStageUtil.java:681)
	at oracle.jdbc.internal.CompletionStageUtil$IteratorSubscription.emitItems(CompletionStageUtil.java:628)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$10(PhysicalConnection.java:11713)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:399)
	at oracle.jdbc.driver.PhysicalConnection.lambda$createUserCodeExecutor$11(PhysicalConnection.java:11711)
	at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1395)
	at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:373)
	at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1182)
	at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1655)
	at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1622)
	at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:165)

How I execute the query:

return Mono.usingWhen(
        connectionPool.create(),
        connection -> Mono.from(connection.createStatement(QUERY)
                .bind(0, destinationState)
                .bind(1, subsidiaryId)
                .bind(2, itemId)
                .execute()
        ),
        Connection::close,
        ((connection, throwable) -> connection.close()),
        Connection::close
)
        .flatMapMany(it -> it.map(mapper))
        .next();

How I create the connection pool:

public ConnectionPool connectionFactory() {
    return new ConnectionPool(ConnectionPoolConfiguration
            .builder()
            .connectionFactory(ConnectionFactories.get(
                    ConnectionFactoryOptions
                            .builder()
                            .from(ConnectionFactoryOptions.parse(url))
                            .option(ConnectionFactoryOptions.USER, user)
                            .option(ConnectionFactoryOptions.PASSWORD, password)
                            .option(ConnectionFactoryOptions.DRIVER, DRIVER)
                            .option(Option.valueOf("applicationName"), "catalog-service-app")
                            .build()
                    )
            )
            .initialSize(INITIAL_CONNECTIONS)
            .maxSize(maxConnections)
            .maxIdleTime(Duration.ofSeconds(maxIdleTime))
            .validationQuery(VALIDATION_QUERY)
            .build()
    );
}

I thought it would be a problem with dependency versions, but I checked the dependencies and I am using the correct ones.
Please tell me where I am making a mistake.

Dependencies: (screenshot attached to the original issue)

By the way, even this simple statement throws the same NullPointerException:

    Mono.from(connectionPool.create())
            .flatMapMany(connection ->
                    Flux.from(connection.createStatement("SELECT 1 FROM dual").execute())
                            .flatMap(result ->
                                    result.map((row, metadata) -> row.get(0, Integer.class)))
            ).subscribe(s -> System.out.println("Value: " + s ));
    return Mono.empty();

I will run a test with Java 11. I am not sure whether Oracle JDBC already supports Java 17.

Thanks for all these details, @sgtcortez. This NPE looks like the same issue we saw here: #63
Make sure you have the 21.3.0.0 version of Oracle JDBC on the classpath. The 21.1 version has a bug in Statement.getMoreResults() that will trigger an NPE.
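
One way to see which JDBC driver actually ends up on the classpath is to ask `java.sql.DriverManager` at runtime. The following is a minimal sketch, assuming the driver registers itself through the standard service-loader mechanism. `JdbcDriverVersions` is a hypothetical helper name, and how a given driver maps its release number onto major and minor versions is an assumption here, so treat the output as a hint rather than proof.

```java
import java.sql.DriverManager;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: lists every JDBC driver registered with DriverManager.
final class JdbcDriverVersions {

    // Returns one line per driver: "<driver class name> <major>.<minor>".
    static List<String> describeDrivers() {
        return DriverManager.drivers()
                .map(driver -> driver.getClass().getName()
                        + " " + driver.getMajorVersion()
                        + "." + driver.getMinorVersion())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> drivers = describeDrivers();
        if (drivers.isEmpty()) {
            System.out.println("No JDBC drivers registered");
        }
        drivers.forEach(System.out::println);
    }
}
```

Running this inside the application (or a test with the same runtime classpath) should show whether an older Oracle JDBC jar is still being pulled in transitively.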

Thank you @Michael-A-McMahon. I will run this test and come back here with the result.

Hi @Michael-A-McMahon, I tested it, and now I am facing another problem ...

oracle.r2dbc.impl.OracleR2dbcExceptions$OracleR2dbcException: Closed Statement: getStatement
	at oracle.r2dbc.impl.OracleR2dbcExceptions.toR2dbcException(OracleR2dbcExceptions.java:217)
	at oracle.r2dbc.impl.OracleR2dbcExceptions.fromJdbc(OracleR2dbcExceptions.java:282)
	at oracle.r2dbc.impl.OracleReactiveJdbcAdapter.publishRows(OracleReactiveJdbcAdapter.java:747)
	at oracle.r2dbc.impl.OracleResultImpl$ResultSetResult.publishSegments(OracleResultImpl.java:479)
	at oracle.r2dbc.impl.OracleResultImpl.publishSegments(OracleResultImpl.java:150)
	at oracle.r2dbc.impl.OracleResultImpl.map(OracleResultImpl.java:219)
	at br.com.dimed.catalogservice.repository.TaxRuleRepository.lambda$fetchTaxRule$6(TaxRuleRepository.java:116)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.onNext(MonoFlatMapMany.java:163)
	at reactor.core.publisher.MonoUsingWhen$MonoUsingWhenSubscriber.deferredComplete(MonoUsingWhen.java:278)
	at reactor.core.publisher.FluxUsingWhen$CommitInner.onComplete(FluxUsingWhen.java:540)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2058)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:209)
	at reactor.pool.SimpleDequePool.lambda$maybeRecycleAndDrain$19(SimpleDequePool.java:513)
	at reactor.core.publisher.LambdaMonoSubscriber.onComplete(LambdaMonoSubscriber.java:135)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:145)
	at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
	at reactor.core.publisher.MonoNext$NextSubscriber.onComplete(MonoNext.java:102)
	at oracle.r2dbc.impl.AsyncLock$UsingConnectionSubscriber.onComplete(AsyncLock.java:497)

    // This one works
    Mono.from(connectionPool.create())
            .flatMapMany(connection ->
                    Flux.from(connection.createStatement("SELECT 1 FROM dual").execute())
                            .flatMap(result ->
                                    result.map((row, metadata) -> row.get(0, Integer.class)))
            ).subscribe(s -> System.out.println("Value: " + s ));

    // This one does not work
    return Mono.usingWhen(
            connectionPool.create(),
            connection -> Mono.from(connection.createStatement(QUERY)
                    .bind(0, destinationState)
                    .bind(1, subsidiaryId)
                    .bind(2, itemId)
                    .bind(3, destinationState)
                    .bind(4, itemId)
                    .bind(5, destinationState)
                    .bind(6, itemId)
                    .execute()
            ),
            Connection::close,
            ((connection, throwable) -> connection.close()),
            Connection::close
    )
            .flatMapMany(it -> it.map(mapper))
            .next();

I am not sure, but it might be calling close before create.

Dependencies:

implementation 'io.r2dbc:r2dbc-pool:0.9.0.RELEASE'
implementation 'io.r2dbc:r2dbc-spi:0.9.0.RELEASE'
implementation 'com.oracle.database.jdbc:ojdbc11:21.3.0.0'
runtimeOnly 'com.oracle.database.r2dbc:oracle-r2dbc:0.4.0'

We have:

            .flatMapMany(it -> it.map(mapper))

It is downstream of the usingWhen publisher, so the Connection::close happens before the invocation of map on the Result.
For Oracle R2DBC, a Result is no longer valid after the connection is closed.

You could try moving the flatMapMany operator into the connection using function, like this:

...
.execute()
.flatMapMany(it -> it.map(mapper))

The main thing to understand is that anything inside of the connection -> using lambda will happen before the Connection::close. It is within this scope that you want to get all your database calls done. If you know JDBC, then you can think of this scope to be the equivalent of:

try (Connection connection = dataSource.getConnection()) {
    // The Connection, and any Statements or ResultSets it creates, are only valid inside this try block
}
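
To make the scoping concrete without a database, here is a minimal plain-Java sketch. `ScopeDemo` and `FakeConnection` are hypothetical names invented for this illustration and are not part of Oracle R2DBC or JDBC. The lazy `Supplier` stands in for a Result whose rows are only fetched when it is consumed.

```java
import java.util.function.Supplier;

final class ScopeDemo {

    // Hypothetical stand-in for a database connection; not a real driver class.
    static final class FakeConnection implements AutoCloseable {
        private boolean closed = false;

        // Returns a lazy result: the row is fetched only when get() is called.
        Supplier<String> execute() {
            return () -> {
                if (closed) {
                    throw new IllegalStateException("Closed Statement");
                }
                return "row-1";
            };
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Consumes the lazy result while the connection is still open.
    static String readInsideScope() {
        try (FakeConnection connection = new FakeConnection()) {
            return connection.execute().get();
        }
    }

    // Lets the lazy result escape the scope and consumes it after close().
    static String readOutsideScope() {
        Supplier<String> result;
        try (FakeConnection connection = new FakeConnection()) {
            result = connection.execute();
        }
        return result.get(); // throws IllegalStateException
    }

    public static void main(String[] args) {
        System.out.println(readInsideScope()); // prints "row-1"
        try {
            readOutsideScope();
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage()); // prints "failed: Closed Statement"
        }
    }
}
```

`readOutsideScope` mirrors the failing layout, where the result is mapped downstream of usingWhen, and `readInsideScope` mirrors moving the mapping into the using function.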

@Michael-A-McMahon thank you for your help.
This was my first time using R2DBC with Oracle. I had only used Postgres before, and the block of code above always worked for me ...

With your help I changed the code, and now it works!
My new code:

public Mono<TaxRuleDto> fetchTaxRule(
        final long itemId,
        final long subsidiaryId,
        final String destinationState) {

    return Flux.usingWhen(
            connectionPool.create(),
            connection -> Mono.from(connection.createStatement(QUERY)
                    .bind(0, destinationState)
                    .bind(1, subsidiaryId)
                    .bind(2, itemId)
                    .bind(3, destinationState)
                    .bind(4, itemId)
                    .bind(5, destinationState)
                    .bind(6, itemId)
                    .execute()
            ).flatMapMany(it -> it.map(mapper)),
            Connection::close,
            ((connection, throwable) -> connection.close()),
            Connection::close
    ).next();
}

This query can return only one row, but I could make it work with Mono, so for now it is fine!

Again, thanks for your time & help.

Hi All,
The solution doesn't seem to work when using Spring Data R2DBC. Using the 21.3.0.0 driver, the connection doesn't close. Please do not say that it's an issue with Spring Data. I started from an issue where the connection pool doesn't work for Oracle R2DBC and ended up here. Could someone tell me what the solution is if we use Spring reactive repositories + Transaction + Pooling + Oracle? I have gone through all the solutions and finally stumbled upon this page. It would be highly appreciated if I could get some help.

Hi @vedhavi. Thanks for bringing this to my attention.
Can you give me a few more details about this? In which case do you see a connection not being closed? If you can share a bit of code that reproduces the issue, that would be excellent.