neo4j/neo4j-ogm-quarkus

Feature Request: Support quarkus-neo4j AsyncSession

Closed this issue · 2 comments

Currently, to integrate Mutiny into neo4j-ogm quarkus, some boilerplate code is required, since there is no async api exposed, alltough the java driver from quarkus offers support for that.

For example, to query item:

return vertx.executeBlocking(Uni.createFrom().item(() -> {
            Session session = sessionFactory.openSession();
            return session.query(MyNode.class, cypher, params);
        })).onItem().transformToMulti(nodes -> {
            return Multi.createFrom().iterable(nodes);
        });

Here, executeBlocking needs to be used since the call is blocking.

The Java driver permits several async sessions to be created:

/**
 * Create a new session of supported type with a specified {@link SessionConfig session configuration}.
 * <p>
 * Supported types are:
 * <ul>
 *     <li>{@link org.neo4j.driver.Session} - synchronous session</li>
 *     <li>{@link org.neo4j.driver.async.AsyncSession} - asynchronous session</li>
 *     <li>{@link org.neo4j.driver.reactive.ReactiveSession} - reactive session using Flow API</li>
 *     <li>{@link org.neo4j.driver.reactivestreams.ReactiveSession} - reactive session using Reactive Streams
 * API</li>
 *     <li>{@link org.neo4j.driver.reactive.RxSession} - deprecated reactive session using Reactive Streams
 * API, superseded by {@link org.neo4j.driver.reactivestreams.ReactiveSession}</li>
 * </ul>
 * <p>
 * Sample usage:
 * <pre>
 * {@code
 * var session = driver.session(AsyncSession.class);
 * }
 * </pre>
 *
 * @param sessionClass session type class, must not be null
 * @param sessionConfig session config, must not be null
 * @return session instance
 * @param <T> session type
 * @throws IllegalArgumentException for unsupported session types
 * @since 5.2
 */
<T extends BaseSession> T session(Class<T> sessionClass, SessionConfig sessionConfig);

I suppose the code then boils down to something like:

return Multi.createFrom().publisher(
    session.query(MyNode.class, cypher, params)
);

Originally i stumbled upon this issue while executing a Quarkus test case which spins up a testcontainer image of Neo4j. Here, a scheduler queries entities periodically (but not related to the acual test, its just spinned up with the application), but the instruction is send to shut down the driver an exception occurs. Maybe internal exception handling is not correct?

2023-05-17 13:05:57,668 INFO  [org.neo.ogm.dri.bol.dri.BoltDriver] (main) Shutting down Bolt driver org.neo4j.driver.internal.InternalDriver@39ad5a7d 
2023-05-17 13:05:57,692 ERROR [io.net.uti.con.Def.rejectedExecution] (executor-thread-1) Failed to submit a listener notification task. Event loop shut down? [Error Occurred After Shutdown]: java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:934)
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:351)
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:344)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:836)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute0(SingleThreadEventExecutor.java:827)
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:817)
	at io.netty.util.concurrent.DefaultPromise.safeExecute(DefaultPromise.java:862)
	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:500)
	at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
	at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
	at io.netty.channel.pool.SimpleChannelPool.closeAndFail(SimpleChannelPool.java:375)
	at io.netty.channel.pool.SimpleChannelPool.release(SimpleChannelPool.java:292)
	at io.netty.channel.pool.FixedChannelPool.release(FixedChannelPool.java:294)
	at io.netty.channel.pool.SimpleChannelPool.release(SimpleChannelPool.java:272)
	at org.neo4j.driver.internal.async.pool.NettyChannelPool.release(NettyChannelPool.java:114)
	at org.neo4j.driver.internal.async.NetworkConnection.lambda$terminateAndRelease$1(NetworkConnection.java:174)
	at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2341)
	at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:144)
	at org.neo4j.driver.internal.async.NetworkConnection.terminateAndRelease(NetworkConnection.java:174)
	at org.neo4j.driver.internal.async.connection.DirectConnection.terminateAndRelease(DirectConnection.java:98)
	at org.neo4j.driver.internal.InternalTransaction.terminateConnectionOnThreadInterrupt(InternalTransaction.java:70)
	at org.neo4j.driver.internal.InternalTransaction.lambda$run$3(InternalTransaction.java:60)
	at org.neo4j.driver.internal.util.Futures.safeRun(Futures.java:240)
	at org.neo4j.driver.internal.util.Futures.blockingGet(Futures.java:109)
	at org.neo4j.driver.internal.InternalTransaction.run(InternalTransaction.java:58)
	at org.neo4j.driver.internal.AbstractQueryRunner.run(AbstractQueryRunner.java:34)
	at org.neo4j.driver.internal.AbstractQueryRunner.run(AbstractQueryRunner.java:39)
	at org.neo4j.ogm.drivers.bolt.request.BoltRequest.executeRequest(BoltRequest.java:181)
	at org.neo4j.ogm.drivers.bolt.request.BoltRequest.execute(BoltRequest.java:85)
	at org.neo4j.ogm.session.delegates.ExecuteQueriesDelegate.lambda$executeAndMap$2(ExecuteQueriesDelegate.java:198)
	at org.neo4j.ogm.session.Neo4jSession.doInTransaction(Neo4jSession.java:600)
	at org.neo4j.ogm.session.Neo4jSession.doInTransaction(Neo4jSession.java:574)
	at org.neo4j.ogm.session.delegates.ExecuteQueriesDelegate.executeAndMap(ExecuteQueriesDelegate.java:186)
	at org.neo4j.ogm.session.delegates.ExecuteQueriesDelegate.query(ExecuteQueriesDelegate.java:128)
	at org.neo4j.ogm.session.Neo4jSession.query(Neo4jSession.java:430)

Hoping for clarification and or discussion about the issues.
If any information is missing, please comment and ill try to provide it.

Just verified with the most recent versions of quarkus-neo4j 3.0.3 and neo4j-ogm-quarkus 3.0.0.

Hey @glains sorry for the late response, I was travelling and swamped.

Two things / answers: I am well aware of the session types the driver provides, but we have right now no intends to rewrite OGM to make use oft Async sessions. Things might change, but the main emphasis from the company is on SDN and if you want reactive, SDN has it. I know that this is not Quarkus compatible, but that's how it is.

The other answer is a question: Seems like your scheduler is terminated after Quarkus-Neo4j integration closes the driver. Can that be? Might share some code?