BlobCodec and ClobCodec.encode(…) fails with IllegalStateException due to blocking
Same case as with #127. To solve that issue, we need to defer the stream consumption until statement execution.
Reproducer:
Connection conn = …;
conn.createStatement("INSERT INTO PERSON (first_name) VALUES(?1)")
.bind(1, Blob.from(Mono.just(ByteBuffer.wrap("foo".getBytes()))))
.execute();
The code must be executed on a NonBlocking Reactor thread, such as one from Schedulers.parallel(), to reproduce the issue.
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler com.example.demo.DemoApplication$PersonController#create(String, String) [DispatcherHandler]
|_ checkpoint ⇢ HTTP POST "/create/Mark/Paluch" [ExceptionHandlingWebHandler]
Stack trace:
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
at io.r2dbc.h2.codecs.ClobCodec$AggregateCharArrayReader.<init>(ClobCodec.java:90) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.ClobCodec$AggregateCharArrayReader.<init>(ClobCodec.java:70) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.ClobCodec.doEncode(ClobCodec.java:59) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.ClobCodec.doEncode(ClobCodec.java:32) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.AbstractCodec.encode(AbstractCodec.java:68) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.DefaultCodecs.encode(DefaultCodecs.java:69) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.addIndex(H2Statement.java:130) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.bind(H2Statement.java:80) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.bind(H2Statement.java:39) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
|_ checkpoint ⇢ Handler com.example.demo.DemoApplication$PersonController#create(String, String) [DispatcherHandler]
|_ checkpoint ⇢ HTTP POST "/create/Mark/Paluch" [ExceptionHandlingWebHandler]
Stack trace:
at io.r2dbc.h2.codecs.BlobCodec$BlobInputStreamEnumeration.hasMoreElements(BlobCodec.java:86) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:109) ~[na:1.8.0_211]
at java.io.SequenceInputStream.<init>(SequenceInputStream.java:69) ~[na:1.8.0_211]
at io.r2dbc.h2.codecs.BlobCodec.doEncode(BlobCodec.java:59) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.BlobCodec.doEncode(BlobCodec.java:32) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.AbstractCodec.encode(AbstractCodec.java:68) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.codecs.DefaultCodecs.encode(DefaultCodecs.java:69) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.addIndex(H2Statement.java:130) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.bind(H2Statement.java:80) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
at io.r2dbc.h2.H2Statement.bind(H2Statement.java:39) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
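Both traces fail at the same kind of guard: Reactor refuses to block on threads that implement its NonBlocking marker interface, which is why the reproducer only fails on such threads. The following is a simplified, JDK-only model of that behaviour, not Reactor's actual code. NonBlockingMarker, EventLoopThread, blockingRead and readOnEventLoop are hypothetical names introduced for illustration.

```java
public class BlockingGuardModel {

    /** Hypothetical stand-in for a "this thread must never block" marker interface. */
    interface NonBlockingMarker { }

    /** A thread type that carries the marker, comparable to an event-loop thread. */
    static final class EventLoopThread extends Thread implements NonBlockingMarker {
        EventLoopThread(Runnable task) {
            super(task, "model-event-loop");
        }
    }

    /** Fails fast instead of blocking when called from a marked thread. */
    static String blockingRead() {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException(
                    "blocking is not supported in thread " + current.getName());
        }
        return "foo"; // a real implementation would wait for data here
    }

    /** Runs blockingRead() on an EventLoopThread and reports the outcome. */
    static String readOnEventLoop() {
        String[] outcome = new String[1];
        Thread thread = new EventLoopThread(() -> {
            try {
                outcome[0] = blockingRead();
            } catch (IllegalStateException e) {
                outcome[0] = "rejected: " + e.getMessage();
            }
        });
        thread.start();
        try {
            thread.join(); // join() makes the write to outcome[0] visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome[0];
    }
}
```

Called from an ordinary thread, blockingRead() returns its value. Called through readOnEventLoop(), the same method is rejected. This matches the observation that the codecs work on some threads and fail on reactor-http-nio-2.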
BlobToByteBufferCodec should also be noted: it appears to block on the Flux in the same way.
Fixing the reading side is probably quite straightforward, as H2 isn't backed by anything reactive. The complexity comes into play when trying to write values to H2: we need to resolve each parameter that provides a Publisher through an additional concatMap during Statement.execute(…).
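A minimal sketch of that deferral, under stated assumptions: it uses only the JDK, with java.util.concurrent.Flow.Publisher standing in for the Reactive Streams Publisher and CompletableFuture.thenCompose playing the role that concatMap would play in the driver. DeferredBindings, aggregate and chunks are hypothetical names. This is not the r2dbc-h2 implementation.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

/** Hypothetical statement model: bind() records values, execute() resolves them. */
public class DeferredBindings {

    private final List<Object> bindings = new ArrayList<>();

    /** Only stores the value. A Publisher is not subscribed to at bind time. */
    public DeferredBindings bind(Object value) {
        bindings.add(value);
        return this;
    }

    /** Resolves bindings strictly in order, consuming each publisher without blocking. */
    public CompletableFuture<List<Object>> execute() {
        CompletableFuture<List<Object>> chain =
                CompletableFuture.completedFuture(new ArrayList<>());
        for (Object binding : bindings) {
            chain = chain.thenCompose(resolved -> {
                if (binding instanceof Flow.Publisher) {
                    @SuppressWarnings("unchecked") // sketch: assumes publishers emit ByteBuffer
                    Flow.Publisher<ByteBuffer> source = (Flow.Publisher<ByteBuffer>) binding;
                    return aggregate(source).thenApply(bytes -> {
                        resolved.add(bytes);
                        return resolved;
                    });
                }
                resolved.add(binding);
                return CompletableFuture.completedFuture(resolved);
            });
        }
        // A driver would pass the resolved plain values to the database call here.
        return chain;
    }

    /** Collects all chunks and completes the future once the publisher signals onComplete. */
    static CompletableFuture<byte[]> aggregate(Flow.Publisher<ByteBuffer> source) {
        CompletableFuture<byte[]> result = new CompletableFuture<>();
        source.subscribe(new Flow.Subscriber<ByteBuffer>() {
            private final ByteArrayOutputStream bytes = new ByteArrayOutputStream();

            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE); // unbounded demand, for brevity
            }

            @Override public void onNext(ByteBuffer chunk) {
                byte[] copy = new byte[chunk.remaining()];
                chunk.duplicate().get(copy); // duplicate() leaves the caller's position untouched
                bytes.write(copy, 0, copy.length);
            }

            @Override public void onError(Throwable error) {
                result.completeExceptionally(error);
            }

            @Override public void onComplete() {
                result.complete(bytes.toByteArray());
            }
        });
        return result;
    }

    /** Demo-only publisher: emits every chunk on the first request and ignores the demand count. */
    static Flow.Publisher<ByteBuffer> chunks(byte[]... data) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override public void request(long n) {
                if (done) {
                    return;
                }
                done = true;
                for (byte[] chunk : data) {
                    subscriber.onNext(ByteBuffer.wrap(chunk));
                }
                subscriber.onComplete();
            }

            @Override public void cancel() {
                done = true;
            }
        });
    }
}
```

The design point is that bind(…) never touches the stream, so it is safe to call on any thread. All consumption happens in execute(), where the result is already asynchronous and the publisher can be subscribed to instead of iterated.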
A similar error occurs at io.r2dbc.h2.codecs.BlobToByteBufferCodec.doEncode(BlobToByteBufferCodec.java:65) when using the Spring Security OAuth2 client configured with H2 and R2dbcReactiveOAuth2AuthorizedClientService:
java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:161) ~[reactor-core-3.4.12.jar:3.4.12]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ org.springframework.security.oauth2.client.web.server.authentication.OAuth2LoginAuthenticationWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.oauth2.client.web.server.OAuth2AuthorizationRequestRedirectWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
*__checkpoint ⇢ HTTP GET "/login/oauth2/code/registrationid?code=redacted&state=alsoredacted" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:161) ~[reactor-core-3.4.12.jar:3.4.12]
at io.r2dbc.h2.codecs.BlobToByteBufferCodec$BlobInputStreamEnumeration.hasMoreElements(BlobToByteBufferCodec.java:92) ~[r2dbc-h2-0.8.4.RELEASE.jar:0.8.4.RELEASE]
at java.base/java.io.SequenceInputStream.peekNextStream(SequenceInputStream.java:100) ~[na:na]
at java.base/java.io.SequenceInputStream.<init>(SequenceInputStream.java:67) ~[na:na]
at io.r2dbc.h2.codecs.BlobToByteBufferCodec.doEncode(BlobToByteBufferCodec.java:65) ~[r2dbc-h2-0.8.4.RELEASE.jar:0.8.4.RELEASE]