r2dbc/r2dbc-h2

BlobCodec and ClobCodec.encode(…) fail with IllegalStateException due to blocking


Same case as with #127. To solve that issue, we need to defer the stream consumption until statement execution.

Reproducer:

Connection conn = …;

conn.createStatement("INSERT INTO PERSON (first_name) VALUES(?1)")
		.bind(1, Blob.from(Mono.just(ByteBuffer.wrap("foo".getBytes()))))
		.execute();

To reproduce the issue, the code must run on a non-blocking Reactor thread, such as one provided by Schedulers.parallel().

java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.example.demo.DemoApplication$PersonController#create(String, String) [DispatcherHandler]
	|_ checkpoint ⇢ HTTP POST "/create/Mark/Paluch" [ExceptionHandlingWebHandler]
Stack trace:
		at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
		at io.r2dbc.h2.codecs.ClobCodec$AggregateCharArrayReader.<init>(ClobCodec.java:90) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.ClobCodec$AggregateCharArrayReader.<init>(ClobCodec.java:70) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.ClobCodec.doEncode(ClobCodec.java:59) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.ClobCodec.doEncode(ClobCodec.java:32) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.AbstractCodec.encode(AbstractCodec.java:68) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.DefaultCodecs.encode(DefaultCodecs.java:69) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.addIndex(H2Statement.java:130) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.bind(H2Statement.java:80) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.bind(H2Statement.java:39) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]

java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:154) ~[reactor-core-3.3.0.RELEASE.jar:3.3.0.RELEASE]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	|_ checkpoint ⇢ Handler com.example.demo.DemoApplication$PersonController#create(String, String) [DispatcherHandler]
	|_ checkpoint ⇢ HTTP POST "/create/Mark/Paluch" [ExceptionHandlingWebHandler]
Stack trace:
		at io.r2dbc.h2.codecs.BlobCodec$BlobInputStreamEnumeration.hasMoreElements(BlobCodec.java:86) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at java.io.SequenceInputStream.nextStream(SequenceInputStream.java:109) ~[na:1.8.0_211]
		at java.io.SequenceInputStream.<init>(SequenceInputStream.java:69) ~[na:1.8.0_211]
		at io.r2dbc.h2.codecs.BlobCodec.doEncode(BlobCodec.java:59) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.BlobCodec.doEncode(BlobCodec.java:32) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.AbstractCodec.encode(AbstractCodec.java:68) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.codecs.DefaultCodecs.encode(DefaultCodecs.java:69) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.addIndex(H2Statement.java:130) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.bind(H2Statement.java:80) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
		at io.r2dbc.h2.H2Statement.bind(H2Statement.java:39) ~[r2dbc-h2-0.8.1.BUILD-SNAPSHOT.jar:0.8.1.BUILD-SNAPSHOT]
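
The BlobCodec trace shows why the failure already surfaces inside bind(…): the SequenceInputStream constructor asks its Enumeration for the first element right away, and in the codec that call ends up in BlockingIterable.hasNext(). A minimal JDK-only sketch of that behaviour follows. CountingEnumeration and the two helper methods are hypothetical names used for illustration and are not part of r2dbc-h2; the counter stands in for the blocking call.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.function.Supplier;

public class EagerSequenceStreamSketch {

    /** Hypothetical stand-in for the codec's enumeration; counts how often it is polled. */
    static final class CountingEnumeration implements Enumeration<InputStream> {
        private final Iterator<InputStream> source;
        int polls; // number of hasMoreElements() calls so far

        CountingEnumeration(InputStream... streams) {
            this.source = Arrays.asList(streams).iterator();
        }

        @Override
        public boolean hasMoreElements() {
            polls++; // in the codec, this is where the blocking hasNext() would run
            return source.hasNext();
        }

        @Override
        public InputStream nextElement() {
            return source.next();
        }
    }

    static InputStream chunk(String s) {
        return new ByteArrayInputStream(s.getBytes(StandardCharsets.UTF_8));
    }

    /** Building the stream directly polls the enumeration inside the constructor. */
    static int pollsAfterEagerConstruction() {
        CountingEnumeration chunks = new CountingEnumeration(chunk("foo"));
        new SequenceInputStream(chunks);
        return chunks.polls;
    }

    /** Wrapping construction in a Supplier postpones the first poll until get() is called. */
    static int pollsBeforeDeferredUse() {
        CountingEnumeration chunks = new CountingEnumeration(chunk("foo"));
        Supplier<InputStream> deferred = () -> new SequenceInputStream(chunks);
        return chunks.polls; // deferred.get() has not been called yet
    }

    public static void main(String[] args) {
        System.out.println("polls after eager construction: " + pollsAfterEagerConstruction());
        System.out.println("polls before deferred use: " + pollsBeforeDeferredUse());
    }
}
```

Deferring construction only moves the blocking call to a later point. It still has to happen on a thread that permits blocking, or be replaced by a non-blocking aggregation at execution time.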

BlobToByteBufferCodec should also be noted: it appears to block on the Flux in the same way.

Fixing the reading side is probably quite straightforward, as H2 isn't backed by anything reactive. The complexity comes into play when writing values to H2: we need to resolve each parameter that provides a Publisher through an additional concatMap during Statement.execute(…).
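
A rough sketch of that idea, assuming reactor-core is on the classpath. DeferredBindingSketch, its bind/execute methods and the aggregate helper are hypothetical and do not reflect the actual H2Statement code. bind(…) only stores the Publisher, and execute() resolves the stored parameters one after another with concatMap, so nothing blocks on the calling thread.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class DeferredBindingSketch {

    // Hypothetical parameter store: Publishers are kept as-is, not consumed.
    private final List<Publisher<ByteBuffer>> bindings = new ArrayList<>();

    /** Records the parameter without subscribing to it. */
    DeferredBindingSketch bind(Publisher<ByteBuffer> value) {
        bindings.add(value);
        return this;
    }

    /** Resolves every parameter in binding order; a real driver would then run the statement. */
    Mono<List<byte[]>> execute() {
        return Flux.fromIterable(bindings)
                .concatMap(DeferredBindingSketch::aggregate) // one parameter at a time, order preserved
                .collectList();
    }

    /** Non-blocking aggregation of all chunks of one parameter into a single byte[]. */
    static Mono<byte[]> aggregate(Publisher<ByteBuffer> chunks) {
        return Flux.from(chunks).reduce(new byte[0], (acc, buf) -> {
            ByteBuffer copy = buf.duplicate(); // leave the caller's buffer position untouched
            byte[] merged = Arrays.copyOf(acc, acc.length + copy.remaining());
            copy.get(merged, acc.length, copy.remaining());
            return merged;
        });
    }

    /** Runs the resolution on a non-blocking scheduler; only the calling thread blocks for the result. */
    static String demo() {
        List<byte[]> resolved = new DeferredBindingSketch()
                .bind(Mono.just(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8))))
                .bind(Flux.just(
                        ByteBuffer.wrap("ba".getBytes(StandardCharsets.UTF_8)),
                        ByteBuffer.wrap("r".getBytes(StandardCharsets.UTF_8))))
                .execute()
                .subscribeOn(Schedulers.parallel())
                .block();
        return resolved.stream()
                .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

concatMap is used here rather than flatMap so that resolved values stay aligned with their parameter positions. Whether the driver would aggregate into a byte[] or hand H2 a stream at that point is left open in this sketch.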

A similar error occurs at

io.r2dbc.h2.codecs.BlobToByteBufferCodec.doEncode(BlobToByteBufferCodec.java:65)

when using the Spring Security OAuth2 client configured with H2 and R2dbcReactiveOAuth2AuthorizedClientService:

java.lang.IllegalStateException: Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-2
	at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:161) ~[reactor-core-3.4.12.jar:3.4.12]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.security.oauth2.client.web.server.authentication.OAuth2LoginAuthenticationWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.oauth2.client.web.server.OAuth2AuthorizationRequestRedirectWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.context.ReactorContextWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.header.HttpHeaderWriterWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.config.web.server.ServerHttpSecurity$ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP GET "/login/oauth2/code/registrationid?code=redacted&state=alsoredacted" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.BlockingIterable$SubscriberIterator.hasNext(BlockingIterable.java:161) ~[reactor-core-3.4.12.jar:3.4.12]
		at io.r2dbc.h2.codecs.BlobToByteBufferCodec$BlobInputStreamEnumeration.hasMoreElements(BlobToByteBufferCodec.java:92) ~[r2dbc-h2-0.8.4.RELEASE.jar:0.8.4.RELEASE]
		at java.base/java.io.SequenceInputStream.peekNextStream(SequenceInputStream.java:100) ~[na:na]
		at java.base/java.io.SequenceInputStream.<init>(SequenceInputStream.java:67) ~[na:na]
		at io.r2dbc.h2.codecs.BlobToByteBufferCodec.doEncode(BlobToByteBufferCodec.java:65) ~[r2dbc-h2-0.8.4.RELEASE.jar:0.8.4.RELEASE]