When I consume from a queue, the queue depth goes up
I have tried a variety of settings to see what would stick, so some of this may be superfluous.
args := make(amqp.Table)
args["x-message-ttl"] = int32(2000)
args["x-max-length"] = int32(100)
_, err = channelRabbitMQ.QueueDeclare(
    "udp", // queue name
    false, // durable
    false, // auto delete
    false, // exclusive
    true,  // no wait
    args,  // arguments
)
if err != nil {
    log.Println(err)
}
This works. I load messages into a queue and it never gets too big.
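As an aside, here is the same declaration with no-wait turned off, so that a mismatch in the x-arguments comes back as an error from QueueDeclare rather than failing later (a sketch; everything else is unchanged):

// Sketch: identical declaration, but waiting for the server's reply so
// argument mismatches (PRECONDITION_FAILED) surface here.
queue, err := channelRabbitMQ.QueueDeclare(
    "udp", // queue name
    false, // durable
    false, // auto delete
    false, // exclusive
    false, // no wait
    args,  // same x-message-ttl / x-max-length arguments
)
if err != nil {
    log.Println(err)
}
log.Printf("declared %q, %d messages ready", queue.Name, queue.Messages)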
However, the queue starts to grow rapidly once I start consuming, and I can't for the life of me understand why. To be clear, I don't want the messages to ever be removed by the consumer; I simply want them to be "consumed" but remain in the queue until they expire (I even have message expiry set).
args := make(amqp.Table)
args["x-message-ttl"] = int32(2000)
args["x-max-length"] = int32(100)
messages, err := channelRabbitMQ.Consume(
    "udp", // queue name
    "",    // consumer
    false, // auto-ack
    false, // exclusive
    false, // no local
    true,  // no wait
    args,  // arguments
)
if err != nil {
    log.Println(err)
}
First, I cannot find documentation for the darn x-arguments on Consume. I don't even think these args do anything here.
Anyway, I've tried just about every combination of these positional values, and either it consumes the queue down to 0 or, on the contrary, the queue starts building quickly past the set limits.
Now, if I nack with requeue it does work, but that jacks up CPU for RabbitMQ and for my Go binary:
for message := range messages {
    _, err := conn.Write(message.Body)
    if err != nil {
        fmt.Printf("Couldn't send response %v", err)
    }
    message.Nack(false, true) // requeue the delivery
}
Now, I understand from reading the docs that TTL has caveats, for example that a message which has not reached the head of the queue cannot be expired and dropped, but I don't see how they apply here. If the queue is stable without intervention, why would consuming messages cause it to grow?
The TL;DR: could you give me pointers on how to consume the queue transparently, so that it stays as stable as when it is not being consumed at all? Alternatively, what is the most effective way to send nacks?
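For reference, here is a sketch of the consume path with a bounded prefetch via channel.Qos, which at least caps how many unacked deliveries can be in flight at once (the prefetch of 100 is an arbitrary value; connection and queue setup are as above):

// Cap in-flight (unacked) deliveries; 100 is an arbitrary choice.
if err := channelRabbitMQ.Qos(100, 0, false); err != nil {
    log.Println(err)
}

messages, err := channelRabbitMQ.Consume(
    "udp", // queue name
    "",    // consumer
    false, // auto-ack
    false, // exclusive
    false, // no local
    false, // no wait
    nil,   // arguments (x-message-ttl / x-max-length belong on QueueDeclare)
)
if err != nil {
    log.Println(err)
}

for message := range messages {
    if _, err := conn.Write(message.Body); err != nil {
        fmt.Printf("Couldn't send response %v", err)
    }
    // Requeue so the message stays in the queue until its TTL removes it.
    message.Nack(false, true)
}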
In case anyone ends up here with the same question, here is my possibly non-ideal answer. It at least helped my CPU issues.
In my consumer I simply keep a counter. After 400 deliveries it nacks the 401st message with multiple=true, requeue=true (I can't find the docs right now, but I saw it earlier), which essentially says: nack this message and every unacknowledged message before it, and requeue them all. I still don't fully understand why this happens, but I'm fairly sure the messages go back onto the queue, where the expiration is respected and they effectively get dev-nulled. Roughly, it looks like the sketch below.
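A minimal sketch of that loop (the threshold of 400 is just the value I landed on; conn and messages are set up as earlier):

count := 0
for message := range messages {
    if _, err := conn.Write(message.Body); err != nil {
        fmt.Printf("Couldn't send response %v", err)
    }
    count++
    if count > 400 {
        // multiple=true, requeue=true: nack this delivery and every
        // earlier unacknowledged delivery, putting them all back on the queue.
        message.Nack(true, true)
        count = 0
    }
}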
Overall, RabbitMQ kinda sucks is how I feel, and I'm trying Kafka.
I'm leaving this here for other people who may come across this GH issue. This repository is not officially maintained by the RabbitMQ core team. Instead, use this library:
https://github.com/rabbitmq/amqp091-go
In addition, opening GH issues may not get the RabbitMQ team's attention. The mailing list may be a better place to ask:
https://groups.google.com/g/rabbitmq-users
Finally, to address the issue here:
"I don't want the messages to ever be removed by the consumer; I simply want them to be 'consumed' but remain in the queue until they expire"
RabbitMQ streams would be the correct solution for this use-case - https://www.rabbitmq.com/streams.html
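For anyone who wants to experiment with this from an AMQP 0.9.1 client, here is a minimal sketch using amqp091-go (the queue name and offset are placeholders; streams must be declared durable, and consumers need a prefetch and manual acks):

// Sketch: declare and consume a stream-type queue over AMQP 0.9.1.
// Assumes ch is an *amqp.Channel from github.com/rabbitmq/amqp091-go.
_, err := ch.QueueDeclare(
    "udp-stream", // placeholder name
    true,         // durable (streams must be durable)
    false,        // auto delete
    false,        // exclusive
    false,        // no wait
    amqp.Table{"x-queue-type": "stream"},
)
if err != nil {
    log.Println(err)
}

// Streams require a prefetch and manual acknowledgements.
if err := ch.Qos(100, 0, false); err != nil {
    log.Println(err)
}

deliveries, err := ch.Consume(
    "udp-stream",
    "",    // consumer
    false, // auto-ack must be false for streams
    false, // exclusive
    false, // no local
    false, // no wait
    amqp.Table{"x-stream-offset": "first"}, // or "last", "next", a numeric offset
)
if err != nil {
    log.Println(err)
}

for d := range deliveries {
    // Consuming does not remove messages from a stream; they stay until
    // the stream's retention policy discards them.
    fmt.Println(string(d.Body))
    d.Ack(false)
}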