r2dbc/r2dbc-proxy

Cannot invoke "java.lang.Comparable.compareTo(Object)" because "a[runHi]" is null

Closed this issue · 6 comments

Bug Report

Versions

r2dbc-proxy 1.1.4.RELEASE and 1.1.5.RELEASE
postgresql: 1.14.3
OS: linux

Current Behavior

I'm using Spring Boot 3.2.5 and have a service that combines multiple Monos with Mono.zip. Each Mono calls a repository method whose query either returns an object or returns Mono.error with an exception; in the service I then use onErrorResume on that exception to set some default attributes.
Under load, with a flux of 1000 of these Monos, I randomly get a NullPointerException that cancels my flux.
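For context, here is a minimal sketch of the access pattern, with made-up names and a stand-in for the repository call (the real code uses Spring Data R2DBC repositories executing the query shown below under "Input Code"):

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ZipUnderLoadSketch {

    // Stand-in for the repository method: it either emits a row or errors with "not found".
    static Mono<String> findRow(int i) {
        return (i % 3 == 0)
                ? Mono.error(new IllegalStateException("not found"))
                : Mono.just("row-" + i);
    }

    // onErrorResume maps the "not found" error to default attribute values, as in the service above.
    static Mono<String> lookupWithDefaults(int i) {
        return findRow(i).onErrorResume(IllegalStateException.class, e -> Mono.just("defaults"));
    }

    public static void main(String[] args) {
        // Each element zips several repository calls; a flux of ~1000 of these
        // randomly fails with the NullPointerException shown below.
        Flux.range(0, 1000)
                .flatMap(i -> Mono.zip(lookupWithDefaults(i), lookupWithDefaults(i + 1))
                        .map(t -> t.getT1() + "/" + t.getT2()))
                .blockLast();
    }
}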

I get this error in the console:

Stacktrace
2024-11-05 17:30:02.845 [reactor-tcp-epoll-5] ERROR r.n.channel.ChannelOperationsHandler - [0876be15, L:/127.0.0.1:51598 - R:localhost/127.0.0.1:32890] Error was received while reading the incoming data. The connection will be closed. 
java.lang.NullPointerException: Cannot invoke "java.lang.Comparable.compareTo(Object)" because "a[runHi]" is null
	at java.base/java.util.ComparableTimSort.countRunAndMakeAscending(ComparableTimSort.java:320)
	at java.base/java.util.ComparableTimSort.sort(ComparableTimSort.java:188)
	at java.base/java.util.Arrays.sort(Arrays.java:1042)
	at io.micrometer.common.KeyValues.<init>(KeyValues.java:47)
	at io.micrometer.common.KeyValues.of(KeyValues.java:269)
	at io.micrometer.observation.Observation$Context.getHighCardinalityKeyValues(Observation.java:1216)
	at io.micrometer.observation.Observation$Context.getAllKeyValues(Observation.java:1232)
	at io.micrometer.tracing.handler.TracingObservationHandler.tagSpan(TracingObservationHandler.java:52)
	at io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler.onStop(PropagatingSenderTracingObservationHandler.java:101)
	at io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler.onStop(PropagatingSenderTracingObservationHandler.java:35)
	at io.micrometer.observation.ObservationHandler$FirstMatchingCompositeObservationHandler.onStop(ObservationHandler.java:197)
	at io.micrometer.observation.SimpleObservation.lambda$notifyOnObservationStopped$0(SimpleObservation.java:268)
	at java.base/java.util.ArrayDeque$DescendingIterator.forEachRemaining(ArrayDeque.java:771)
	at io.micrometer.observation.SimpleObservation.notifyOnObservationStopped(SimpleObservation.java:268)
	at io.micrometer.observation.SimpleObservation.stop(SimpleObservation.java:188)
	at io.r2dbc.proxy.observation.ObservationProxyExecutionListener.afterQuery(ObservationProxyExecutionListener.java:183)
	at io.r2dbc.proxy.listener.CompositeProxyExecutionListener.lambda$afterQuery$3(CompositeProxyExecutionListener.java:58)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at io.r2dbc.proxy.listener.CompositeProxyExecutionListener.afterQuery(CompositeProxyExecutionListener.java:58)
	at io.r2dbc.proxy.callback.AfterQueryCallbackInvoker.afterQuery(AfterQueryCallbackInvoker.java:55)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.afterQuery(ResultInvocationSubscriber.java:162)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.onComplete(ResultInvocationSubscriber.java:101)
	at io.r2dbc.proxy.callback.MethodInvocationSubscriber.onComplete(MethodInvocationSubscriber.java:94)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at io.r2dbc.proxy.callback.ResultInvocationSubscriber.onComplete(ResultInvocationSubscriber.java:104)
	at io.r2dbc.proxy.callback.MethodInvocationSubscriber.onComplete(MethodInvocationSubscriber.java:94)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:261)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:239)
	at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onComplete(FluxFilterFuseable.java:391)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onComplete(FluxPeekFuseable.java:940)
	at io.r2dbc.postgresql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onComplete(FluxDiscardOnCancel.java:104)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:223)
	at reactor.core.publisher.FluxCreate$BaseSink.complete(FluxCreate.java:465)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:871)
	at reactor.core.publisher.FluxCreate$BufferAsyncSink.complete(FluxCreate.java:819)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drainLoop(FluxCreate.java:249)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.drain(FluxCreate.java:215)
	at reactor.core.publisher.FluxCreate$SerializedFluxSink.complete(FluxCreate.java:206)
	at io.r2dbc.postgresql.client.ReactorNettyClient$Conversation.complete(ReactorNettyClient.java:668)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.emit(ReactorNettyClient.java:934)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:810)
	at io.r2dbc.postgresql.client.ReactorNettyClient$BackendMessageSubscriber.onNext(ReactorNettyClient.java:716)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onNext(FluxHandle.java:129)
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onNext(FluxMap.java:224)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:294)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:403)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:426)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:501)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:399)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:1583)

Table schema

Input Code
SELECT * FROM my_table WHERE column_1 = :value1 AND column_2 = :value2 AND date <= :date ORDER BY id DESC LIMIT 1;

Expected behavior/code

I expect the flux to finish successfully

Possible Solution

As a workaround we have set r2dbc.observation.enabled=false for now to disable the observation instrumentation.
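For reference, this is the property as we set it (the name reflects our configuration; adjust if your setup exposes a different toggle):

# application.properties -- temporary workaround until the Micrometer fix is available
r2dbc.observation.enabled=false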

Additional context

When debugging the KeyValues constructor I see that the array has size 2 but contains only one element, with the key r2dbc.query[0].
It looks like a concurrency issue, but I'm not sure.
In the screenshot below, the KeyValue array passed to the io.micrometer.common.KeyValues constructor is still being initialized while SimpleObservation.stop() calls convention.getHighCardinalityKeyValues(context), which goes through io.r2dbc.proxy.observation.QueryObservationConvention.
[Screenshot: debugger view of the partially initialized KeyValue array inside io.micrometer.common.KeyValues]
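As a side note on the error message itself, a trivial snippet (unrelated to the r2dbc code path) shows that sorting a Comparable array that still contains a null produces exactly this kind of NullPointerException, which matches the theory that the KeyValue array is sorted while one slot is not yet populated:

import java.util.Arrays;

public class NullInSortDemo {
    public static void main(String[] args) {
        // Second slot not yet populated, as observed in the debugger.
        String[] values = {"r2dbc.query[0]", null};
        // Throws: NullPointerException: Cannot invoke "java.lang.Comparable.compareTo(Object)" ...
        Arrays.sort(values);
    }
}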

@bilelncq Thanks for the report.

I'm wondering: does each of the zipped elements share a database connection, or does each get a different connection?

Hello @ttddyy, thank you for your reply.
I can confirm that with spring.r2dbc.pool.max-size=1 the issue is not reproduced, but with spring.r2dbc.pool.max-size=2 it occurs.

Hm, that is strange.
Can you try with the latest version of the r2dbc-pool?

Also, please make sure the r2dbc-proxy Connection is the outermost Connection.
It should be: "r2dbc-proxy Connection" (outer) -> "r2dbc-pool Connection" -> "driver Connection" (inner).
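For illustration, a minimal sketch of that wiring when the ConnectionFactory is assembled by hand (Spring Boot's auto-configuration normally does this; hosts, credentials, and the listener are placeholders):

import io.r2dbc.pool.ConnectionPool;
import io.r2dbc.pool.ConnectionPoolConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionConfiguration;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.proxy.ProxyConnectionFactory;
import io.r2dbc.spi.ConnectionFactory;

public class ConnectionFactoryWiring {

    static ConnectionFactory build() {
        // Inner: the actual driver ConnectionFactory.
        PostgresqlConnectionFactory driver = new PostgresqlConnectionFactory(
                PostgresqlConnectionConfiguration.builder()
                        .host("localhost")
                        .database("mydb")
                        .username("user")
                        .password("secret")
                        .build());

        // Middle: r2dbc-pool wraps the driver.
        ConnectionPool pool = new ConnectionPool(
                ConnectionPoolConfiguration.builder(driver)
                        .maxSize(2) // the issue reproduces with two or more pooled connections
                        .build());

        // Outer: r2dbc-proxy wraps the pool; observation listeners are registered here.
        return ProxyConnectionFactory.builder(pool)
                // .listener(...) e.g. the observation listener registered by Spring Boot
                .build();
    }
}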

Sorry for the late reply, I didn't have time to recheck. In my test I'm using the R2DBC Spring Boot starter with its default configuration, and I also have another implementation with my own connection-factory setup (we have multi-tenancy); the issue occurs in both implementations.
I tried the latest r2dbc-pool version, 1.0.2.RELEASE, and the issue still occurs.

Most likely this is the same issue as micrometer-metrics/micrometer#4356

Hello @ttddyy, I can confirm that the new Micrometer version 1.14.2 fixes the issue. Thank you for your help.