io.netty.util.IllegalReferenceCountException when retrieving large query result
nico89ar opened this issue · 1 comment
r2dbc-mariadb version: 1.1.2
r2dbc-pool: 0.9.2
spring boot version: 2.7.5
netty version: 4.1.84
jooq version: 3.16.4
I have a Spring Boot server that fetches data from a table using jOOQ and the MariaDB R2DBC connector. The query result is retrieved as a Kotlin Flow, which is processed and streamed to the client browser as application/octet-stream so it can be downloaded as a file. This works fine in most cases, but when the query result is very large, the download is interrupted after some time with the following exception:
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
...
Original Stack Trace:
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1454)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1440)
at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730)
at io.netty.buffer.AbstractByteBuf.readUnsignedByte(AbstractByteBuf.java:744)
at org.mariadb.r2dbc.codec.TextRowDecoder.setPosition(TextRowDecoder.java:82)
at org.mariadb.r2dbc.codec.TextRowDecoder.get(TextRowDecoder.java:23)
at org.mariadb.r2dbc.MariadbReadable.get(MariadbReadable.java:33)
at org.jooq.impl.R2DBC$R2DBCResultSet$DefaultRow.get(R2DBC.java:1215)
at org.jooq.impl.R2DBC$R2DBCResultSet.nullable(R2DBC.java:1090)
at org.jooq.impl.R2DBC$R2DBCResultSet.nullable(R2DBC.java:1086)
at org.jooq.impl.R2DBC$R2DBCResultSet.getString(R2DBC.java:1151)
at org.jooq.impl.DefaultBinding$DefaultStringBinding.get0(DefaultBinding.java:4266)
at org.jooq.impl.DefaultBinding$DefaultStringBinding.get0(DefaultBinding.java:4212)
at org.jooq.impl.DefaultBinding$InternalBinding.get(DefaultBinding.java:1025)
at org.jooq.impl.R2DBC$ResultSubscriber.lambda$onNext$1(R2DBC.java:345)
at org.jooq.impl.RecordDelegate.operate(RecordDelegate.java:144)
at org.jooq.impl.R2DBC$ResultSubscriber.lambda$onNext$2(R2DBC.java:332)
at org.mariadb.r2dbc.client.MariadbResult.lambda$map$1(MariadbResult.java:123)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122)
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onNext(FluxHandle.java:342)
at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.onNext(FluxTakeUntil.java:84)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drainRegular(FluxWindowPredicate.java:670)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.drain(FluxWindowPredicate.java:748)
at reactor.core.publisher.FluxWindowPredicate$WindowFlux.request(FluxWindowPredicate.java:835)
at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:141)
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.request(FluxHandle.java:477)
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138)
at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request2(R2DBC.java:599)
at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.forAllForwardingSubscriptions(R2DBC.java:589)
at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request1(R2DBC.java:594)
at org.jooq.impl.R2DBC$AbstractNonBlockingSubscription.request0(R2DBC.java:578)
at org.jooq.impl.ThreadGuard.lambda$run$0(ThreadGuard.java:74)
at org.jooq.impl.ThreadGuard.run(ThreadGuard.java:87)
at org.jooq.impl.ThreadGuard.run(ThreadGuard.java:74)
at org.jooq.impl.R2DBC$AbstractSubscription.request(R2DBC.java:182)
at kotlinx.coroutines.reactive.ReactiveSubscriber.makeRequest(ReactiveFlow.kt:153)
at kotlinx.coroutines.reactive.PublisherAsFlow.collectImpl(ReactiveFlow.kt:103)
at kotlinx.coroutines.reactive.PublisherAsFlow.access$collectImpl(ReactiveFlow.kt:48)
at kotlinx.coroutines.reactive.PublisherAsFlow$collectImpl$1.invokeSuspend(ReactiveFlow.kt)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.flow.internal.StackFrameContinuation.resumeWith(ChannelFlow.kt:240)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.resumeUnconfined(DispatchedTask.kt:190)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:161)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeWith(CancellableContinuationImpl.kt:328)
at kotlinx.coroutines.reactive.FlowSubscription.request(ReactiveFlow.kt:267)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:272)
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164)
at reactor.netty.channel.MonoSendMany$SendManyInner.run(MonoSendMany.java:360)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:403)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Unknown Source)
Looking up this exception suggests that it is caused by misuse of an io.netty.buffer.AbstractByteBuf that is accessed after it has been released. It always happens for large query results, but the timing is not deterministic: given the same data set, it can occur at any point from about 3 seconds up to around 30 seconds after the streaming starts.
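For reference, here is a minimal sketch of what "refCnt: 0" means at the Netty level. It is not the driver's code and does not reproduce the bug; it only shows, assuming netty-buffer is on the classpath, that reading from a ByteBuf after its last reference has been released fails with the same exception and the same top frames as in the trace above. The function name readAfterRelease is made up for this illustration.

```kotlin
import io.netty.buffer.Unpooled
import io.netty.util.IllegalReferenceCountException

// Hypothetical helper for illustration only: reads from a buffer
// after it has been released and returns the exception message.
fun readAfterRelease(): String? {
    val buf = Unpooled.buffer(4)   // a new buffer starts with refCnt == 1
    buf.writeByte(0x2A)
    buf.release()                  // refCnt drops to 0, buffer is deallocated
    return try {
        buf.readUnsignedByte()     // goes through ensureAccessible(), as in the trace
        null
    } catch (e: IllegalReferenceCountException) {
        e.message
    }
}

fun main() {
    println(readAfterRelease())
}
```

In the trace, the equivalent read happens inside TextRowDecoder.setPosition, so the row's underlying buffer appears to have been released by the time jOOQ reads the column value.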
Here is a very simplified snippet of how I'm fetching the data:
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import org.jooq.DSLContext
import org.jooq.Record

class MyRepository(private val jooq: DSLContext) {
    fun fetch(): Flow<Record> {
        return jooq.select()
            .from(MY_TABLE)
            .asFlow()
    }
}
Could this be an issue with the TextRowDecoder?
Update: This seems to have been fixed after the release of Spring Boot 3, which in turn allowed upgrading the other dependencies:
r2dbc-mariadb version: 1.1.2
r2dbc-pool: 1.0.0
spring boot version: 3.0.0
netty version: 4.1.85
jooq version: 3.17.4