Feature Request: Add delivery attempt count to RabbitMQ DLQ messages
samragu opened this issue · 21 comments
RabbitMQ sets an `x-death` message header whose `count` field is incremented each time a message is dead-lettered to the DLQ. In our setup, messages in the DLQ go back to the normal queue after their TTL expires, and we relied on the `count` field to retry a message only a limited number of times. Starting with RabbitMQ 3.13, the broker no longer honors this behavior for client-republished messages and treats all `x-` headers as server-controlled headers. See rabbitmq/rabbitmq-server#10709 for details. Can SCSt set a similar header and increment its value each time a message is delivered to the DLQ? The exception stack trace is already being set on these messages, which is very useful for troubleshooting.
We currently don't set any such header and fully rely on whatever the broker gives us in that `x-death` header. So, what is the reason for this request, please?
As mentioned above, RabbitMQ 3.13 has stopped incrementing the `count` parameter in the `x-death` header, so there is no way for an application to determine how many times a message has landed in the DLQ.
OK. I see now. So, reopening respective Spring AMQP issue: spring-projects/spring-amqp#2688.
Not sure, though, what you mean about fixing the Rabbit Binder; probably that falls just to the docs, as we are discussing in that issue.
So, keeping this open until we get a fix.
If you have any ideas what has to be done, feel free to come back to us in that Spring AMQP issue.
However, the docs here for the Rabbit Binder can get a fix in parallel, since it looks like they talk about a custom strategy based on `x-death` for now: https://github.com/search?q=repo%3Aspring-cloud%2Fspring-cloud-stream-binder-rabbit%20x-death&type=code
Thanks
@samragu ,
See the comment from RabbitMQ team member: spring-projects/spring-amqp#2688 (comment).
So, an `x-death` header is still going to be there and its `count` will be incremented anyway, but apparently that is going to be done only on the server side, when a message is moved back from the DLQ after expiration.
Either way, something has to be fixed. The good news is that it is only docs on the Spring Cloud Stream side.
I want to bring up a couple of issues that popped up on my side.
I've been using this code within a `Consumer` bean implementation to make N attempts to handle a `Message<*>`:
```kotlin
import org.springframework.amqp.ImmediateAcknowledgeAmqpException
import org.springframework.messaging.Message
import org.springframework.stereotype.Service

@Service
class EventHandleService {

    fun handleEvent(
        message: Message<out Any>,
        maxAttempts: Int,
        handler: () -> Unit
    ) {
        try {
            handler.invoke()
        } catch (e: Exception) {
            if (maxAttempts == 1 || getCurrentAttempt(message) > maxAttempts - 1) {
                // drop the message out of the queue
                throw ImmediateAcknowledgeAmqpException("Failed after max attempts exceeded")
            } else {
                // rethrowing the exception must put the message into the DLQ
                throw e
            }
        }
    }

    private fun getCurrentAttempt(message: Message<out Any>): Long {
        val xDeath = message.headers["x-death"]
        val currentAttempt = if (xDeath == null) {
            0L
        } else {
            ((xDeath as ArrayList<*>)[0] as Map<*, *>)["count"] as Long
        }
        return currentAttempt + 1
    }
}
```
We rely on the `x-death` header, as the docs recommend, expecting either Spring or RabbitMQ to increment its counter whenever handling fails.
I've been testing this with a real RabbitMQ instance via Testcontainers, and everything went smoothly until RabbitMQ was updated to version 3.13.x. Suddenly, if a message failed once, it got stuck in a never-ending loop. After digging around, I noticed that the counter in the `x-death` header was being updated erratically.
I'm not sure if I'm misinterpreting the documentation or if my implementation of the retry mechanism is flawed. Please let me know if the issue lies with my approach.
@dvovney can you please create a GitHub repo with a minimal reproducible example so that I can run your code with a single command on my local machine? In your GitHub repo's README, please also include the Java version + Erlang/OTP version you're using.
I would like to debug what you're saying and can't do so from the short snippet you pasted here.
@dvovney ,
as you said yourself, RabbitMQ, starting with version 3.13.x, is not going to take into account an `x-death` header sent from the client.
So, you are right: our Spring Cloud Stream docs are wrong in recommending that header for the application-broker retry pattern.
Therefore, we have to fix the docs to recommend a custom header, which has to be incremented manually before publishing a failed message back to the broker.
According to the `x-death` docs, that header works properly only in the DLX cases mentioned here: https://www.rabbitmq.com/docs/dlx.
Apparently, it is not going to work anymore if we publish to the DLX manually.
Therefore a new recommendation is to rely on a custom header.
And that's exactly what this issue is about: fix the docs to provide a new recommendation instead of the no-longer-working `x-death`.
See the mentioned RabbitMQ issue for possible solution on your side.
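Since the broker now owns all `x-` headers, the approach the thread converges on is a counter the application increments itself before republishing a failed message. A minimal sketch of that bookkeeping in plain Java (the header name `x-retry-count` and the helper names are illustrative assumptions, not an existing Spring API):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the manual-counter approach: before republishing a failed message,
// read a custom header and increment it ourselves, because RabbitMQ 3.13+
// ignores a client-supplied x-death header. The header name below is a
// hypothetical example, not a framework-defined one.
public class RetryHeaderSupport {

    public static final String RETRY_COUNT_HEADER = "x-retry-count"; // hypothetical name

    /** Returns the attempt number to stamp on the republished message. */
    public static long nextRetryCount(Map<String, Object> headers) {
        Object current = headers.get(RETRY_COUNT_HEADER);
        return (current instanceof Number n) ? n.longValue() + 1 : 1L;
    }

    /** Copies the headers and stamps the incremented counter. */
    public static Map<String, Object> withIncrementedRetryCount(Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        copy.put(RETRY_COUNT_HEADER, nextRetryCount(headers));
        return copy;
    }

    public static void main(String[] args) {
        Map<String, Object> first = withIncrementedRetryCount(new HashMap<>());
        System.out.println(first.get(RETRY_COUNT_HEADER));                            // 1
        System.out.println(withIncrementedRetryCount(first).get(RETRY_COUNT_HEADER)); // 2
    }
}
```

The consumer side would then compare this counter against its `maxAttempts` instead of reading `x-death`.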
@ansd ,
correct me, please, if I have missed anything.
Thanks
@artembilan thank you for the feedback! Looking forward to the updated Spring Cloud Stream docs.
Just to be clear: you are proposing to add a new header (for example, `spring-retry-count`) whose delivery count is incremented each time the message is delivered to the DLQ, correct?
Also, it would be nice if applications could intercept the message before Spring delivers it to the DLQ, so that things like the delivery count can be updated by the application itself within the message, as opposed to Spring updating a header.
No, I'm not proposing to add such a header, since it turned out there are no DLX retry components in the framework.
We have the mentioned docs in the Rabbit Binder talking about a custom strategy based on `x-death`, and since that one is not going to work, we will change those docs to recommend a custom header for such manual interaction.
See more info here: https://docs.spring.io/spring-cloud-stream/reference/rabbit/rabbit_overview/putting-it-all-together.html.
Since you have raised this issue, it would be great to see how your code deals with such a retry-over-broker scenario.
Thanks
The code you referenced above is how many applications handle retries, and, as you know, that mechanism is broken since RabbitMQ 3.13. If you only remove that referenced code from the documentation, the problem still remains.
Applications throw uncaught exceptions into the Spring framework; the framework catches those exceptions, sets the `x-exception-stacktrace` message header, and routes the message to the DLQ. Applications have no way of setting any additional headers or updating the message itself in those scenarios. When such messages are routed back to their normal queue from the DLQ and redelivered, applications cannot determine how many times they have been retried, so they are stuck in an infinite loop.
Yeah... Sorry, that was wrong sample code.
My feeling is that this one is supposed to work, because we just throw `AmqpRejectAndDontRequeueException`, which leads to `channel.basicNack()`, and that must dump the message to the DLX on the server side with the respective `x-death` header and its `count` incremented.
With this, everything is according to the RabbitMQ recommendations: we don't send a message with an `x-death` header from the client.
If that does not work for you, then we need to consult with RabbitMQ.
And, please, share with us a simple project to reproduce and play with.
All I have read about this `x-death` issue is that we are not supposed to send it from the client. The part on the broker should remain the same. 🤷
@ansd ,
WDYT?
> In this everything is according to the RabbitMQ recommendations: we don't send a message with an `x-death` from the client.
Exactly, that's why we can close this issue.
@samragu @dvovney as I wrote above: if you think something doesn't work as expected, provide a GitHub repo with reproduction steps and open an issue in https://github.com/rabbitmq/rabbitmq-server/issues
@ansd ,
here is a sample confirming that we have a problem with `count` in RabbitMQ `3.13`: https://github.com/artembilan/sandbox/tree/master/spring-cloud-stream-rabbit-dlq.
When I change the Docker image to the `3.12` tag, it works well.
Feel free to reach me offline to discuss details.
Thanks
Thanks for the repro steps @artembilan .
Just for the non-Java people, this is how to run this code:
- I started a local RabbitMQ broker to be able to insert debug statements into the RabbitMQ code.
- I added

  ```properties
  spring.rabbitmq.host=localhost
  spring.rabbitmq.port=5672
  spring.rabbitmq.username=guest
  spring.rabbitmq.password=guest
  ```

  to https://github.com/artembilan/sandbox/blob/master/spring-cloud-stream-rabbit-dlq/src/test/resources/application.properties and commented out https://github.com/artembilan/sandbox/blob/1ff26fc4aeaaa10bdf7e16310443dfe262debcaf/spring-cloud-stream-rabbit-dlq/src/test/java/org/springframework/cloud/stream/rabbitdlq/SpringCloudStreamRabbitDlqApplicationTests.java#L40-L44
- Thereafter, the test can be executed via `gradle clean test`.
I could repro that the test case succeeds in 3.12 and fails in 3.13.
After adding some debug statements to the RabbitMQ server code, I could see that the Java client indeed publishes messages with the `x-death` header set. The Java client does not reject the message.
In other words, the following statement is not correct:

> In my feeling this one supposed to work because we just do AmqpRejectAndDontRequeueException, this one leads to the channel.basicNack() and that must dump a message to the DLX on the server side with respective x-death and its count incremented.
> In this everything is according to the RabbitMQ recommendations: we don't send a message with an x-death from the client.
I see. Thanks.
So, I was wrong. That `AmqpRejectAndDontRequeueException` indeed leads to manual publishing from the Rabbit Binder in `RabbitMessageChannelBinder.getErrorMessageHandler()`.
A couple of times it goes out without the `x-death` header, but after that we really do have it there.
My original assumption was about a default behavior of Spring AMQP message listener.
Apparently, the Spring Cloud Stream RabbitMQ Binder has its own custom logic to publish to the DLX manually:
```
amqpMessage = {Message@11192} "(Body:'test data' MessageProperties [headers={x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-last-death-reason=expired, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, x-last-death-exchange=DLX, x-exception-stacktrace=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_re"
  messageProperties = {MessageProperties@11193} "MessageProperties [headers={x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-last-death-reason=expired, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, x-last-death-exchange=DLX, x-exception-stacktrace=org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_receivedRoutingKey=m"
    headers = {HashMap@11201} size = 11
      "x-exception-message" -> "failed"
      "x-first-death-exchange" -> "DLX"
      "x-original-routingKey" -> "myDestination.consumerGroup"
      "x-last-death-reason" -> "expired"
      "x-death" -> {ArrayList@11227} size = 1
        key = "x-death"
        value = {ArrayList@11227} size = 1
          0 = {HashMap@11239} size = 6
            "reason" -> "expired"
            "count" -> {Long@11251} 1
            "exchange" -> "DLX"
            "time" -> {Date@11255} "Wed May 08 12:13:08 EDT 2024"
            "routing-keys" -> {ArrayList@11257} size = 1
            "queue" -> "myDestination.consumerGroup.dlq"
      "x-first-death-reason" -> "expired"
      "x-first-death-queue" -> "myDestination.consumerGroup.dlq"
      "x-last-death-queue" -> "myDestination.consumerGroup.dlq"
      "x-original-exchange" -> ""
      "x-last-death-exchange" -> "DLX"
      "x-exception-stacktrace" -> "org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1@65165241], failedMessage=GenericMessage [payload=byte[9], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=, amqp_deliveryTag=2, x-last-death-reason=expired, amqp_consumerQueue=myDestination.consumerGroup, amqp_redelivered=false, amqp_receivedRoutingKey=myDestination.consumerGroup, amqp_contentEncoding=UTF-8, x-exception-message=failed, x-first-death-exchange=DLX, x-original-routingKey=myDestination.consumerGroup, x-death=[{reason=expired, count=1, exchange=DLX, time=Wed May 08 12:13:08 EDT 2024, routing-keys=[myDestination.consumerGroup], queue=myDestination.consumerGroup.dlq}], x-first-death-reason=expired, x-first-death-queue=myDestination.consumerGroup.dlq, x-last-death-queue=myDestination.consumerGroup.dlq, x-original-exchange=, id=f2b4a435-1583-669d-cf84-63adc8e1be16, x-last-dea"
```
So, that's the one which has to be fixed for the mentioned `x-retry-count` custom header.
I mean, it would be great to have it without an `x-` prefix, but changing the rest of the related headers would be too much for the current point release.
Another thought: how about we manually increment that `count` property before publishing? At least for now, to keep the change as small as possible.
However, we might need to determine the RabbitMQ version, since it looks like before 3.13 it is still OK.
> Another thought: how about we manually increment that count property before publishing?
I don't think that's possible, since the `count` field is within the `x-death` header. The broker won't interpret the `x-death` header anymore and will replace `x-death` with its own value before delivering the message to the consumer.
So, a custom header `retry-count` sounds best, ideally without the `x-` prefix. If that's not possible, `x-retry-count` would do as well for now.
... alternatively, the Java client could consume the message from the `myDestination.consumerGroup` queue and, instead of re-publishing the message to `myDestination.consumerGroup.dlq`, reject the message with `requeue=false`, so that the message gets dead-lettered into `myDestination.consumerGroup.dlq`.
... the `myDestination.consumerGroup` queue is already configured to dead-letter into `myDestination.consumerGroup.dlq`, but currently the code doesn't make use of it.
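When the broker itself dead-letters rejected messages, it keeps maintaining the `x-death` count, so the consumer only has to read it and decide when to give up. A sketch of that consumer-side check in plain Java (class and method names are illustrative; the header value shape mirrors the debugger dump earlier in the thread):

```java
import java.util.List;
import java.util.Map;

// Sketch of reading the broker-maintained x-death header when dead-lettering
// happens server-side (reject with requeue=false). The x-death value is a list
// of maps, and each entry carries a "count" field the broker increments.
public class DeadLetterRetryCheck {

    /** Extracts the broker-maintained count from the x-death header; 0 if absent. */
    public static long deathCount(Object xDeathHeader) {
        if (xDeathHeader instanceof List<?> deaths && !deaths.isEmpty()
                && deaths.get(0) instanceof Map<?, ?> firstDeath
                && firstDeath.get("count") instanceof Number count) {
            return count.longValue();
        }
        return 0L;
    }

    /** True once the message has been dead-lettered maxAttempts times. */
    public static boolean shouldGiveUp(Object xDeathHeader, int maxAttempts) {
        return deathCount(xDeathHeader) >= maxAttempts;
    }

    public static void main(String[] args) {
        Object xDeath = List.of(Map.of("count", 3L, "queue", "myDestination.consumerGroup.dlq"));
        System.out.println(deathCount(xDeath));      // 3
        System.out.println(shouldGiveUp(xDeath, 3)); // true
        System.out.println(shouldGiveUp(null, 3));   // false
    }
}
```

This only works when the broker performs the dead-lettering; a client-republished `x-death` is replaced by the broker in 3.13+, as noted above.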
Thanks for confirmation, @ansd !
So, here is a workaround for the current Spring Cloud Stream version:
`spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false`
And it is going to work exactly as you explained, and as I expect with native Spring AMQP logic.
The only problem is with a custom DLX publisher and RabbitMQ 3.13 and higher.
We may fix this issue with a respective `IMPORTANT` note in the docs, and look into a `spring-retry-count` header implementation in the mentioned Rabbit Binder logic for the next Spring Cloud Stream version.
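For reference, here is a sketch of what such a broker-driven setup could look like in `application.properties`, assuming a binding named `input` (property names per the Rabbit Binder reference; the TTL value is an example):

```properties
# Let the broker do the dead-lettering instead of the binder's republish logic,
# so the server-maintained x-death count keeps incrementing.
spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false
# Expire messages in the DLQ back to the main queue after 5 seconds.
spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=5000
```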
Well, looking into this one more time, and given that the current `x-death` solution works (prior to `3.13`) for both broker and client DLX, it feels like we would have to rework the consumer side to re-map `x-death` to our new header.
And that might include all the `x-death` properties, if we want respective counterparts among those custom `x-` headers...