r2dbc/r2dbc-mssql

Errors are not propagated

gideoncatz opened this issue · 4 comments

Bug Report

Versions

  • Driver: io.r2dbc:r2dbc-mssql:0.9.0.M2
  • Database: MSSQL19 (using Docker image mcr.microsoft.com/mssql/server:2019-latest)
  • Java: 11
  • OS: macOS 11.6 Big Sur

Current Behavior

When an error occurs during a persistence operation (in my case, I tried to invoke a stored procedure that doesn't exist), it is still handled by the onNext hook of the ReactorNettyClient instead of the onError hook. As a result, the error is only seen in the logs (at debug level, for some reason) and gets "swallowed" within ReactorNettyClient instead of being propagated to the error hook defined in the application code.

![image](https://user-images.githubusercontent.com/11587731/149658692-2efa007b-ad30-4c34-bde4-3f02e2373073.png)

It can be seen that an ErrorToken has been received, stating that the stored procedure cannot be found. Yet it is processed by the onNext hook and the error is consumed there.

Steps to reproduce

Code
Mono.from(connectionFactory.create())
                .flatMap(connection -> {
                            connection.createStatement("EXEC sp_Nonextstent_Stored_Procedure")
                            .execute()
                            .subscribe(new Subscriber<MssqlResult>() {
                                @Override
                                public void onSubscribe(Subscription s) {
                                    log.debug("onSubscribe:" + s);
                                }

                                @Override
                                public void onNext(MssqlResult result) {
                                    log.debug("onNext:" + result);
                                    log.debug("Rows updated: {}", result.getRowsUpdated());
                                }

                                @Override
                                public void onError(Throwable t) {
                                    log.error("onError:", t);
                                }

                                @Override
                                public void onComplete() {
                                    log.debug("Completed");
                                }
                            });

Output:

2022-01-16 13:39:13 [reactor-tcp-nio-3] DEBUG io.r2dbc.mssql.client.ReactorNettyClient -[cid: 0x2] Request: SQLBatch [sql="EXEC sp_Nonextstent_Stored_Procedure"]
2022-01-16 13:39:22 [reactor-tcp-nio-3] DEBUG io.r2dbc.mssql.client.ReactorNettyClient -[cid: 0x2] Response: ErrorToken [number=2812, state=62, infoClass=16, message='Could not find stored procedure 'sp_Nonextstent_Stored_Procedure'.", serverName='sql19", procName='", lineNumber=16777216]
2022-01-16 13:39:22 [reactor-tcp-nio-3] DEBUG io.r2dbc.mssql.client.ReactorNettyClient -[cid: 0x2] Warning: Code [2812] Severity [GENERAL_ERROR]: Could not find stored procedure 'sp_Nonextstent_Stored_Procedure'.

Expected behavior/code

The onError hook should've been invoked. In practice, none of the client hooks are invoked. The error is consumed by the internal onNext hook, as shown in the screenshot.

Possible Solution

If the message is an instance of an ErrorToken, might it be possible to route it to the onError hook?

Thanks

Does the same happen with version 0.9.0.RELEASE?

It does. (I switched my r2dbc-spi to 0.9.0.RELEASE accordingly.)
Furthermore, even when the operation succeeds, neither the onNext nor the onComplete hook is invoked, so I can't tell whether the persistence was successful either.
Is my code incorrect, or is this indeed an issue in the driver?

Thanks for verifying. I now see what causes the issue: the code calls execute() without consuming the results. The actual execution happens at a later stage, when you subscribe to the Result methods. As soon as you call, e.g., flatMap(Result::getRowsUpdated).subscribe(…), you will see the correct signals. Statement.execute() returns a publisher of results that is a promise for execution. In R2DBC, results must be consumed; left unconsumed, they block the transport channel.
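
For illustration, the deferred behavior described above can be mimicked with a toy model built only on the JDK's java.util.concurrent.Flow API. This is a sketch under assumptions, not the driver's implementation: DeferredResultSketch, ToyResult, and Recorder are hypothetical names, and the toy publisher only imitates the idea that the update count (or the error) is signalled once something subscribes to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Toy model only: ToyResult and Recorder are hypothetical stand-ins,
// not R2DBC or r2dbc-mssql types.
final class DeferredResultSketch {

    static final String MESSAGE = "Could not find stored procedure";

    /** Stand-in for a result whose update count is exposed as a lazy publisher. */
    static final class ToyResult {
        private final RuntimeException failure; // null means the statement succeeded

        ToyResult(RuntimeException failure) {
            this.failure = failure;
        }

        /** Signals the update count or the failure only once a subscriber requests data. */
        Flow.Publisher<Integer> getRowsUpdated() {
            return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) {
                        return;
                    }
                    done = true;
                    if (failure != null) {
                        subscriber.onError(failure);
                    } else {
                        subscriber.onNext(1);
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Records every signal it receives and requests unbounded demand. */
    static final class Recorder implements Flow.Subscriber<Integer> {
        final List<String> events = new ArrayList<>();

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            events.add("onSubscribe");
            subscription.request(Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer rows) {
            events.add("onNext:" + rows);
        }

        @Override
        public void onError(Throwable t) {
            events.add("onError:" + t.getMessage());
        }

        @Override
        public void onComplete() {
            events.add("onComplete");
        }
    }

    /** Obtains the publisher but never subscribes, so no signal is recorded. */
    static List<String> unconsumed() {
        Recorder recorder = new Recorder();
        new ToyResult(new IllegalStateException(MESSAGE)).getRowsUpdated(); // publisher ignored
        return recorder.events;
    }

    /** Subscribes to the publisher, so the failure reaches onError. */
    static List<String> consumed() {
        Recorder recorder = new Recorder();
        new ToyResult(new IllegalStateException(MESSAGE)).getRowsUpdated().subscribe(recorder);
        return recorder.events;
    }

    public static void main(String[] args) {
        System.out.println("unconsumed: " + unconsumed());
        System.out.println("consumed:   " + consumed());
    }
}
```

In this model, unconsumed() records nothing at all, while consumed() records onSubscribe followed by onError. That mirrors the symptom in the report: calling getRowsUpdated() inside a log statement creates a publisher but never subscribes to it, so neither a row count nor an error can reach application code.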

Thanks for the prompt response, Mark!
Subscribing on Result::getRowsUpdated works like a charm! :-)

I noticed a behavior that you might want to check, though.
The relevant code piece from my side looks like this:

                            .subscribe(statement -> statement
                                    .execute()
                                    .flatMap(Result::getRowsUpdated)
                                    .doOnError(t -> log.error("Error executing statement: ", t))
                                    .doOnComplete(() -> log.debug("Statement execution completed"))
                                    .subscribe(numRows -> {
                                        log.debug("Transaction successful, {} rows affected", numRows);
                                    }));

When the statement is a direct SQL operation (e.g. INSERT INTO...), the lambda passed to subscribe is invoked only once, as expected. Yet when the statement executes a stored procedure, the lambda is invoked twice, for some reason.
Is this normal behavior?

@mp911de