An async queue.
- Multiple/Managable queues
npm i -S multi-queue
var mqueue = require('multi-queue');
var mq = mqueue();
// Request pages 1 - 5,
// 3 pages at a time
// disallow duplicate requests per unique, page 2 will NOT run twice
mq.create('tweets', { concurrency: 3 })
[1, 2, 2, 3, 4, 5].forEach(function(page) {
mq.push('tweets', function(done) {
twitter.fetchPage(page, function(err, tweets) {
// Do some stuff with tweets
done(err); // Must call done to inform mq that task is complete
});
}, { unique: 'fetch' + page });
})
var mqueue = require('multi-queue');
var mq = mqueue(); // Note: Exported is a function to be called, not a class
Creates a new queue named by key
.
Default: 1 (aka no concurrency)
Executes N tasks specified by concurrency
.
var mq = mqueue();
mq.create('key', { concurrency: 2 });
mq.push('key', getTweets);
mq.push('key', getTweets);
mq.push('key', getTweets); // queued
Add a task to a queue(key
).
task
is expected to be a function that receives a single callback argument
which must be called to continue processing tasks.
If no key is provided then the task is added to the default queue.
var mq = mqueue();
// add task to the `repos` queue
mq.push('repos', function(done) {
github.getRepos(function (err, repos) {
// do some stuff
done(); // Must call done to inform mq that the task is complete
});
});
// Add to a queue uniquely
mq.push(function(done) {
github.getRepos(function (err, repos) {
// do some stuff
done(); // Must call done to inform mq that the task is complete
}, { unique: 'github' });
});
// IGNORED
mq.push(function(done) {
github.getRepos(function (err, repos) {
// do some stuff
done(); // Must call done to inform mq that the task is complete
}, { unique: 'github' });
});
By specifying a name for a task you gain the ability to call the following methods:
- remove
This option guarantees your tasks are unique.
var mq = mqueue();
mq.push(getTweets, { unique: 'tweets' });
mq.push(getTweets, { name: 'tweets', unique: true }); // equivalent to the above
mq.push(getTweetsAgain, { unique: 'tweets' }); // Will not be added to queue
mq.push('myKey', getTweetsAgain, { unique: 'tweets' }); // Added to the myKey queue
Note: If a
string
value is set tounique
then that will act as a name and unique will be set totrue
. In otherwords,{unique: 'tweets'}
is equivalent to{ name: 'tweets', unique: true }
.
Provide meta data to get insight into tasks when listening to events.
mq.on('run', function(name, taskName, meta) {
console.log(meta.id); // 5;
});
mq.push(uploadData, { meta: { id: 5 } });
Start a queue specified by key
.
If
key
is omitted then the default queue is started
Stop a queue specified by key
.
If
key
is omitted then the default queue is stopped
Remove all tasks from the queue specified by key
.
If
key
is omitted then the default queue is emptied
Remove a task(name
) from the queue(key
).
If
key
is omitted then the default queue is used
In order to remove a task then it must be named when it is added via queue.push.
MultiQueue
extends EventEmitter
and emits the following events:
Called when start
is called
mq.on('start', function(name) {
console.log(name); // foo
})
mq.start('foo');
Called when stop
is called
mq.on('stop', function(name) {
console.log(name); // foo
})
mq.stop('foo');
Called when a queue is emptied
mq.on('empty', function(name) {
console.log(name); // name of emptied queue
});
mq.empty('foo');
Called when a task is added and queued (can't run immediately)
mq.on('queue', function(name, taskName, meta) {
console.log(name); // name of queue added to 'foo'
console.log(taskName); // name of task 'baz'. `bar` was not queued
});
mq.push('foo', someFn, { name: 'bar', concurrency: 1 });
mq.push('foo', someFn, { name: 'baz' });
Called when a task is executed
mq.on('run', function(name, taskName, meta) {
console.log(name);
console.log(taskName);
});
mq.push('foo', someFn, { name: 'bar' });
Called when a task is executed
// called once in this scenario
mq.on('duplicate', function(name, taskName, meta) {
console.log(name); // 'foo'
console.log(taskName); 'bar'
});
mq.push('foo', someFn, { name: 'bar' });
mq.push('foo', someFn, { name: 'bar' });
Called when a task is completed
Pass values into
done
in order to gain some insight into the task
// Expose an error
function someFn(done) {
done(new Error('test'));
}
mq.on('done', function(err, name, taskName, meta) {
console.log(err.message); // 'test'
console.log(name);
console.log(taskName);
})
mq.push('foo', someFn, { name: 'bar' });
// Expose values
function someFn(done) {
done(null, 'hi');
}
mq.on('done', function(err, val, name, taskName, meta) {
console.log(err); // null
console.log(val); // 'hi'
console.log(name);
console.log(taskName);
})
mq.push('foo', someFn, { name: 'bar' });
- Timeout tasks
- Active concurrency (When a task with a differenct concurrency is added to the queue update value)