squaremo/rabbit.js

Ability to ACK a single message


In a PUSH/WORKER setup, when you receive a message on the WORKER side, you can call ack on the worker socket to tell RabbitMQ that the message has been processed and that the worker can receive the next batch.

The pitfall is that if you configure the worker with a prefetch greater than 1, ack always acknowledges the oldest message received (the one held in the first position of the unacked array).

In an async environment, though, you may finish processing the second message before the first. Calling ack at that point still acknowledges the first message, not the one that actually finished.
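
To make the mismatch concrete, here is a toy model of the behaviour described above. It is not rabbit.js code: `FifoWorker`, `deliver` and the `acked` list are made up for illustration, and the only thing assumed about the real socket is that an argument-less ack settles the oldest unacked message.

```javascript
// Toy model of a worker socket that acks in arrival order (FIFO).
// Illustration only; this is NOT rabbit.js source code.
function FifoWorker() {
  this.unacked = []; // messages delivered but not yet acked
  this.acked = [];   // what the broker has been told is done
}

// Simulates the broker handing a message to the worker.
FifoWorker.prototype.deliver = function (msg) {
  this.unacked.push(msg);
};

// ack() takes no argument, so it can only settle the oldest message.
FifoWorker.prototype.ack = function () {
  var msg = this.unacked.shift();
  this.acked.push(msg);
  return msg;
};

var worker = new FifoWorker();
worker.deliver('job-1 (slow)');
worker.deliver('job-2 (fast)');

// job-2 finishes first and calls ack(), but the ack lands on job-1.
var settled = worker.ack();
console.log('finished job-2, acked:', settled);
console.log('still unacked:', worker.unacked);
```

If the process crashed right after this ack, the broker would consider job-1 done even though it never finished, and would redeliver job-2 even though it did.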

I would love to have an extended ack method that can accept the message payload (or a uniqueId) for acking that single message.

@squaremo, what do you think?

Right, the WORKER socket assumes you're doing just one thing at a time, and scaling up by having more workers.

This discussion may be apposite: #48

The solution discussed there is to construct a future for each task, and pass that along with the payload to the next stage. Then you can iterate through the promises and ack things in order.
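
A rough sketch of that idea, for what it's worth. The `worker` object here is a hypothetical stand-in whose `ack()` settles the oldest unacked message, and `handle`, `work` and `ackChain` are names invented for this example. Tasks run concurrently, but each ack is appended to a single promise chain so acks go out in arrival order.

```javascript
// Toy worker whose ack() settles the oldest unacked message (FIFO).
// Hypothetical stand-in for a WORKER socket, not rabbit.js source.
var unacked = [];
var ackLog = [];
var worker = {
  deliver: function (msg) { unacked.push(msg); },
  ack: function () { ackLog.push(unacked.shift()); }
};

// Tail of a promise chain. Each task's ack is appended to it,
// so acks are issued strictly in arrival order.
var ackChain = Promise.resolve();

function handle(msg, startTask) {
  worker.deliver(msg);
  var task = startTask(msg); // starts immediately, runs concurrently
  ackChain = ackChain
    .then(function () { return task; })   // wait for this task to finish
    .then(function () { worker.ack(); }); // oldest unacked is now this one
}

// Simulated async work that takes `ms` milliseconds.
function work(ms) {
  return function (msg) {
    return new Promise(function (resolve) {
      setTimeout(function () { resolve(msg); }, ms);
    });
  };
}

handle('job-1', work(30)); // slow
handle('job-2', work(5));  // fast, finishes first

var finished = ackChain.then(function () {
  console.log('ack order:', ackLog);
  return ackLog;
});
```

The trade-off is that a slow task holds up the acks of everything delivered after it, even if those tasks finished long ago. Error handling is left out: a rejected task would stall the chain as written.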

I would love to have an extended ack method that can accept the message payload (or a uniqueId) for acking that single message.

Both of these are problematic: the first assumes no two payloads will ever be the same (uh-oh); the second requires some means of giving a buffer a unique ID, which is then required when acking it. I suppose it could be a property set on the buffer, but that seems kind of gross -- also, whatever you're passing it on to would have to know to tell you once it's done -- so you may as well give it a future or callback or whatever, rather than an ID.

@squaremo I think the messages that your lib delivers to the rabbit.js internals have a deliveryTag (which is what you use to ack messages, judging by https://github.com/squaremo/amqp.node/blob/master/lib/channel_model.js#L220-224 )

I think that you can send, along with the message, the deliveryTag that the message has. But looking at the readable.push method [1], I think this change would break the current API, as push accepts only a Buffer or a String, not an object or two parameters.

[1] http://nodejs.org/api/stream.html#stream_readable_push_chunk_encoding

I think that you can send, along with the message, the deliveryTag that the message has. But looking at the readable.push method [1], I think this change would break the current API, as push accepts only a Buffer or a String

One can always set a property on the buffer or string -- it's JavaScript after all. But I don't like this solution, because the receiver has to know a very specific behaviour, which is to get the delivery tag from the buffer it's responding to, and somehow attach that to its answer. I'd rather there was a callback.

The library could help you out: maybe something like

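var worker = rabbit.socket('WORKER');
worker.connect('jobs');
worker.on('data', function(job) {
  // lastMessageAcker() is the proposed helper, not an existing method: it would
  // return a callback that acks the message most recently delivered.
  doSomethingAsync(job, worker.lastMessageAcker());
});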
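
One way such a helper might work internally, purely as a sketch: `ToyWorker`, its `deliver` method and the tag bookkeeping are all hypothetical, and nothing here is taken from the rabbit.js source. The assumption is that the socket can see each message's delivery tag at the moment it emits the payload, so the helper can capture that tag in a closure.

```javascript
// Toy worker that tracks unacked messages by delivery tag.
// Everything here is hypothetical; it only sketches the proposal.
function ToyWorker() {
  this.unacked = {};   // deliveryTag -> payload
  this.lastTag = null; // tag of the most recently delivered message
  this.ackLog = [];    // tags in the order they were acked
  this.handler = null;
}

ToyWorker.prototype.on = function (event, handler) {
  if (event === 'data') this.handler = handler;
};

// Simulates the broker delivering a message to the socket.
ToyWorker.prototype.deliver = function (deliveryTag, payload) {
  this.unacked[deliveryTag] = payload;
  this.lastTag = deliveryTag;
  if (this.handler) this.handler(payload);
};

// Returns a callback bound to the message delivered most recently.
// It has to be called synchronously inside the 'data' handler,
// before the next delivery overwrites lastTag.
ToyWorker.prototype.lastMessageAcker = function () {
  var self = this;
  var tag = this.lastTag;
  return function ack() {
    if (!(tag in self.unacked)) return; // already acked, ignore
    delete self.unacked[tag];
    self.ackLog.push(tag);
  };
};

var worker = new ToyWorker();
var ackers = [];
worker.on('data', function (job) {
  ackers.push(worker.lastMessageAcker());
});
worker.deliver(1, 'job-1');
worker.deliver(2, 'job-2');

ackers[1](); // job-2 finishes first
ackers[0](); // job-1 finishes later
console.log('acked tags in completion order:', worker.ackLog);
```

The receiver never sees a delivery tag or an ID; it only gets a callback to call when it is done, which is the point of preferring this over attaching a property to the buffer.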