streadway/amqp

When I consume from a queue, the queue goes up

Closed this issue · 3 comments

I have tried a variety of settings. Seeing what would stick so some of this may be superfluous

	args := make(amqp.Table)
	args["x-message-ttl"] = int32(2000)
	args["x-max-length"] = int32(100)

	_, err = channelRabbitMQ.QueueDeclare(
		"udp", // queue name
		false, // durable
		false, // auto delete
		false, // exclusive
		true,  // no wait
		args,  // arguments
	)

This works. I load messages into a queue and it never gets too big.

However, the queue starts to grow rapidly once I start consuming and I can't for the life of me understand why. In advance, I don't want the queue message to ever be removed, I simply want it to be "consumed" but have it remain until it's expiry (I even have message expiry set).


	args := make(amqp.Table)
	args["x-message-ttl"] = int32(2000)
	args["x-max-length"] = int32(100)

	messages, err := channelRabbitMQ.Consume(
		"udp", // queue name
		"",    // consumer
		false, // auto-ack
		false, // exclusive
		false, // no local
		true,  // no wait
		args,  // arguments
	)
	if err != nil {
		log.Println(err)
	}

First, I cannot find the darn x arguments for rabbit consume. I don't even think these args here do anything.

Anywho, I've tried just about every iteration of these positional values and it either consumes the queue to 0 or to the contrary the queue will start building quickly above the set values.

Now if I nack it with a requeue it does work, but that jacks up cpu for rabbit and my golang binary

		for message := range messages {
			_, err := conn.Write(message.Body)
			message.Nack(false, true)
			if err != nil {
				fmt.Printf("Couldn't send response %v", err)
			}
		}

Now I understand reading the docs it talks about ttl and how if a message does not reach the head of the queue it can't expire and some other caveats, but I don't see how they apply. If without intervention, the queue is stable, then why would consuming messages cause it to rise?

The TLDR is could you give me pointers on how to transparently consume the queue in a way that its stable behavior remains as when it's not being consumed, alternatively what is the most effective way to be sending nacks?

Incase anyone is ever here with the same question, here is my possible unideal answer. This at least helped my cpu issues.

In my consume, I simply have a counter. After 400 consumes it will nack the 401st message with (I can't find the docs, I just saw it earlier so I remember what it is) true,true which is essentially saying nack all the messages before this one, and requeue. I still don't fully understand WHY this is happening, but I'm pretty sure it goes to re-queue and then respects the expiration and dev nulls them.

Overall, rabbit kinda sucks is how I feel and I'm trying kafka.

I'm leaving this here for other people who may come across this GH issue. This repository is not officially maintained by the RabbitMQ core team. Instead, use this library:

https://github.com/rabbitmq/amqp091-go

In addition, opening GH issues may not get the RabbitMQ team's attention. The mailing list may be a better place to ask:

https://groups.google.com/g/rabbitmq-users

Finally, to address the issue here:

In advance, I don't want the queue message to ever be removed, I simply want it to be "consumed" but have it remain until it's expiry

RabbitMQ streams would be the correct solution for this use-case - https://www.rabbitmq.com/streams.html

https://github.com/rabbitmq/rabbitmq-stream-go-client