reactor/reactor-rabbitmq

blocking call at Sender#send

62mkv opened this issue · 6 comments

62mkv commented

Motivation

When run with BlockHound, the following exception is detected around the Sender#send call:

Caused by: reactor.blockhound.BlockingOperationError: Blocking call! java.lang.Object#wait
	at java.lang.Object.wait(Object.java)
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
	reactor.core.publisher.Mono.map
	reactor.rabbitmq.Sender.getChannelMono(Sender.java:308)
Error has been observed at the following site(s):
	|_         Mono.map ⇢ at reactor.rabbitmq.Sender.getChannelMono(Sender.java:308)
	|_ Mono.flatMapMany ⇢ at reactor.rabbitmq.Sender.send(Sender.java:154)
	|_        Flux.then ⇢ at reactor.rabbitmq.Sender.send(Sender.java:170)
	|_                  ⇢ at reactor.rabbitmq.Sender.send(Sender.java:145)
Stack trace:
		at java.lang.Object.wait(Object.java)
		at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
		at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
		at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
		at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:502)
		at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:293)
		at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:141)
		at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
		at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:182)
		at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:555)
		at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.createChannel(AutorecoveringConnection.java:165)
		at reactor.rabbitmq.Sender$ChannelCreationFunction.apply(Sender.java:1078)
		at reactor.rabbitmq.Sender$ChannelCreationFunction.apply(Sender.java:1073)
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)

Desired solution

Considered alternatives

I had to add a BlockHound exception, as follows:

builder.allowBlockingCallsInside("reactor.rabbitmq.Sender$ChannelCreationFunction", "apply");
62mkv commented

Added a reproduction case, as per @bsideup's comments: https://github.com/62mkv/spring-boot-reactive-rabbitmq

Just run the SpringBootReactiveRabbitmqApplicationTests test suite and the issue should hopefully manifest itself.

62mkv commented

Also, there is a blocking call in this lambda in the Sender#send(...) method (Sender.java:169):

                .doFinally(st -> channelCloseHandler.accept(st, channel))

This can be confirmed with the same repro project; just pass this parameter to the BlockHound.install(..) call:

builder -> {
    builder.allowBlockingCallsInside("reactor.rabbitmq.Sender$ChannelCreationFunction", "apply");
}

As the original issue covers Sender#send in general, I decided not to create a separate issue for now (what do you think, @bsideup?)

62mkv commented

So, apparently both creating and closing a channel in Sender#send use blocking calls.

The channel creation in the AMQP Java client uses a blocking RPC utility. It may be possible to patch the Java client to create a channel without blocking, by using Channel#asyncCompletableRpc internally. Hopefully this would not require a breaking change.
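To illustrate why BlockHound flags channel creation, here is a JDK-only sketch contrasting a blocking-cell style RPC (the caller parks in Object#wait until the reply arrives, as in the stack trace above) with a future-based style where the caller attaches a continuation instead. SimpleBlockingCell, openChannelBlocking and openChannelAsync are hypothetical names; this is not the RabbitMQ client's BlockingCell and it does not call Channel#asyncCompletableRpc. The "broker reply" is simulated by a single-thread executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RpcStyles {

    /** Hypothetical, simplified stand-in for a blocking cell: get() parks the caller in Object#wait. */
    static final class SimpleBlockingCell<T> {
        private T value;
        private boolean filled;

        synchronized void set(T v) {
            value = v;
            filled = true;
            notifyAll(); // wake up any thread parked in get()
        }

        synchronized T get() {
            while (!filled) {
                try {
                    wait(); // the kind of call BlockHound reports as "Blocking call! java.lang.Object#wait"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for reply", e);
                }
            }
            return value;
        }
    }

    /** Blocking style: the calling thread waits until the simulated broker reply arrives. */
    static String openChannelBlocking(ExecutorService broker) {
        SimpleBlockingCell<String> cell = new SimpleBlockingCell<>();
        broker.submit(() -> cell.set("channel.open-ok")); // simulated broker reply
        return cell.get();
    }

    /** Async style: the caller receives a future and attaches a continuation instead of waiting. */
    static CompletableFuture<String> openChannelAsync(ExecutorService broker) {
        return CompletableFuture.supplyAsync(() -> "channel.open-ok", broker);
    }

    public static void main(String[] args) {
        ExecutorService broker = Executors.newSingleThreadExecutor();
        try {
            System.out.println("blocking: " + openChannelBlocking(broker));
            // join() is only used so main waits for the output; reactive code would not call it
            openChannelAsync(broker).thenAccept(r -> System.out.println("async: " + r)).join();
        } finally {
            broker.shutdown();
        }
    }
}
```

Both variants get the same reply; the difference is only whether the calling thread is parked while waiting for it, which is what matters on a non-blocking Reactor thread.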

The impact is likely to be minor in practice as channels are supposed to be long-lived objects.

62mkv commented

May I ask whether there have been any changes in the code? Also, I am a little confused by the phrasing that "channels are supposed to be long-lived objects".

As per the docs (https://projectreactor.io/docs/rabbitmq/milestone/reference/#_channel_pooling_in_code_sender_code):

By default, Sender#send* methods open a new Channel for every call.

Also:

With longer sequence of messages (100 or more), channel pooling can perform worse than without pooling at all. According to the same micro-benchmarks, channel pooling does not make sending with publisher confirms perform better, it appears to perform even worse.

All this considered, I would read that as "channels are just as likely to be short-lived objects as long-lived ones". Am I reading it wrong?

No, there has been no change with regard to this issue.

Channels are not supposed to be created for a one-time operation, hence they "are supposed to be long-lived objects". In the context of Reactor RabbitMQ, a channel would be created and used to send an infinite Flux.

Pooling has been introduced to deal with "short" Fluxes, where channel creation can impede performance (opening a channel becomes a significant cost if you use it only to send a few messages). In the benchmarks, as soon as the Flux is long enough, the pooling housekeeping costs more than just creating a new channel.
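A minimal JDK-only sketch of the trade-off for short Fluxes: with the default behaviour every send opens a new channel, while a pool lets consecutive short sends reuse an idle one. FakeConnection, FakeChannel and SimpleChannelPool are hypothetical stand-ins, not reactor-rabbitmq's ChannelPool; the sketch only counts channel openings and deliberately does not model the pool housekeeping overhead that makes pooling slower for long Fluxes.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicInteger;

public class ChannelPoolSketch {

    /** Hypothetical stand-in for an AMQP channel; opening a real one costs an RPC round trip. */
    static final class FakeChannel {
        final int id;
        FakeChannel(int id) { this.id = id; }
    }

    /** Hypothetical connection that only counts how many channels were opened. */
    static final class FakeConnection {
        final AtomicInteger opened = new AtomicInteger();

        FakeChannel createChannel() {
            return new FakeChannel(opened.incrementAndGet());
        }
    }

    /** Minimal bounded pool: reuse an idle channel if there is one, otherwise open a new one. */
    static final class SimpleChannelPool {
        private final FakeConnection connection;
        private final Deque<FakeChannel> idle = new ArrayDeque<>();
        private final int maxIdle;

        SimpleChannelPool(FakeConnection connection, int maxIdle) {
            this.connection = connection;
            this.maxIdle = maxIdle;
        }

        synchronized FakeChannel acquire() {
            FakeChannel ch = idle.pollFirst();
            return ch != null ? ch : connection.createChannel();
        }

        synchronized void release(FakeChannel ch) {
            if (idle.size() < maxIdle) {
                idle.addFirst(ch); // keep the channel for the next short send
            }
            // otherwise a real pool would close the channel here
        }
    }

    /** One short send with the default behaviour: a brand new channel per call. */
    static void sendWithoutPool(FakeConnection connection) {
        FakeChannel ch = connection.createChannel();
        // publish a few messages on ch, then close it (closing is also an RPC in the real client)
    }

    /** One short send using the pool: borrow a channel, publish, give it back. */
    static void sendWithPool(SimpleChannelPool pool) {
        FakeChannel ch = pool.acquire();
        // publish a few messages on ch
        pool.release(ch);
    }

    public static void main(String[] args) {
        FakeConnection unpooled = new FakeConnection();
        for (int i = 0; i < 100; i++) {
            sendWithoutPool(unpooled);
        }

        FakeConnection pooled = new FakeConnection();
        SimpleChannelPool pool = new SimpleChannelPool(pooled, 5);
        for (int i = 0; i < 100; i++) {
            sendWithPool(pool);
        }

        System.out.println("channels opened without pool: " + unpooled.opened.get()); // 100
        System.out.println("channels opened with pool: " + pooled.opened.get());     // 1
    }
}
```

Because the sends run one after another, the pooled variant keeps reusing the single channel it opened first; with concurrent sends the pool would grow up to the number of channels in use at the same time.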

So, yes, channels are meant to be long-lived, except when they are not for a given use case. Unfortunately, there is not much we can do, because this is how AMQP 0-9-1 works (you need a not-so-cheap entity to send even one message), so we have to resort to tricks like pools, which honestly do the job.