mariadb-corporation/mariadb-connector-r2dbc

io.netty.util.IllegalReferenceCountException when retrieving large query result

nico89ar opened this issue · 1 comments

r2dbc-mariadb version: 1.1.2
r2dbc-pool: 0.9.2
spring boot version: 2.7.5
netty version: 4.1.84
jooq version: 3.16.4

I have a Spring Boot server that fetches data from a table using jOOQ and the MariaDB R2DBC connector. The query result is retrieved as a Kotlin Flow, which is processed and streamed to the client browser as application/octet-stream so it can be downloaded as a file. This works fine in most cases, but when the query result is very large, the download is interrupted after some time with the following exception:

io.netty.util.IllegalReferenceCountException: refCnt: 0
	at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
...
Original Stack Trace:
		at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
		at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1440)
		at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730)
		at io.netty.buffer.AbstractByteBuf.readUnsignedByte(AbstractByteBuf.java:744)
		at org.mariadb.r2dbc.codec.TextRowDecoder.setPosition(TextRowDecoder.java:82)
		at org.mariadb.r2dbc.codec.TextRowDecoder.get(TextRowDecoder.java:23)
		at org.mariadb.r2dbc.MariadbReadable.get(MariadbReadable.java:33)
		at org.jooq.impl.R2DBC$R2DBCResultSet$DefaultRow.get(R2DBC.java:1215)
		at org.jooq.impl.R2DBC$R2DBCResultSet.nullable(R2DBC.java:1090)
		at org.jooq.impl.R2DBC$R2DBCResultSet.nullable(R2DBC.java:1086)
		at org.jooq.impl.R2DBC$R2DBCResultSet.getString(R2DBC.java:1151)
		at org.jooq.impl.DefaultBinding$DefaultStringBinding.get0(DefaultBinding.java:4266)
		at org.jooq.impl.DefaultBinding$DefaultStringBinding.get0(DefaultBinding.java:4212)
		at org.jooq.impl.DefaultBinding$InternalBinding.get(DefaultBinding.java:1025)
		at org.jooq.impl.R2DBC$ResultSubscriber.lambda$onNext$1(R2DBC.java:345)
		at org.jooq.impl.RecordDelegate.operate(RecordDelegate.java:144)
		at org.jooq.impl.R2DBC$ResultSubscriber.lambda$onNext$2(R2DBC.java:332)
		at org.mariadb.r2dbc.client.MariadbResult.lambda$map$1(MariadbResult.java:123)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
		at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:342)
		at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onNext(FluxTakeUntil.java:84)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:670)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
		at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:835)
		at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:141)
		at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:477)
		at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
		at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request2(R2DBC.java:599)
		at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.forAllForwardingSubscriptions(R2DBC.java:589)
		at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request1(R2DBC.java:594)
		at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request0(R2DBC.java:578)
		at org.jooq.impl.ThreadGuard.lambda$run$0(ThreadGuard.java:74)
		at org.jooq.impl.ThreadGuard.run(ThreadGuard.java:87)
		at org.jooq.impl.ThreadGuard.run(ThreadGuard.java:74)
		at org.jooq.impl.R2DBC$AbstractSubscription.request(R2DBC.java:182)
		at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153)
		at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103)
		at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48)
		at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
		at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
		at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
		at kotlinx.coroutines.flow.internal.StackFrameContinuation.resumeWith(ChannelFlow.kt:240)
		at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
		at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
		at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
		at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
		at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
		at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
		at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
		at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
		at kotlinx.coroutines.reactive.FlowSubscription.request(ReactiveFlow.kt:267)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:272)
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
		at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:360)
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
		at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Unknown Source)

Looking up this exception suggests that it is caused by misuse of an io.netty.buffer.AbstractByteBuf: something is accessing the buffer after it has been released. It only happens for large query results, but the timing is not deterministic: given the same data set, it can occur anywhere from 3 seconds to around 30 seconds after the streaming starts.
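
For anyone unfamiliar with Netty's reference counting, here is a minimal sketch of the kind of access that produces this exception. It is not the connector's code, only an illustration: the function name readAfterReleaseFails is made up for this example, and it assumes Netty is on the classpath with its default buffer accessibility checks enabled.

```kotlin
import io.netty.buffer.Unpooled
import io.netty.util.IllegalReferenceCountException

// Hypothetical helper for illustration only (not part of the connector or jOOQ).
// Returns true if reading from a released buffer fails with
// IllegalReferenceCountException, which is the case with Netty's default settings.
fun readAfterReleaseFails(): Boolean {
    val buf = Unpooled.buffer(4)   // a freshly allocated buffer has refCnt == 1
    buf.writeByte(42)
    buf.release()                  // refCnt drops to 0, the buffer is deallocated
    return try {
        buf.readByte()             // readByte() checks accessibility first
        false
    } catch (e: IllegalReferenceCountException) {
        true
    }
}

fun main() {
    println(readAfterReleaseFails())
}
```

In the stack trace above, the same readByte() check fails inside TextRowDecoder.setPosition, which suggests the row's underlying buffer had already been released by the time jOOQ read the column value.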

Here is a very simplified snippet of how I'm fetching the data:

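import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.jooq.DSLContext
import org.jooq.Record

class MyRepository(private val jooq: DSLContext) {
    // MY_TABLE is the table reference used by the query (its import is omitted here).
    // The jOOQ query is a reactive Publisher, so asFlow() exposes it as a Kotlin Flow.
    fun fetch(): Flow<Record> {
        return jooq.select()
            .from(MY_TABLE)
            .asFlow()
    }
}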

Could this be an issue with the TextRowDecoder?

Update: This seems to have been fixed after the release of Spring Boot 3, which in turn allowed upgrading the other dependencies:

r2dbc-mariadb version: 1.1.2
r2dbc-pool: 1.0.0
spring boot version: 3.0.0
netty version: 4.1.85
jooq version: 3.17.4