spring-cloud/spring-cloud-stream

Feature Request: Add delivery attempt count to RabbitMQ DLQ messages

samragu opened this issue · 21 comments

RabbitMQ sets an x-death message header with a count parameter that is incremented each time a message is delivered to the DLQ. Messages in the DLQ go back to the normal queue after their TTL expires, and we relied on the count parameter to limit how many times a message is retried. Starting with RabbitMQ 3.13, RabbitMQ has discontinued support for this feature and treats all x- headers as server headers. See rabbitmq/rabbitmq-server#10709 for details. Can SCSt set a similar header and increment its value each time the message is delivered to the DLQ? The exception stack trace is already being set on these messages, which is very useful for troubleshooting.
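
To illustrate the setup we rely on: the DLQ has a TTL and dead-letters expired messages back to the work queue. A rough Spring AMQP sketch of that DLQ declaration (queue names, TTL, and routing key here are just examples, not our actual configuration):

import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class DlqTopologyConfig {

    // DLQ whose expired messages are dead-lettered via the default exchange
    // back to the work queue for another delivery attempt.
    @Bean
    fun retryDlq(): Queue =
        QueueBuilder.durable("myDestination.consumerGroup.dlq")
            .withArgument("x-message-ttl", 5000)
            .withArgument("x-dead-letter-exchange", "")
            .withArgument("x-dead-letter-routing-key", "myDestination.consumerGroup")
            .build()
}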

We currently don't set any such header and fully rely on whatever the broker gives us in that x-death header. So, what is the request, please?

As mentioned above, RabbitMQ 3.13 has stopped incrementing the count parameter in the x-death header. So there is no way for an application to determine how many times a message has landed in the DLQ.

OK. I see now. So, reopening respective Spring AMQP issue: spring-projects/spring-amqp#2688.

Not sure, though, what you mean about fixing the Rabbit Binder, but probably that falls just to the docs, as we are discussing in that issue.
So, keeping this open until we get a fix.
If you have any ideas what has to be done, feel free to come back to us in that Spring AMQP issue.
However, the docs over here for the Rabbit Binder can get a fix in parallel, since it looks like they talk about some custom strategy based on x-death for now: https://github.com/search?q=repo%3Aspring-cloud%2Fspring-cloud-stream-binder-rabbit%20x-death&type=code

Thanks

@samragu ,

See the comment from RabbitMQ team member: spring-projects/spring-amqp#2688 (comment).

So, an x-death header is still going to be there and its count will be incremented anyway, but apparently that is going to be done only on the server side, when a message is moved back from the DLQ after expiration.

Either way, something has to be fixed. The good news is that it is only docs on the Spring Cloud Stream side.

I wanted to bring up a couple of issues that popped up with my implementation.

I've been using this code within a Consumer bean implementation to make N attempts to handle a Message<*>:

import org.springframework.amqp.ImmediateAcknowledgeAmqpException
import org.springframework.messaging.Message
import org.springframework.stereotype.Service

@Service
class EventHandleService {

    fun handleEvent(
        message: Message<out Any>,
        maxAttempts: Int,
        handler: () -> Unit
    ) {
        try {
            handler.invoke()
        } catch (e: Exception) {
            if (maxAttempts == 1 || getCurrentAttempt(message) > maxAttempts - 1) {
                // acknowledges the message so it is dropped from the queue
                throw ImmediateAcknowledgeAmqpException("Failed after max attempts exceeded")
            } else {
                // rethrowing the exception is expected to put the message into the DLQ
                throw e
            }
        }
    }

    // Derives the current attempt number from the count field of the first x-death entry.
    private fun getCurrentAttempt(message: Message<out Any>): Long {
        val xDeath = message.headers["x-death"]
        val currentAttempt = if (xDeath == null) {
            0L
        } else {
            ((xDeath as List<*>)[0] as Map<*, *>)["count"] as Long
        }
        return currentAttempt + 1
    }
}
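
For context, this is roughly how it is wired into a Consumer bean (the binding name, payload type, and maxAttempts value here are just placeholders):

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class EventConsumerConfig(private val eventHandleService: EventHandleService) {

    @Bean
    fun events(): Consumer<Message<String>> = Consumer { message ->
        eventHandleService.handleEvent(message, maxAttempts = 3) {
            // actual business logic goes here; an exception is expected to send the message to the DLQ
            check(message.payload.isNotBlank()) { "empty payload" }
        }
    }
}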

We rely on the x-death header, as the docs suggest, expecting either Spring or RabbitMQ to increment its count each time the message is dead-lettered.

I've been testing this out against a real RabbitMQ instance using Testcontainers, and everything was going smoothly until RabbitMQ got updated to version 3.13.x. Suddenly, if a message failed once, it got stuck in a never-ending loop. After digging around, I noticed that the count in the x-death header was no longer being incremented consistently.

I'm not sure if I'm misinterpreting the documentation or if my implementation of the retry mechanism is flawed. Please let me know if the issue lies with my approach.

ansd commented

@dvovney can you please create a GitHub repo with a minimal reproducible example so that I can run your code with a single command on my local machine? In your GitHub repo's README, please also include the Java version + Erlang/OTP version you're using.
I would like to debug what you're describing and can't do so with just the short code snippet you pasted here.

@dvovney ,

as you said yourself, RabbitMQ, starting with version 3.13.x, is not going to take into account the x-death header sent from the client.
So, you are right: our Spring Cloud Stream docs are wrong in recommending that header for the application-broker retry pattern.
Therefore we have to fix the docs accordingly, to recommend some custom header which has to be incremented manually before publishing the failed message back to the broker.
According to the x-death docs, it should work properly only in the cases mentioned for DLX: https://www.rabbitmq.com/docs/dlx.
Apparently it is not going to work anymore if we publish to the DLX manually.
Therefore a new recommendation is to rely on a custom header.
And that's exactly what this issue is about: fix the docs to provide a new recommendation instead of the no-longer-working x-death.
See the mentioned RabbitMQ issue for possible solution on your side.
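
To illustrate what such a docs recommendation could look like, here is a rough sketch of the manual, custom-header-based retry (the x-retry-count header name, the output binding name, and the use of StreamBridge are assumptions here, not an existing framework contract):

import org.springframework.amqp.ImmediateAcknowledgeAmqpException
import org.springframework.cloud.stream.function.StreamBridge
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Service

@Service
class ManualDlqRetry(private val streamBridge: StreamBridge) {

    fun republishToDlq(failed: Message<*>, maxAttempts: Int) {
        // Read the previous attempt count from our own header instead of x-death.
        val attempt = (failed.headers["x-retry-count"] as? Int ?: 0) + 1
        if (attempt > maxAttempts) {
            // Give up: acknowledge so the message is dropped instead of retried forever.
            throw ImmediateAcknowledgeAmqpException("Failed after $maxAttempts attempts")
        }
        val retried = MessageBuilder.fromMessage(failed)
            .setHeader("x-retry-count", attempt)
            .build()
        // "myDestination.dlq" is a placeholder for the output binding targeting the DLQ.
        streamBridge.send("myDestination.dlq", retried)
    }
}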

@ansd ,

correct me, please, if I have missed anything.

Thanks

@artembilan thank you for the feedback! Looking forward to the updated Spring Cloud Stream docs.

Just to be clear, you are proposing to add a new header (for example, spring-retry-count) with the delivery count of the message, incremented each time the message is delivered to the DLQ, correct?

Also, it would be nice if applications could intercept the message before Spring delivers it to the DLQ, so that things like the delivery count can be updated by the application itself within the message, as opposed to Spring updating a header.

No, I'm not proposing to add such a header, since it turned out there are no DLX retry components in the framework.
The mentioned docs in the Rabbit Binder talk about some custom strategy based on x-death, and since that one is not going to work, we will change those docs to recommend some custom header for such manual interaction.
See more info here: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/putting-it-all-together.html.

Since you have raised this issue, it would be great to see the code showing how you deal with such a retry-over-broker scenario.

Thanks

The code you referenced above is how many applications are handling retry, and as you know, that mechanism is broken since RabbitMQ 3.13. If you are going to remove that referenced code from the documentation, the problem still remains.

Applications throw uncaught exceptions into the Spring framework; the framework catches those exceptions, sets the x-exception-stacktrace message header, and routes the message to the DLQ. Applications have no way of setting any additional headers or updating the message itself in those scenarios. When such messages are routed back to their normal queue from the DLQ and redelivered, applications cannot determine how many times the messages have been retried, so they are stuck in an infinite loop.

Yeah... Sorry, that was the wrong sample code.
My feeling is that this is supposed to work, because we just throw AmqpRejectAndDontRequeueException, which leads to channel.basicNack(), and that must dump the message to the DLX on the server side with the respective x-death header and its count incremented.
In this case everything is according to the RabbitMQ recommendations: we don't send a message with an x-death header from the client.
If that does not work for you, then we need to consult with RabbitMQ.
And, please, share with us a simple project to reproduce and play with.
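
For clarity, the pattern I have in mind is simply rejecting from the consumer and letting the broker do the dead-lettering; a minimal sketch (bean and binding names are arbitrary, and handle() is a placeholder for the business logic):

import org.springframework.amqp.AmqpRejectAndDontRequeueException
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.messaging.Message
import java.util.function.Consumer

@Configuration
class RejectingConsumerConfig {

    @Bean
    fun input(): Consumer<Message<String>> = Consumer { message ->
        try {
            handle(message)
        } catch (e: Exception) {
            // Leads to channel.basicNack(..., requeue = false), so the broker itself
            // dead-letters the message and maintains the x-death header on the server side.
            throw AmqpRejectAndDontRequeueException("handling failed", e)
        }
    }

    private fun handle(message: Message<String>) {
        // placeholder business logic
        check(message.payload.isNotBlank()) { "empty payload" }
    }
}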

All I read about this x-death issue says that we are not supposed to send it from the client. The part on the broker side should remain the same. 🤷

@ansd ,
WDYT?

ansd commented

In this case everything is according to the RabbitMQ recommendations: we don't send a message with an x-death header from the client.

Exactly, that's why we can close this issue.

@samragu @dvovney as I wrote above: if you think something doesn't work as expected, provide a GitHub repo with reproduction steps and open an issue in https://github.com/rabbitmq/rabbitmq-server/issues

@ansd ,

here is a sample to confirm that we have a problem with the count in RabbitMQ 3.13: https://github.com/artembilan/sandbox/tree/master/spring-cloud-stream-rabbit-dlq.

When I change the Docker image to the 3.12 tag, it works well.

Feel free to reach me offline to discuss details.

Thanks

ansd commented

Thanks for the repro steps @artembilan .

Just for the non-Java people, this is how to run this code:
I started a local RabbitMQ broker so that I could insert debug statements into the RabbitMQ code.

I added

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

to https://github.com/artembilan/sandbox/blob/master/spring-cloud-stream-rabbit-dlq/src/test/resources/application.properties
and commented out https://github.com/artembilan/sandbox/blob/1ff26fc4aeaaa10bdf7e16310443dfe262debcaf/spring-cloud-stream-rabbit-dlq/src/test/java/org/springframework/cloud/stream/rabbitdlq/SpringCloudStreamRabbitDlqApplicationTests.java#L40-L44

Thereafter, the test can be executed via gradle clean test.

I could repro that the test case succeeds in 3.12 and fails in 3.13.

After adding some debug statements into the RabbitMQ server code, I could see that the Java client indeed publishes messages with the x-death header being set. The Java client does not reject the message.

In other words, the following statement is not correct:

My feeling is that this is supposed to work, because we just throw AmqpRejectAndDontRequeueException, which leads to channel.basicNack(), and that must dump the message to the DLX on the server side with the respective x-death header and its count incremented.
In this case everything is according to the RabbitMQ recommendations: we don't send a message with an x-death header from the client.

ansd commented

Topology in blue, message flow in red:

[topology diagram image]

and the corresponding Wireshark capture:

[Wireshark capture image]

I see. Thanks.

So, I was wrong. That AmqpRejectAndDontRequeueException indeed leads to manual publishing from the Rabbit Binder in RabbitMessageChannelBinder.getErrorMessageHandler().
The first couple of times it goes without the x-death header, but then we really do have it there.
My original assumption was about the default behavior of the Spring AMQP message listener.
Apparently the Spring Cloud Stream RabbitMQ Binder has its own custom logic to publish to the DLX manually:

amqpMessage = {Message@11192} "(Body:'test data' MessageProperties [headers={x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-last-death-reason=expired, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, x-last-death-exchange=DLX, x-exception-stacktrace=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_re"
 messageProperties = {MessageProperties@11193} "MessageProperties [headers={x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-last-death-reason=expired, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, x-last-death-exchange=DLX, x-exception-stacktrace=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_receivedRoutingKey=m"
  headers = {HashMap@11201}  size = 11
   "x-exception-message" -> "failed"
   "x-first-death-exchange" -> "DLX"
   "x-original-routingKey" -> "myDestination.consumerGroup"
   "x-last-death-reason" -> "expired"
   "x-death" -> {ArrayList@11227}  size = 1
    key = "x-death"
    value = {ArrayList@11227}  size = 1
     0 = {HashMap@11239}  size = 6
      "reason" -> "expired"
      "count" -> {Long@11251} 1
      "exchange" -> "DLX"
      "time" -> {Date@11255} "Wed May 08 12:13:08 EDT 2024"
      "routing-keys" -> {ArrayList@11257}  size = 1
      "queue" -> "myDestination.consumerGroup.dlq"
   "x-first-death-reason" -> "expired"
   "x-first-death-queue" -> "myDestination.consumerGroup.dlq"
   "x-last-death-queue" -> "myDestination.consumerGroup.dlq"
   "x-original-exchange" -> ""
   "x-last-death-exchange" -> "DLX"
   "x-exception-stacktrace" -> "org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_receivedRoutingKey=myDestination.consumerGroup, amqp_contentEncoding=UTF-8, x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, id=f2b4a435-1583-669d-cf84-63adc8e1be16, x-last-dea"

So, that's the one which has to be fixed for the mentioned x-retry-count custom header.
I mean it would be great to have it without an x- prefix, but changing the rest of the related headers would be too much for the current point release.

Another thought: how about we manually increment that count property before publishing? At least for now, to make as small a breaking change as possible.
However, we might need to determine the RabbitMQ version, since it looks like before 3.13 it is still OK.

ansd commented

Another thought: how about we manually increment that count property before publishing?

I don't think that's possible since the count field is within the x-death header. The broker won't interpret the x-death header anymore and will replace x-death with its own value before delivering the message to the consumer.

So, a custom header retry-count sounds best, ideally without the x- prefix. If that's not possible, x-retry-count would do as well for now.

ansd commented

... alternatively, the Java client could consume the message from the myDestination.consumerGroup queue and, instead of re-publishing the message to myDestination.consumerGroup.dlq, reject the message with requeue=false such that the message gets dead-lettered into myDestination.consumerGroup.dlq.
... the myDestination.consumerGroup queue is already configured to dead-letter into myDestination.consumerGroup.dlq, but currently the code doesn't make use of it.

Thanks for confirmation, @ansd !

So, here is a workaround for the current Spring Cloud Stream version:

spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false

And it is going to work exactly as you explained, and as I expect with the native Spring AMQP logic.
The only problem is with the custom DLX publisher and RabbitMQ 3.13 and higher.
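
Put together with the DLQ provisioning from the referenced docs page, the relevant binding configuration would look roughly like this (the input binding name, group, and TTL value are just examples):

spring.cloud.stream.bindings.input.destination=myDestination
spring.cloud.stream.bindings.input.group=consumerGroup
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=

With republish-to-dlq=false the rejected message should be dead-lettered by the broker itself, and expired DLQ messages should come back to the original queue with the x-death count maintained on the server side.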

We may fix this issue with a respective IMPORTANT note in the docs, and look into a spring-retry-count header implementation in the mentioned Rabbit Binder logic for the next Spring Cloud Stream version.

Well, looking into this one more time, and given that the current x-death solution works (prior to 3.13) for both broker and client DLX, it feels like we would have to rework the consumer side to re-map x-death to our new header.
And that might include all the x-death properties, if we provide a respective counterpart with those custom x- headers...