squaremo/rabbit.js

need help on Error: ack called with no unacknowledged messages

Closed this issue · 2 comments

Currently, I am using the PUSH/WORKER pattern for message queueing and processing. I want a message to be removed from the queue only if a worker has successfully processed it.

Here's part of my code:

class JobQueue

  constructor: ()->
    @context = rabbit.createContext('amqp://localhost')
    @context.on 'error', (err)->console.error err

  queueJob: (provider, job, callback)->
    return callback Error("Provider not specified") unless provider? and provider isnt ''
    job.title = getJobTitle job.type, provider, job.data
    push    = @context.socket 'PUSH', {persistent: true }
    push.connect provider, ->
      push.write JSON.stringify(job), 'utf8'

  onProcess: (provider, processFn)->
    worker  = @context.socket 'WORKER', { persistent: true}
    worker.connect provider
    worker.on 'data', (job)->
      processFn JSON.parse(job) , (err)->
        return console.log err if err
        worker.ack()

With this code, queueing messages works without problems, but the worker throws an error: "ack called with no unacknowledged messages".

Could the callback you provide to processFn possibly be called more than once for a particular message? Otherwise, looking at that code, I can't see how ack could be called more times than there are messages.
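One way to test the "called more than once" hypothesis is to wrap the callback so that only its first invocation reaches `ack`. The sketch below is illustrative only: `once`, `fakeWorker`, and `buggyProcessFn` are hypothetical names, and `fakeWorker` is a plain counter standing in for a WORKER socket, not rabbit.js itself.

```javascript
// Hypothetical helper: wraps a callback so that only its first call has any effect.
function once(fn) {
  let called = false;
  return function (...args) {
    if (called) {
      // A warning here would point at a processFn that calls back twice.
      console.warn('callback invoked more than once; ignoring');
      return;
    }
    called = true;
    return fn.apply(this, args);
  };
}

// Stand-in for a WORKER socket that only counts ack() calls (assumption, not rabbit.js).
const fakeWorker = { acks: 0, ack() { this.acks += 1; } };

// A deliberately buggy processFn that calls its callback twice for one message.
function buggyProcessFn(job, done) {
  done(null);
  done(null);
}

buggyProcessFn({ id: 1 }, once(function (err) {
  if (err) return console.log(err);
  fakeWorker.ack();
}));

console.log(fakeWorker.acks); // 1
```

If the warning shows up with the real `processFn`, the duplicate callback is the likely source of the extra `ack`.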

By the way, since #ack acknowledges the least recent message, if processFn calls its callback asynchronously, you could ack the wrong messages. It might be better to do it this way (sorry, JS syntax):

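// ...
worker.connect(provider);

// Read one message at a time; fetch the next only after the current one is acked.
function doJob() {
  var job = worker.read();
  // Nothing buffered yet: wait until the socket is readable, then retry.
  if (!job) worker.once('readable', doJob);
  else {
    processFn(JSON.parse(job), function(err) {
      // On error the message stays unacknowledged and the loop stops here.
      if (err) return console.log(err);
      worker.ack();
      doJob();
    });
  }
}

doJob(); // start the read loop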

In other words, read the next message after processing the last message. (See #79 about acknowledging messages out of order)
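To make the ordering problem concrete, here is a small in-memory model of the behaviour described above, where `ack` always settles the least recent unacknowledged message. `FakeWorker` and its `deliver` method are assumptions made for illustration and are not part of rabbit.js; only the error text is taken from this issue.

```javascript
// Stand-in for a WORKER socket: ack() always settles the oldest unacknowledged message.
class FakeWorker {
  constructor() {
    this.unacked = [];
    this.acked = [];
  }
  // Hypothetical method simulating a message arriving at the worker.
  deliver(msg) {
    this.unacked.push(msg);
  }
  ack() {
    if (this.unacked.length === 0) {
      throw new Error('ack called with no unacknowledged messages');
    }
    this.acked.push(this.unacked.shift());
  }
}

const worker = new FakeWorker();
worker.deliver('slow-job'); // delivered first, still being processed
worker.deliver('fast-job'); // delivered second, finishes first

// The handler for 'fast-job' completes and calls ack(),
// but the acknowledgement lands on 'slow-job' instead.
worker.ack();

console.log(worker.acked);   // [ 'slow-job' ]
console.log(worker.unacked); // [ 'fast-job' ]
```

In this model, processing one message at a time avoids the mismatch because there is never more than one unacknowledged message for `ack` to choose from.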

@squaremo, thanks for your reply. It does make sense: #ack must have been called twice somewhere. I didn't realise that I should check previously closed issues for answers. Many thanks and apologies.