Messages multiply in queues (part II)
nico3dfx opened this issue · 13 comments
Hi @cressie176,
after some time I am seeing the same problem again in the production environment.
This is the current configuration (extracted; I have included only one queue):
{
  "vhosts": {
    "v0": {
      "concurrency": 30,
      "connection": {
        "url": "amqp://username:password@rabbitmq.mydomain.com:5672/?heartbeat=30",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "amq.topic": {
          "type": "topic",
          "assert": false,
          "check": true,
          "options": {
            "durable": true
          }
        }
      },
      "queues": {
        "my_queue": {
          "assert": true,
          "check": true,
          "options": {
            "durable": false,
            "deadLetterExchange": "dead_letters",
            "deadLetterRoutingKey": "service.dead_letter"
          }
        }
      },
      "bindings": {
        "my_queue": {
          "source": "amq.topic",
          "destination": "my_queue",
          "destinationType": "queue",
          "bindingKeys": [
            "my_queue.my_software.created",
            "my_queue.my_software.updated",
            "my_queue.my_software.deleted"
          ]
        }
      },
      "subscriptions": {
        "my_queue": {
          "queue": "my_queue"
        }
      }
    },
    "dlq": {
      "connection": {
        "url": "amqp://username:password@rabbitmq.mydomain.com:5672/?heartbeat=30",
        "socketOptions": {
          "timeout": 10000
        }
      },
      "exchanges": {
        "dead_letters": {
          "assert": true
        }
      },
      "queues": {
        "dead_letters_service_queue": {
          "assert": true
        }
      },
      "bindings": {
        "dead_letters_service": {
          "source": "dead_letters",
          "destination": "dead_letters_service_queue",
          "destinationType": "queue",
          "bindingKeys": [
            "service.dead_letter"
          ]
        }
      },
      "subscriptions": {
        "dead_letters_service_queue": {
          "queue": "dead_letters_service_queue"
        }
      }
    }
  }
}
And this is the application code:
Rascal.Broker.create(Rascal.withDefaultConfig(this.config), function (err, broker) {
  if (err) {
    logger.error(`[BC] Rascal.Broker.create error: ${err}`)
    process.exit(1);
  }
  broker
    .on('error', (error, {vhost, connectionUrl}) => {
      logger.error(`[BC] Broker error!!! Vhost: ${vhost} - ConnectionUrl: ${connectionUrl} - Message: ${error.message}`)
    })
    .on('vhost_initialised', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was initialised using connection: ${connectionUrl}`)
    })
    .on('blocked', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was blocked using connection: ${connectionUrl}`)
    })
    .on('unblocked', ({vhost, connectionUrl}) => {
      logger.debug(`[BC] Vhost: ${vhost} was unblocked using connection: ${connectionUrl}`)
    })
  logger.info('Broker Initialized. Now initializing Subscriptions...')
  _.each(self.subscriptionNames, function (subscriptionName) {
    logger.debug(`Initializing Subscription "${subscriptionName}"`)
    let handler = require('./lib/handlers/_handler')(broker)
    broker.subscribe(subscriptionName, {prefetch: 10}, function (err, subscription) {
      if (err) {
        logger.error(`[BS] Rascal.Broker.subscribe error: ${err}`)
        process.exit(1);
      }
      logger.debug(`Subscription "${subscriptionName}" initialized.`)
      subscription
        .on('message', function (message, content, ackOrNack) {
          handler(message, content, subscriptionName, self.retryConfig.retry, function (err) {
            if (!err) return ackOrNack()
            logger.debug(`End of retries for message "${message.properties.messageId}". Now republish or DLQ.`)
            ackOrNack(err, self.retryConfig.rascal.default)
          })
        })
        .on('invalid_content', function (err, message, ackOrNack) {
          logger.error('Invalid Content - Message: ' + err.message)
          ackOrNack(err, broker.config.rascal.dead_letter)
        })
        .on('redeliveries_exceeded', function (err, message, ackOrNack) {
          logger.error('Redeliveries Exceeded - Message: ' + err.message)
          ackOrNack(err, broker.config.rascal.dead_letter)
        })
        .on('cancel', function (err) {
          logger.error(`[SC] Subscription cancel!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
        })
        .on('error', function (err) {
          logger.error(`[SE] Subscription error!!! Subscription: ${subscriptionName} - Message: ${err.message}`)
        })
    })
  })
})
Inside RabbitMQ I have many queues, and each queue has N consumers.
The queue with the problem had only 1 consumer, with a constant increase in the number of messages in the queue.
Some logs:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Message:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"fields":{"consumerTag":"PROD.52073b331311af77169d5febe0c252a2","deliveryTag":19161,"redelivered":false,"exchange":"amq.topic","routingKey":"my_queue.my_software.created"},"properties":{"contentType":"application/json","headers":{"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"},"deliveryMode":2,"messageId":"5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"},"content":{"type":"Buffer","data":[123,34,114,111,117,125]}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: Content:
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] MESSAGE LOST!!!
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Content: {"routingKey":"my_queue.my_software.created","type":"event","module":"module://my_software","event":"my_queue.my_software.created","data":{"model":"my_model"}}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Headers: {"dpa":{"errors":21,"runnerUrl":"https://mysoftware.mydomain.com/ws/v1/message/handle","lastErrorMessage":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"rascal":{"originalQueue":"dead_letters_service_queue","originalVhost":"dlq","redeliveries":0,"recovery":{"my_queue":{"republished":21,"immediateNack":true}},"originalExchange":"amq.topic","originalRoutingKey":"my_queue.my_software.created","error":{"message":"getaddrinfo ENOTFOUND mysoftware.mydomain.com"},"restoreRoutingHeaders":true},"x-death":[{"count":1,"reason":"rejected","queue":"my_queue","time":{"!":"timestamp","value":1693811615},"exchange":"","routing-keys":["my_queue"]}],"x-first-death-exchange":"","x-first-death-queue":"my_queue","x-first-death-reason":"rejected"}
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Errors #21 on "https://mysoftware.mydomain.com/ws/v1/message/handle"
2023-09-04T07:13:35.219Z [debug] [PID: 3921]: [5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b] Last Error Message: getaddrinfo ENOTFOUND mysoftware.mydomain.com
Finally, this is the handler function
const logger = require('../logger')
const runner = require('../runner')
const retry = require('retry')

module.exports = function (broker) {
  return async function (message, content, subscriptionName, retryConfig, cb) {
    let operation = retry.operation(retryConfig)
    operation.attempt(async function (currentAttemp) {
      try {
        await runner.post('url', content)
        return cb()
      } catch (e) {
        if (operation.retry(e)) {
          logger.debug(`Now retry. Attemp #${currentAttemp}/${retryConfig.retries} for message "${message.properties.messageId}"`)
          return
        }
        message.properties.headers.dpa = message.properties.headers.dpa || {}
        message.properties.headers.dpa.errors = message.properties.headers.dpa.errors || 0
        message.properties.headers.dpa.errors++
        message.properties.headers.dpa.runnerUrl = 'url'
        message.properties.headers.dpa.lastErrorMessage = e.message
        cb(e)
      }
    })
  }
}
Any ideas?
Thanks.
Originally posted by @nico3dfx in #216 (comment)
Sorry @nico3dfx, I didn't spot your comment back from the 4th Sept. I'll take a look as soon as I can
After a quick glance there are some things I would like to understand...
- The handler takes the broker as an argument but doesn't use it (probably not related, but I wanted to check the example code is representative)
- Are you filtering out the default subscriptions prior to the _.each loop? One is created automatically per queue. (I know we discussed this before, but worth double checking)
It appears to use a third party lib to do retries, but because the function body is in a try / catch block, this would only ever work if the body of the catch block, or the code following cb(err), threw an error.
Ignore me! I had missed the operation.retry(e) line.
- What is your rascal recovery configuration?
- Why aren't we seeing logs like Now retry. Attemp #1/20 for message "5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"?
- Where is the "MESSAGE LOST" coming from?
- What version of retry are you using?
- What is your retry configuration?
- Can you try to reproduce the error by calling runner.post with a non-existent domain after enabling the rascal debug (DEBUG=rascal:*) and share the sanitised logs please?
Another potential issue is that you are mixing try / catch and callbacks, as follows:
try {
  // ...
  return cb()
} catch (e) {
  // If code within the success callback throws you will end up here
  cb(e)
}
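One way to guard against the callback being invoked twice (for example if cb() itself, or code after it, throws and lands you in the catch block) is to wrap it so it can only fire once, in the spirit of the lodash.once guard Rascal uses internally. A rough sketch, not your actual handler; the attempt wrapper is just for illustration:

const _ = require('lodash')
const runner = require('../runner')

async function attempt(content, cb) {
  const done = _.once(cb) // the callback can now fire at most once
  try {
    await runner.post('url', content)
    done()
  } catch (e) {
    // if done() above threw, this call is swallowed by the once() wrapper
    // rather than invoking the callback a second time
    done(e)
  }
}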
{
  "dpa": {
    "errors": 21,
    "runnerUrl": "https://mysoftware.mydomain.com/ws/v1/message/handle",
    "lastErrorMessage": "getaddrinfo ENOTFOUND mysoftware.mydomain.com"
  },
  "rascal": {
    "originalQueue": "dead_letters_service_queue",
    "originalVhost": "dlq",
    "redeliveries": 0,
    "recovery": {
      "my_queue": {
        "republished": 21,
        "immediateNack": true
      }
    },
    "originalExchange": "amq.topic",
    "originalRoutingKey": "my_queue.my_software.created",
    "error": {
      "message": "getaddrinfo ENOTFOUND mysoftware.mydomain.com"
    },
    "restoreRoutingHeaders": true
  },
  "x-death": [
    {
      "count": 1,
      "reason": "rejected",
      "queue": "my_queue",
      "time": {
        "!": "timestamp",
        "value": 1693811615
      },
      "exchange": "",
      "routing-keys": [
        "my_queue"
      ]
    }
  ],
  "x-first-death-exchange": "",
  "x-first-death-queue": "my_queue",
  "x-first-death-reason": "rejected"
}
Interesting that the dpa errors and the rascal republishes are the same (21). This suggests the same message has been around the loop 21 times by this point. Rascal's originalQueue header is dead_letters_service_queue, meaning it has been consumed from this dead letter queue by the application, failed, and then republished. Are you sure this queue doesn't accidentally have a consumer, potentially set up by a default subscription?
I'm less familiar with shovels, but is it possible you previously set up a shovel to move messages from the DLQ to the work queue and left it running?
If dead_letters_service_queue has consumers, you may want to swap your _.each loop for broker.subscribeAll, explicitly excluding any auto-created subscriptions:
try {
  const subscriptions = await broker.subscribeAll(s => !s.autoCreated);
  subscriptions.forEach(subscription => {
    subscription.on('message', (message, content, ackOrNack) => {
      // Do stuff with message
    }).on('error', (err) => {
      console.error('Subscriber error', err)
    }) // and so on
  });
} catch (err) {
  // One or more subscriptions didn't exist
}
Looking at your subscription event handlers, I notice you are handling the redeliveries_exceeded event. Out of curiosity, what counter implementation are you using? If you have the opportunity to use quorum queues (available from RabbitMQ 3.8.0 onwards) this would be preferable.
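For reference, a sketch of how the queue above could be declared as a quorum queue, assuming (as I believe is the case) that Rascal passes the queue options straight through to amqplib's assertQueue, which accepts an arguments map. Note that quorum queues must be durable, and the existing queue would need to be deleted and recreated since its type cannot be changed in place:

"queues": {
  "my_queue": {
    "assert": true,
    "options": {
      "durable": true,
      "arguments": { "x-queue-type": "quorum" },
      "deadLetterExchange": "dead_letters",
      "deadLetterRoutingKey": "service.dead_letter"
    }
  }
}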
After a quick glance there are some things I would like to understand...
- The handler takes the broker as an argument but doesn't use it (probably not related, but I wanted to check the example code is representative)
I confirm, the broker is never used and can be removed.
- Are you filtering out the default subscriptions prior to the _.each loop? One is created automatically per queue. (I know we discussed this before, but worth double checking)
No filter is applied prior to the _.each loop. self.subscriptionNames contains all the subscriptions defined in the config file (my_queue in the example).
It appears to use a third party lib to do retries, but because the function body is in a try / catch block, this would only ever work if the body of the catch block, or the code following cb(err), threw an error.
Ignore me! I had missed the operation.retry(e) line.
Ok
- What is your rascal recovery configuration?
Here:
{
  default: [
    {
      strategy: 'republish', defer: 60000, attempts: 20, xDeathFix: true
    },
    {
      strategy: 'republish', immediateNack: true,
    }
  ],
  dead_letter: [
    {
      strategy: 'republish',
      immediateNack: true,
    }
  ]
}
- Why aren't we seeing logs like Now retry. Attemp #1/20 for message "5e226d4d-fff6-4916-a0a5-4bed9e9a3d4b"?
I removed those lines from the logs I posted, but I do have them.
- Where is the "MESSAGE LOST" coming from?
From the DLQ handler. When a message is lost after the recovery strategy, it is processed by a DLQ handler that consumes the message and sends it to a logging application via an HTTP request.
- What version of retry are you using?
Latest version: 0.13.1
- What is your retry configuration?
Here:
{
  retries: 3,
  factor: 1,
  minTimeout: 5000,
  maxTimeout: 10000,
  randomize: true,
}
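For context, with factor 1 and randomize true this works out to at most 4 attempts per consumed message, with each retry delayed by roughly 5-10 seconds. A quick sketch to inspect the schedule, using the retry module's own timeouts() helper (assuming retry@0.13.x):

const retry = require('retry')

// One delay (in ms) per retry, i.e. 3 entries for retries: 3
const delays = retry.timeouts({
  retries: 3,
  factor: 1,
  minTimeout: 5000,
  maxTimeout: 10000,
  randomize: true,
})

console.log(delays) // e.g. [ 8342, 6120, 9877 ] - each between 5000 and 10000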
- Can you try to reproduce the error by calling runner.post with a non-existent domain after enabling the rascal debug (DEBUG=rascal:*) and share the sanitised logs please?
Interesting that the dpa errors and the rascal republishes are the same (21). This suggests the same message has been around the loop 21 times by this point. Rascal's originalQueue header is dead_letters_service_queue, meaning it has been consumed from this dead letter queue by the application, failed, and then republished. Are you sure this queue doesn't accidentally have a consumer, potentially set up by a default subscription?
The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue.
I'm less familiar with shovels, but is it possible you previously set up a shovel to move messages from the DLQ to the work queue and left it running?
No
The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue
Do you have a special handler for the dead_letters_service_queue or is it using the same handler as the service queue?
- How do you select the correct handler for the queue?
- Can you share the dead letter handler code please?
The queue has 1 consumer. In the _.each loop self.subscriptionNames contains dead_letters_service_queue
Do you have a special handler for the dead_letters_service_queue or is it using the same handler as the service queue?
- How do you select the correct handler for the queue?
I have a config from which I get the handler for each queue; see the sketch below.
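A simplified sketch of that mapping (not the real code; the _dlq_handler path is hypothetical):

// Hypothetical queue -> handler mapping; only ./lib/handlers/_handler appears in the code above
const handlers = {
  my_queue: require('./lib/handlers/_handler'),
  dead_letters_service_queue: require('./lib/handlers/_dlq_handler'), // hypothetical path
}

function handlerFor(broker, subscriptionName) {
  // Fall back to the default handler if no specific one is configured
  const factory = handlers[subscriptionName] || require('./lib/handlers/_handler')
  return factory(broker)
}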
- Can you share the dead letter handler code please?
Here:
const logger = require('../logger')
const runner = require('../runner')
const _ = require("lodash")

module.exports = function (broker, handlerOptions) {
  return async function (message, content, subscriptionName, retryConfig, cb) {
    logger.debug('Message:')
    logger.debug(JSON.stringify(message))
    logger.debug('Content:')
    logger.debug(JSON.stringify(content))
    message.properties.headers.dpa = message.properties.headers.dpa || {}
    logger.debug(`[${message.properties.messageId}] MESSAGE LOST!!!`)
    logger.debug(`[${message.properties.messageId}] Content: ${JSON.stringify(content)}`)
    logger.debug(`[${message.properties.messageId}] Headers: ${JSON.stringify(message.properties.headers)}`)
    logger.debug(`[${message.properties.messageId}] Errors #${message.properties.headers.dpa.errors} on "${message.properties.headers.dpa.runnerUrl}"`)
    logger.debug(`[${message.properties.messageId}] Last Error Message: ${message.properties.headers.dpa.lastErrorMessage}`)
    let body = {
      dpa: {
        errors: message.properties.headers.dpa.errors,
        runnerUrl: message.properties.headers.dpa.runnerUrl,
        lastErrorMessage: message.properties.headers.dpa.lastErrorMessage,
      },
      content: content
    }
    try {
      await runner.post(process.env.DLQ_RUNNER_URL, body)
    } catch (e) {
      logger.error(`DLQ POST Error on "${process.env.DLQ_RUNNER_URL}"`)
    }
    return cb()
  }
}
Thanks for all the responses. I have something running locally now, and it's working as you would expect...
- Receive message on service queue
- Simulate persistent DNS failure / retry 20 times
- Republish message to service queue if <= 20 attempts after a short delay (goto 1)
- Republish message to DLQ if > 20 attempts
- Receive message from DLQ
- Log dead lettered message
I assume it is the service_queue where you are seeing the duplicate messages. This is the code which attempts to recover errors by republishing the message to its original queue. As you can see, the publish code defers to amqplib's publish method, and nacks the original on error, or acks it on success. If the subscriber channel closed, or your application crashed between the publish and the ack/nack of the original, it would result in a duplicate message, but I assume you would have seen this. If the session._ack or session._nack functions yielded an error*, it would have been eventually emitted as an error, and again you would have noticed.
There are two cases I can see where session._ack / session._nack might fail without yielding an error, potentially causing a duplicate, however I would expect you to have noticed both...
- The amqp protocol does not reply to acks or nacks, and consequently the amqplib methods for these functions do not accept callbacks or yield a promise. They are essentially "fire and forget". If the command never reaches the broker, the message will not be removed from the queue, and amqplib/rascal will be none the wiser. This would result in a prefetch allocation being used up until the delivery acknowledgement timeout was exceeded. This defaults to 30 minutes, so it would take a very long time to build up that amount of duplicates unless modified.
- If the subscriber channel was closed. In nack's case the message would be rolled back, creating a duplicate, and the nack error is swallowed by rascal (I will think of how to improve this), however the original error would still have been reported. In ack's case the ack error should have been reported. Furthermore, the subscriber would have been cancelled by the closed channel, and had to be reestablished. Again something I would have expected you to notice, and unlikely to happen repeatedly.
Hi @nico3dfx,
I've gone through the rascal code again. This is the most relevant snippet...
const nackMessage = (err) => {
  session._nack(message, (_nackErr) => {
    // nackError just means the channel was already closed meaning the original message would have been rolled back
    once(err);
  });
};

if (err) return nackMessage(err);
if (!publisherChannel) return nackMessage(new Error('Unable to handle subscriber error by republishing. The VHost is shutting down'));

publisherChannel.on('error', (err) => {
  nackMessage(err);
});
publisherChannel.on('return', () => {
  nackMessage(new Error(format('Message: %s was republished to queue: %s, but was returned', message.properties.messageId, originalQueue)));
});
publisherChannel.publish(undefined, originalQueue, message.content, publishOptions, (err) => {
  if (err) return nackMessage(err); // Channel will already be closed, reclosing will trigger an error
  publisherChannel.close();
  debug('Message: %s was republished to queue: %s %d times', message.properties.messageId, originalQueue, republished + 1);
  session._ack(message, (err) => {
    once(err, true);
  });
});
The calls which interact with RabbitMQ are where it
- acknowledges a message (session._ack)
- rejects a message (session._nack)
- publishes a message (publisherChannel.publish)
The code never specifies a requeue, so rejecting will move a message to the DLQ. You said the duplicates appear on the work queue, so we can ignore calls to session._nack
This leaves the publish and the acknowledgement. If the cloned message was republished but the original message not acknowledged, the original would stay in an unacknowledged state, using up a prefetch slot. Depending on your RabbitMQ configuration it can eventually time out and be rolled back to the work queue creating a duplicate. However, I don't see anything which could go wrong between the publish and acknowledgement without an error being reported and/or the original message rejected. The failure scenarios between the publish and the acknowledgement are
- publish callback yields an error - original is rejected
- publisher channel emits an error on close - highly unlikely since we just used the channel, but the original would be rejected anyway
- republished message is unroutable - highly unlikely since it is being republished directly to the queue we just read it from, but the original would be rejected anyway
- any other publisher channel error - the original is rejected
- a subscriber channel error preventing the acknowledgement - this would cause a duplicate, and Rascal would auto-resubscribe, but still report the error. I expect you to have noticed ~1M errors.
Another possibility is that the application crashes (possibly due to a poison message) at just the point between the publish and acknowledgement. The application would need to be configured to auto-restart, and I expect you to have noticed ~1M restarts.
The last possibility I considered is if the above republish code was called repeatedly. Even if this were possible (rascal reports repeat calls to ackOrNack), the original message would have been acknowledged or rejected repeatedly, resulting in a PRECONDITION_FAILED - unknown delivery tag error. Again, I expect you to have noticed ~1M of these.
I've found no indication that this is a bug with Rascal, and it's even less likely to be a bug with lodash.once (which Rascal uses to prevent multiple error / close events causing repeated code invocations) or async.eachSeries (which Rascal uses to loop through the recovery strategies).
Have you found anything on your side?
Hi @cressie176,
unfortunately I haven't been able to find the cause or to reproduce the error.
I'm working on splitting the configuration for each customer, and on an alert application that checks the queues and notifies us if they contain more than N messages (see the sketch below).
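A minimal sketch of that check (assuming the RabbitMQ management plugin is enabled and Node 18+ for the global fetch; the URL, credentials and threshold are placeholders):

const MANAGEMENT_URL = 'http://rabbitmq.mydomain.com:15672'
const THRESHOLD = 1000 // alert above this many messages (ready + unacknowledged)

async function checkQueueDepth(vhost, queue) {
  const res = await fetch(`${MANAGEMENT_URL}/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(queue)}`, {
    headers: { Authorization: 'Basic ' + Buffer.from('username:password').toString('base64') },
  })
  const { messages } = await res.json()
  if (messages > THRESHOLD) {
    // hook in whatever notification channel is available
    console.warn(`Queue "${queue}" on vhost "${vhost}" has ${messages} messages`)
  }
}

checkQueueDepth('v0', 'my_queue')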
Hi @nico3dfx,
I've released a patch version of Rascal (17.0.1) with reworked code in the republish and forward recovery strategies. It removes a couple of extremely rare corner cases which could cause an individual duplicate message (e.g. getting multiple error events while republishing the original message), but nothing which would have caused what you are seeing.
OK to close this issue again? I still can't see anything in Rascal which would cause multiple duplicates to be created and have been unable to reproduce.