NodeJS 8+
npm i --save @goodware/task-queue
This lightweight, battle-tested, single-dependency (joi) task queue limits the number of tasks (synchronous or asynchronous) that execute concurrently. The purpose of limiting task execution is to control resource usage such as memory and database connections.
Although several packages address this use case, this is apparently the only library that can queue tasks post-instantiation without using generators. The API is, most of all, easy to learn and use.
A task-queue object is instantiated by providing a configuration object to the constructor. The configuration object currently has one required and one optional property:
Name | Description |
---|---|
size |
The size of the queue |
workers |
The number of tasks that can execute simultaneously |
size can be provided without workers. workers can provided without size.
Functions are queued via the asynchronous method push(task)
. This method accepts a function named task
and returns a Promise that resolves to an object when the task
function is called (not when task
returns). task
is called only when a worker is available. task
does not need to return a Promise, but if it does, it can be acquired via the promise
property of the object returned by push().
- Create a queue that runs at most 10 running tasks
new (require('@goodware/task-queue'))({ size: 10 });
- Wait for the provided function to be invoked:
await queue.push(() => {...})
- Wait for the provided function to finish:
await (await queue.push(() => {...})).promise;
This example runs at most two tasks at a time. It outputs: 2, 1, 4, 3.
const queue = new (require('@goodware/task-queue'))({ size: 2 });
// Task #1 : await push() returns immediately because the queue is empty. 'await'
// doesn't wait for the task to complete.
await queue.push(
() =>
new Promise((resolve) =>
setTimeout(() => {
console.log(`Task 1 ${Date.now()}`);
resolve();
}, 400)
)
);
// Task #2 : await push() returns immediately because the queue has an open slot
await queue.push(
() =>
new Promise((resolve) =>
setTimeout(() => {
console.log(`Task 2 ${Date.now()}`);
resolve();
}, 300)
)
);
// The queue is full. Task #2 will finish in about 300 ms.
// Task #3 : await push() waits until task #2 finishes
await queue.push(
() =>
new Promise((resolve) =>
setTimeout(() => {
console.log(`Task 3 ${Date.now()}`);
resolve();
}, 200)
)
);
// The queue is full again. 300 ms have already passed. Task #1 will
// terminate in about 100 ms, leaving task #3 in the queue.
// Task #4 : await push() waits until task #1 finishes
const ret = await queue.push(
() =>
new Promise((resolve) =>
setTimeout(() => {
console.log(`Task 4 ${Date.now()}`);
resolve();
}, 100)
)
);
// Wait for task #4 to finish
await ret.promise;
await queue.stop();
push()
returns a new Promise each time it is called, thus consuming memory. Depending on your application, it may be necessary to limit calls to push()
when the queue is full if you are unable to control the number of calls to push().
For example, consider the following constraints:
- Up to 10 workers can execute at the same time
- When 10 workers are running, up to 50 tasks can call
push()
and immediately continue their work. Subsequent callers will wait until a worker has finished.
Although it appears that resources are properly constrained in this scenario, if push()
is called, say, 1,000 times a second, and the workers take longer than 1 second each, the process will likely run out of memory. One solution to this scenario is backpressure.
No form of backpressure is a silver bullet. External systems must handle errors and retry.
const queue = new (require('@goodware/task-queue'))({ size: 50, workers: 10 });
async function doWork() {
const me = Date.now();
console.log(`${me} begin`);
await new Promise((resolve) => setTimeout(resolve, 200));
console.log(`${me} end`);
}
for (let i = 1; i <= 100; ++i) {
// The most basic implementation of backpressure: wait 50 ms
if (queue.full) {
console.log('full');
await new Promise((resolve)=>setTimeout(resolve, 50));
}
else {
await queue.push(doWork);
console.log('queued');
}
}
await queue.stop();