blocking call at Sender#send
62mkv opened this issue · 6 comments
Motivation
When run with BlockHound, following exception is detected around the Sender#send
call:
Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
at java.lang.Object.wait(Object.java)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
reactor.core.publisher.Mono.map
reactor.rabbitmq.Sender.getChannelMono(Sender.java:308)
Error has been observed at the following site(s):
|_ Mono.map ⇢ at reactor.rabbitmq.Sender.getChannelMono(Sender.java:308)
|_ Mono.flatMapMany ⇢ at reactor.rabbitmq.Sender.send(Sender.java:154)
|_ Flux.then ⇢ at reactor.rabbitmq.Sender.send(Sender.java:170)
|_ ⇢ at reactor.rabbitmq.Sender.send(Sender.java:145)
Stack trace:
at java.lang.Object.wait(Object.java)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:182)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:555)
at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:165)
at reactor.rabbitmq.Sender$ChannelCreationFunction.apply(Sender.java:1078)
at reactor.rabbitmq.Sender$ChannelCreationFunction.apply(Sender.java:1073)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
Desired solution
Considered alternatives
I had to add a BlockHound exception as in
builder.allowBlockingCallsInside("reactor.rabbitmq.Sender$ChannelCreationFunction", "apply");
added reproduction case as per @bsideup comments: https://github.com/62mkv/spring-boot-reactive-rabbitmq
just run the SpringBootReactiveRabbitmqApplicationTests
test suite and hopefully the issue will manifest itself
also, there's a blocking call in this lambda in the Sender#send(...)
method: (Sender.java:169)
.doFinally(st -> channelCloseHandler.accept(st, channel))
this can be confirmed with the same repro project, just add this parameter into BlockHound.install(..)
call:
builder -> {
builder.allowBlockingCallsInside("reactor.rabbitmq.Sender$ChannelCreationFunction", "apply");
}
as long as original issue has been defined for Sender#send
in general, I decided to not create a separate issue thus far (what do you think, @bsideup ?)
so, apparently both creating and closing a channel when doing Sender#send
use blocking calls.
The channel creation in the AMQP Java client uses a blocking RPC utility. It may be possible to patch the Java client to create a channel without blocking, by using Channel#asyncCompletableRpc
internally. Hopefully this would not require a breaking change.
The impact is likely to be minor in practice as channels are supposed to be long-lived objects.
may I enquire if there had been any changes in the code? also, I am a little bit confused by the phrasing that "channels are supposed to be long-lived objects".
As per the docs, (https://projectreactor.io/docs/rabbitmq/milestone/reference/#_channel_pooling_in_code_sender_code),
By default, Sender#send* methods open a new Channel for every call.
also,
With longer sequence of messages (100 or more), channel pooling can perform worse than without pooling at all. According to the same micro-benchmarks, channel pooling does not make sending with publisher confirms perform better, it appears to perform even worse.
All this considered, I would read that as "channels are at least as much likely to be long-lived objects, as they are not". Am I reading it wrong?
No there has been no change with regards to this issue.
Channels are not supposed to be created for a one-time operation, hence they "are supposed to be long-lived objects". In the context of Reactor RabbitMQ, a channel would be created and used to send an infinite Flux
.
Pooling has been introduced to deal with "short" Flux
es, where channel creation can impede performance (opening a channel can become a significant operation if you use it only to send a few messages). In the cases of the benchmarks, as soon as the Flux
is long enough, the pooling housekeeping becomes more significant than just creating a new channel.
So, yes, channels are meant to be long-lived, until they are not for a given use case, but unfortunately there is no much we can do, because this is how AMQP 0-9-1 works (you need a not-so-cheap entity to send even one message), so we have to resort to tricks like pools, which honestly do the job.