Need help with error: ack called with no unacknowledged messages
Currently, I am using the push/worker pattern for message queueing and processing. I want to remove a message only if a worker has successfully processed it.
Here's part of my code:
class JobQueue
  constructor: ()->
    @context = rabbit.createContext('amqp://localhost')
    @context.on 'error', (err)->console.error err

  queueJob: (provider, job, callback)->
    return callback Error("Provider not specified") unless provider? and provider isnt ''
    job.title = getJobTitle job.type, provider, job.data
    push = @context.socket 'PUSH', {persistent: true }
    push.connect provider, ->
      push.write JSON.stringify(job), 'utf8'

  onProcess: (provider, processFn)->
    worker = @context.socket 'WORKER', { persistent: true}
    worker.connect provider
    worker.on 'data', (job)->
      processFn JSON.parse(job) , (err)->
        return console.log err if err
        worker.ack()
With this code, queueing messages works without problems, but the worker throws the error "ack called with no unacknowledged messages".
Could the callback you provide to processFn possibly be called more than once for a particular message? Otherwise, looking at that code, I can't see how ack could be called more times than there are messages.
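One way to check for that is to wrap the callback so a second invocation is logged and dropped before it can reach ack. This is only a minimal sketch: `once` and `buggyProcess` are hypothetical names for illustration, not part of rabbit.js, and the counter stands in for worker.ack().

```javascript
// Hypothetical helper (not part of rabbit.js): only the first call goes
// through; any later call is logged and ignored.
function once(fn) {
  var called = false;
  return function () {
    if (called) {
      console.warn('callback invoked more than once; ignoring');
      return undefined;
    }
    called = true;
    return fn.apply(this, arguments);
  };
}

// Hypothetical stand-in for a processFn that wrongly calls back twice.
function buggyProcess(job, done) {
  done(null);
  done(null);
}

var acks = 0;
buggyProcess({ id: 1 }, once(function (err) {
  if (err) return console.log(err);
  acks += 1; // in the real worker this would be worker.ack()
}));
console.log(acks); // 1
```

If the warning shows up with the real processFn, the extra ack comes from a duplicate callback rather than from the socket.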
By the way, since #ack acknowledges the least recent message, if processFn calls its callback asynchronously, you could ack the wrong message. It might be better to do it this way (sorry, JS syntax):
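// ...
worker.connect(provider);

function doJob() {
  var job = worker.read();
  // Nothing buffered yet: wait until the socket is readable, then retry.
  if (!job) worker.once('readable', doJob);
  else {
    processFn(JSON.parse(job), function(err) {
      // On error the message stays unacknowledged and the loop stops here.
      if (err) return console.log(err);
      worker.ack();
      doJob();
    });
  }
}

doJob(); // start the loop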
In other words, read the next message after processing the last message. (See #79 about acknowledging messages out of order)
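To see why the ordering matters, here is a small in-memory sketch. `FakeWorker` is a hypothetical stand-in, not rabbit.js; it only assumes what is described above, namely that ack() settles the oldest delivered message that has not been acknowledged yet. Job completion is triggered by hand so the run is deterministic.

```javascript
// Hypothetical stand-in for a WORKER socket (not rabbit.js).
function FakeWorker(messages) {
  this.pending = messages.slice(); // not yet delivered
  this.unacked = [];               // delivered, awaiting ack
  this.acked = [];                 // order in which messages were settled
}
FakeWorker.prototype.read = function () {
  if (this.pending.length === 0) return null;
  var msg = this.pending.shift();
  this.unacked.push(msg);
  return msg;
};
FakeWorker.prototype.ack = function () {
  if (this.unacked.length === 0) {
    throw new Error('ack called with no unacknowledged messages');
  }
  this.acked.push(this.unacked.shift()); // always the oldest one
};

// Eager delivery (like a 'data' handler): both jobs start before either ends.
var eager = new FakeWorker(['slow', 'fast']);
var finishers = {};
var mismatches = [];
var msg;
while ((msg = eager.read()) !== null) {
  (function (job) {
    finishers[job] = function () {
      var settled = eager.unacked[0]; // what ack() is about to settle
      eager.ack();
      if (settled !== job) {
        mismatches.push(job + ' finished, but ' + settled + ' was acked');
      }
    };
  })(msg);
}
finishers.fast(); // the fast job completes first
finishers.slow();
console.log(mismatches.length); // 2

// One at a time (like doJob above): read, process, ack, then read again.
var serial = new FakeWorker(['slow', 'fast']);
var serialMismatches = 0;
while ((msg = serial.read()) !== null) {
  if (serial.unacked[0] !== msg) serialMismatches += 1;
  serial.ack();
}
console.log(serialMismatches); // 0
```

With eager delivery, each ack settles a different message from the one whose job just finished; reading one message at a time keeps the two in step.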