reactor/reactor-rabbitmq

Sender does not close / manager resource management channel when declaring objects

a701440 opened this issue · 3 comments

Sender class uses separate resource management channel mono that supplies a channel for declaration operations.
However unlike the "send" operation declaration operations do not close/handle closing of the channel.

This promotes giving the same cached channel for all resource management operations.
However it seems like channels can not be used by multiple threads concurrently, so that does not work well.
See: rabbitmq/rabbitmq-java-client#1194

   return channelMono.map(channel -> {
            try {
                return channel.asyncCompletableRpc(declare);
            } catch (IOException e) {
                throw new RabbitFluxException("Error during RPC call", e);
            }
        }).flatMap(future -> Mono.fromCompletionStage(future))
            .flatMap(command -> Mono.just((AMQP.Queue.DeclareOk) command.getMethod()))
            .publishOn(resourceManagementScheduler);

This will request new channel from the channelMono and then not close it and also not give it to some close-handler.
In send operations we have:

.doFinally(st -> channelCloseHandler.accept(st, channel))

The channel to deal with resources is used during the life of the Sender instance. You can provide your own channel mono in the SenderOptions. You can also provide a dedicated instance with the ResourceManagementOptions parameter of each resource-related method.

If you hit concurrency issues with resource management, you can provide your own instance of scheduler with the SenderOptions#resourceManagementScheduler.

I understand that. It's just the patterns used with the primary channelMono and ResourceManagement channelMono are different in the Sender class. Primary channelMono supports ChannelPool and returns channels to the pool using the close handler of the pool. Resource management channel mono does not support that and it would be nice if this was consistent, especially given that channels are not thread-safe.

I actually ended-up creating a work-around hack to return the channels used in resource management Mono to the channel
pool using Reactor subscribe context and doFinally operator (getAutoReturnedChannelMono):

    public Mono<? extends Channel> getAutoReturnedChannelMono() {
        return getChannelMono().transformDeferredContextual(this::autoReturnChannel).contextWrite(this::createAutoReturnContext);
    }

    private Mono<? extends Channel> autoReturnChannel(Mono<? extends Channel> channelMono, ContextView context) {
        AtomicReference<Channel> ref = context.get(AtomicReference.class);
        return channelMono.doOnNext(ref::set).doFinally(signal -> getChannelCloseHandler().accept(signal, ref.get()));
    }

    private Context createAutoReturnContext(Context context) {
        return context.put(AtomicReference.class, new AtomicReference<Channel>());
    }
...

Related to this LazyChannelPool class could also use some improvement.
When dispensing channels it does not check that channel belongs to the same connection that is currently in use, may be should also check if the channel is still open. Also when dispensing channels it would be nice to have a "loop" and drain "bad" channels from the pool, so that they can be garbage collected.

Feel free to provide a PR with backward-compatible changes. There may be unfortunate choices in the current implementation but we have to stick with them until the next release that allows breaking changes (minor releases for Reactor projects IIRC).