rabbitmq/amqp091-go

Use of buffered delivery channels when prefetch_count is not zero

Ewemek opened this issue · 7 comments

Is your feature request related to a problem? Please describe.

With the current unbuffered channel and a defined prefetch_count, in some situations the prefetch_count is not honored by a simple for/select/default loop on the delivery channel returned by the Consume() function. From my investigation, I came to the conclusion that because this channel is not buffered, writes to it are blocking, so the messages coming from the RMQ server are not queued into the channel fast enough, resulting in our consumer handling messages 1 by 1 instead of 1000 by 1000 (with prefetch_count set to 1000).
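For reference, here is a minimal sketch of the consumer loop I mean (connection and queue setup omitted, names are illustrative). The blocking read followed by a non-blocking drain almost always ends up with a batch of 1, even though the channel was configured with ch.Qos(1000, 0, false):

package main

import (
        "log"

        amqp "github.com/rabbitmq/amqp091-go"
)

// consumeBatches drains the delivery channel with a for/select/default loop.
func consumeBatches(deliveries <-chan amqp.Delivery) {
        for {
                d, ok := <-deliveries // block until at least one delivery arrives
                if !ok {
                        return // consumer cancelled or channel/connection closed
                }
                batch := []amqp.Delivery{d}

        drain:
                for {
                        select {
                        case d, ok := <-deliveries:
                                if !ok {
                                        break drain
                                }
                                batch = append(batch, d)
                        default:
                                // Nothing immediately readable: because the library's
                                // delivery channel is unbuffered, this is hit almost at
                                // once, so the batch usually contains a single message.
                                break drain
                        }
                }

                log.Printf("drained %d deliveries (expected up to 1000)", len(batch))
                // process and ack the batch here
        }
}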

Describe the solution you'd like

Because of how prefetch works, I understand it is difficult to end up with an implementation that leaves no room for misinterpretation. But here is what I did in a fork for our own use:

$ git diff HEAD^1
diff --git a/channel.go b/channel.go
index ae6f2d1..33624f9 100644
--- a/channel.go
+++ b/channel.go
@@ -1090,7 +1090,7 @@ be dropped.
 When the consumer tag is cancelled, all inflight messages will be delivered until
 the returned chan is closed.
 */
-func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
+func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, prefetchCount int, args Table) (<-chan Delivery, error) {
        // When we return from ch.call, there may be a delivery already for the
        // consumer that hasn't been added to the consumer hash yet.  Because of
        // this, we never rely on the server picking a consumer tag for us.
@@ -1114,7 +1114,12 @@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal,
        }
        res := &basicConsumeOk{}
 
-       deliveries := make(chan Delivery)
+       var deliveries chan Delivery
+       if prefetchCount > 0 {
+               deliveries = make(chan Delivery, prefetchCount)
+       } else {
+               deliveries = make(chan Delivery)
+       }

        ch.consumers.add(consumer, deliveries)

Describe alternatives you've considered

To work around this issue, we implemented a soft timeout in our for/select loop: for each delivery we read, we reset a timeout (around 20 to 100 ms). This forces the loop to collect the desired number of deliveries, but at the cost of waiting, which reduces the application's throughput in deliveries/sec.
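Roughly, that workaround loop looks like this (a simplified sketch; names and values are illustrative):

package main

import (
        "time"

        amqp "github.com/rabbitmq/amqp091-go"
)

// collectWithIdleTimeout keeps reading until maxBatch deliveries have been
// collected, or until no delivery has arrived for the idle duration
// (we use something between 20 and 100ms). The waiting is what costs throughput.
func collectWithIdleTimeout(deliveries <-chan amqp.Delivery, maxBatch int, idle time.Duration) []amqp.Delivery {
        batch := make([]amqp.Delivery, 0, maxBatch)
        for len(batch) < maxBatch {
                select {
                case d, ok := <-deliveries:
                        if !ok {
                                return batch // channel closed: return what we have
                        }
                        batch = append(batch, d)
                        // Looping re-evaluates time.After, so the idle timeout is
                        // effectively reset after every delivery we read.
                case <-time.After(idle):
                        return batch // nothing arrived within the idle window: flush
                }
        }
        return batch
}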

Additional context

A Tour of Go on buffered channels: https://go.dev/tour/concurrency/3

If you can prove with a Go Benchmark that this results in increased throughput, I'll be happy to accept a PR. I won't be able to work on this myself in the short term.

Instead of changing the signature of Consume and introducing a breaking change, it'd be better to introduce a new private field in Channel to record the last QoS that was set, and then use it in Consume to determine whether deliveries should be buffered.

There's one caveat to consider: what should we do when QoS is set to global?
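To illustrate the idea, something along these lines (an untested sketch: the field name is made up, locking is omitted, and the hunk locations are approximate; for simplicity it only records non-global QoS):

diff --git a/channel.go b/channel.go
--- a/channel.go
+++ b/channel.go
@@ type Channel struct {
+       // lastQosPrefetch records the most recent prefetch count set via
+       // Qos with global == false (hypothetical field name).
+       lastQosPrefetch int
@@ func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
+       if !global {
+               ch.lastQosPrefetch = prefetchCount
+       }
@@ func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
-       deliveries := make(chan Delivery)
+       // A zero prefetch, or a global-only QoS, keeps today's unbuffered behaviour.
+       deliveries := make(chan Delivery, ch.lastQosPrefetch)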

Great idea for how to implement this change. For global QoS (and other situations where we can't really decide), the current implementation would be fine (no buffered channel).

I'll try to write a benchmark, but the problem I faced was that deliveries were not batched correctly: for instance, I expected to receive 1000 at a time but got them 1 by 1. Other parts of my code relied on that batching, which made this behaviour really difficult to pin down.

main_test.txt
Here is my main_test.go, which shows the behavioural difference between using buffered channels and not. You'll have to override the amqp091 lib in order to test with buffered channels.
Note: I had to rename the file to .txt in order to attach it to GitHub.

I'd like to point out that batching deliveries is an important feature for us: we aggregate some processing data over each batch, which helps us reduce the load on our database, for instance.

Thank you for writing the benchmark. I've run it without buffered channels vs. with buffered channels, and I can observe a significant difference in the batch sizes. The results are in this gist:

https://gist.github.com/Zerpet/d8ff70201033f60300109ca8a846eeb4

There's one failing test, TestLeakClosedConsumersIssue264, regarding leaked messages, which seems legitimate. I'm looking into it.

I've been digging into this test failure and came to the conclusion that we can't implement buffered channels for deliveries, because that would break the semantics of Channel.Close() and Connection.Close(). In streadway/amqp#264 (comment), the original author and maintainers of this library decided that Channel.Close() will immediately stop deliveries on the delivery channel. This was to resolve a memory leak (see the whole conversation in that issue for more details). By buffering the delivery channel, we may reintroduce that leak, because the delivery channel is returned to the caller of Consume. At that point, we lose control over how many references to the channel are alive, and we can't drain it either.

Your best option is to implement batching of deliveries yourself and dispatch each batch to a goroutine or worker process. The batching can be bounded by a number of messages or by a timeout in milliseconds. I adapted the code you shared and benchmarked both options; the results are here:

https://gist.github.com/Zerpet/b894e894cd2b7c314c61f8ad13497ae6
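For illustration, here is a rough sketch of that batching approach; the function and parameter names are mine, not part of the library. It groups deliveries into batches of up to maxSize messages, holds each batch for at most maxWait, and hands every batch to workers reading from out:

package main

import (
        "time"

        amqp "github.com/rabbitmq/amqp091-go"
)

func batchDeliveries(in <-chan amqp.Delivery, out chan<- []amqp.Delivery, maxSize int, maxWait time.Duration) {
        defer close(out)

        for {
                d, ok := <-in // block until the first delivery of the next batch
                if !ok {
                        return // consumer cancelled or channel/connection closed
                }
                batch := []amqp.Delivery{d}
                deadline := time.After(maxWait) // upper bound on how long a batch is held

        fill:
                for len(batch) < maxSize {
                        select {
                        case d, ok := <-in:
                                if !ok {
                                        break fill
                                }
                                batch = append(batch, d)
                        case <-deadline:
                                break fill // timed out: ship a partial batch
                        }
                }

                out <- batch // a worker ranges over out, processes and acks
        }
}

A worker can then process each batch and call Ack(true) on its last delivery to acknowledge the whole batch; as noted below, that Ack may return an error if the channel or connection has been closed in the meantime.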

It is evident that there's a performance loss when batching from the unbuffered channel with a timeout, but the alternative is to implement a new function, similar to the one you proposed initially, and be very clear about its semantics. The application will have to handle errors from the Ack() method, as it may ack on a closed channel/connection.

Yep, it is a shame that Go does not provide a way to check whether a channel is closed. But I agree with michaelklishin when they say that deliveries will be lost in auto-ack mode (unbuffered channels can also lose up to 1 delivery the same way); if one wants safety, they should use manual ack, so that unacked messages are eventually requeued. I think it's intrinsic to the initial design of AMQP anyway.

IMO the solution with timeout+max is not an option, because it slows down the delivery rate significantly, in particular when the RMQ server is not on the same machine as the consumer. Plus it forces users of the lib to have a deep understanding of this side effect.

May I add that people using a high prefetch count presumably also have a high delivery rate, so maybe the loss that would be induced (in auto-ack mode) would not be such a bad thing after all.

Yep, it is a shame that Go does not provide a way to check whether a channel is closed

Indeed, that would make this proposal more viable.

IMO the solution with timeout+max is not an option, because it slows down the delivery rate significantly, in particular when the RMQ server is not on the same machine as the consumer.

The other alternative, if you don't need exclusivity, is to add more consumers to the queue.
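For example, something like this (queue name, prefetch, and handler are placeholders) starts n independent consumers on the same queue, each on its own AMQP channel:

package main

import (
        "log"

        amqp "github.com/rabbitmq/amqp091-go"
)

func startConsumers(conn *amqp.Connection, queue string, n, prefetch int, handle func(amqp.Delivery)) error {
        for i := 0; i < n; i++ {
                ch, err := conn.Channel()
                if err != nil {
                        return err
                }
                if err := ch.Qos(prefetch, 0, false); err != nil {
                        return err
                }
                deliveries, err := ch.Consume(queue, "", false, false, false, false, nil)
                if err != nil {
                        return err
                }
                go func(id int) {
                        for d := range deliveries {
                                handle(d)
                                if err := d.Ack(false); err != nil {
                                        log.Printf("consumer %d: ack failed: %v", id, err)
                                }
                        }
                }(i)
        }
        return nil
}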

Given that a buffered channel is a potential source of memory leaks, and that this change would break the semantics of Channel/Connection close, I believe we are moving towards the conclusion that this issue won't be implemented in the client.