This documentation looks very legit. It's AI-generated from the tests and some prompting...
PQueue Guide
1. Introduction to PQueue
Overview
PQueue is a powerful promise queue library that provides concurrency control for managing asynchronous tasks in JavaScript. It allows developers to:
- Limit the number of concurrent tasks.
- Throttle tasks over specified intervals.
- Prioritize tasks based on importance.
- Cancel tasks using AbortController.
Benefits:
- Resource Management: Prevents overwhelming systems by controlling task execution rates.
- Flexibility: Supports dynamic configuration changes at runtime.
- Error Handling: Offers robust mechanisms for managing task errors and timeouts.
Use Cases:
- API Rate Limiting: Complying with external API limits by controlling request rates.
- Batch Processing: Managing large sets of data processing tasks efficiently.
- Task Scheduling: Organizing tasks based on priority and available resources.
Getting Started
Installation
Install PQueue using npm:
npm install p-queue
Or using yarn:
yarn add p-queue
Basic Example
Create a queue and add tasks:
import PQueue from 'p-queue';
const queue = new PQueue();
const task = async () => {
  // Your asynchronous operation
  return 'Task result';
};
queue.add(task).then(result => {
  console.log(result); // Output: 'Task result'
});
In this example, a new PQueue instance is created, and an asynchronous task is added to the queue using .add(). The task executes immediately since the default concurrency is unlimited.
2. Core Functionality
Task Management
Adding Tasks with .add()
Use .add() to include a single task in the queue. The task is a function that either returns a promise (asynchronous) or returns a value directly (synchronous).
queue.add(async () => {
  // Asynchronous task
  await performAsyncOperation();
  return 'Async result';
});
queue.add(() => {
  // Synchronous task
  return 'Sync result';
});
Adding Multiple Tasks with .addAll()
To add multiple tasks simultaneously, use .addAll() with an array of task functions.
const tasks = [
  async () => {
    await performAsyncOperation();
    return 'Task 1';
  },
  () => 'Task 2',
];
queue.addAll(tasks).then(results => {
  console.log(results); // Output: ['Task 1', 'Task 2']
});
Supported Task Types
- Asynchronous Tasks: Functions that return a promise.
- Synchronous Tasks: Functions that return a value immediately.
Concurrency and Throttling
Managing Concurrency
Set the concurrency option to limit the number of tasks running concurrently.
const queue = new PQueue({ concurrency: 2 });
queue.add(async () => {
  // Task 1
});
queue.add(async () => {
  // Task 2
});
queue.add(async () => {
  // Task 3
});
console.log(queue.size); // Output: 1 (queued tasks)
console.log(queue.pending); // Output: 2 (running tasks)
Example from Tests:
// delay(ms) is a promise-returning sleep helper, for example from the 'delay' package
const queue = new PQueue({ concurrency: 2 });
queue.add(async () => 'Result 1');
queue.add(async () => {
  await delay(100);
  return 'Result 2';
});
queue.add(async () => 'Result 3');
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
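To make the size and pending counters less abstract, here is a minimal sketch of a concurrency limiter in plain JavaScript. It is not PQueue's implementation; createLimiter and its internals are hypothetical names used only for illustration. The idea it shows is that tasks beyond the limit wait in an array and start as running tasks settle.

```javascript
// Hypothetical helper, for illustration only (not part of p-queue).
function createLimiter(concurrency) {
  const waiting = []; // tasks that have not started yet
  let running = 0; // tasks currently in flight

  function next() {
    if (running >= concurrency || waiting.length === 0) {
      return;
    }
    const { task, resolve, reject } = waiting.shift();
    running++;
    Promise.resolve()
      .then(task) // also turns a synchronous throw into a rejection
      .then(resolve, reject)
      .finally(() => {
        running--;
        next(); // a slot is free, so start the next waiting task
      });
  }

  return {
    add(task) {
      return new Promise((resolve, reject) => {
        waiting.push({ task, resolve, reject });
        next();
      });
    },
    get size() {
      return waiting.length;
    },
    get pending() {
      return running;
    },
  };
}

const limiter = createLimiter(2);
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
limiter.add(() => sleep(10));
limiter.add(() => sleep(10));
limiter.add(() => sleep(10));
console.log(limiter.size, limiter.pending); // 1 2
```

As in the examples above, two tasks occupy the available slots and the third waits until one of them settles.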
Throttling with interval and intervalCap
- intervalCap: Maximum number of tasks to execute per interval.
- interval: Time frame for the interval in milliseconds.
const queue = new PQueue({
  intervalCap: 5, // Max 5 tasks
  interval: 1000, // Per 1000 milliseconds
});
Example from Tests:
const queue = new PQueue({
  concurrency: 5,
  intervalCap: 10,
  interval: 1000,
});
for (let i = 0; i < 13; i++) {
  queue.add(async () => {
    await delay(300);
    console.log(`Task ${i} completed`);
  });
}
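The two limits are easy to confuse, so the following sketch models them side by side. It is a deliberately simplified simulation, not PQueue's scheduler: simulateStartTimes is a hypothetical helper that assumes all tasks are added at time 0, every task takes the same duration, and intervals are fixed windows starting at 0. In this model, concurrency caps how many tasks run at once, while intervalCap caps how many tasks may start within each interval.

```javascript
// Hypothetical model, for illustration only (not part of p-queue).
// Returns the time (in ms) at which each task would start.
function simulateStartTimes({ taskCount, duration, concurrency = Infinity, intervalCap = Infinity, interval = 0 }) {
  if (!(concurrency >= 1) || !(intervalCap >= 1)) {
    throw new RangeError('concurrency and intervalCap must be at least 1');
  }
  // The cap only matters when an interval and a finite cap are both set.
  const throttled = interval > 0 && intervalCap !== Infinity;
  const starts = [];
  let runningEnds = []; // end times of tasks currently running
  let now = 0;
  let windowIndex = 0;
  let startedInWindow = 0;

  while (starts.length < taskCount) {
    runningEnds = runningEnds.filter(end => end > now); // drop finished tasks
    const currentWindow = throttled ? Math.floor(now / interval) : 0;
    if (currentWindow !== windowIndex) {
      windowIndex = currentWindow;
      startedInWindow = 0; // a new interval resets the start counter
    }
    const capReached = throttled && startedInWindow >= intervalCap;
    if (runningEnds.length < concurrency && !capReached) {
      starts.push(now);
      runningEnds.push(now + duration);
      startedInWindow++;
      continue;
    }
    // Blocked: jump to the next moment at which something can change.
    const nextEnd = runningEnds.length > 0 ? Math.min(...runningEnds) : Infinity;
    const nextWindow = throttled ? (currentWindow + 1) * interval : Infinity;
    now = Math.min(nextEnd, nextWindow);
  }
  return starts;
}

// 12 tasks of 300 ms each, limited only by intervalCap:
console.log(simulateStartTimes({ taskCount: 12, duration: 300, interval: 1000, intervalCap: 5 }));
// [0, 0, 0, 0, 0, 1000, 1000, 1000, 1000, 1000, 2000, 2000]

// The same tasks, limited only by concurrency:
console.log(simulateStartTimes({ taskCount: 12, duration: 300, concurrency: 5 }));
// [0, 0, 0, 0, 0, 300, 300, 300, 300, 300, 600, 600]
```

With only intervalCap set, a new batch starts once per interval regardless of how quickly tasks finish. With only concurrency set, a new task starts as soon as a slot frees up.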
Managing Unfinished Tasks with carryoverConcurrencyCount
When carryoverConcurrencyCount is true, unfinished tasks are counted towards the intervalCap in the next interval.
const queue = new PQueue({
  intervalCap: 1,
  interval: 500,
  carryoverConcurrencyCount: true,
});
queue.add(async () => {
  await delay(600); // Exceeds the interval of 500ms
  console.log('Task completed');
});
3. Execution Flow Control
Prioritization
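Assign priorities to tasks to control their execution order. Higher priority tasks run before lower priority ones. Priority only affects tasks that are still waiting: a task that starts immediately because the queue has a free slot is not reordered.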
queue.add(() => console.log('Task 1'), { priority: 1 });
queue.add(() => console.log('Task 2'), { priority: 0 });
queue.add(() => console.log('Task 3'), { priority: 2 });
Example from Tests:
const result = [];
const queue = new PQueue({ concurrency: 1 });
queue.add(() => result.push('A'), { priority: 1 });
queue.add(() => result.push('B'), { priority: 0 });
queue.add(() => result.push('C'), { priority: 1 });
queue.add(() => result.push('D'), { priority: 2 });
queue.onIdle().then(() => {
  // 'A' starts immediately because the queue is idle; the rest wait and run by priority
  console.log(result); // Output: ['A', 'D', 'C', 'B']
});
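The ordering of waiting tasks can be sketched with a small sorted insert. This is an illustration only, not PQueue's internal code; enqueueByPriority is a hypothetical helper. It keeps higher priorities first and preserves insertion order among equal priorities. A task that has already started is not part of this ordering.

```javascript
// Hypothetical helper, for illustration only (not part of p-queue).
function enqueueByPriority(waiting, run, priority = 0) {
  // Insert before the first entry with a strictly lower priority,
  // so entries with equal priority keep their insertion order.
  const index = waiting.findIndex(entry => entry.priority < priority);
  const position = index === -1 ? waiting.length : index;
  waiting.splice(position, 0, { run, priority });
}

const waiting = [];
enqueueByPriority(waiting, () => 'B', 0);
enqueueByPriority(waiting, () => 'C', 1);
enqueueByPriority(waiting, () => 'D', 2);
enqueueByPriority(waiting, () => 'E', 1);
console.log(waiting.map(entry => entry.run())); // ['D', 'C', 'E', 'B']
```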
Auto-Start
By default, the queue starts processing tasks immediately. Set autoStart to false to manually control task execution.
const queue = new PQueue({ autoStart: false });
queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Example from Tests:
const queue = new PQueue({ concurrency: 2, autoStart: false });
queue.add(() => delay(1000));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 0
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Pause and Resume
Use .pause() to stop task execution and .start() to resume it.
queue.pause();
queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.pause();
queue.add(() => delay(1000));
queue.add(() => delay(1000));
console.log(queue.size); // Output: 2
console.log(queue.pending); // Output: 0
console.log(queue.isPaused); // Output: true
queue.start();
console.log(queue.isPaused); // Output: false
Timeouts
Set global or per-task timeouts to prevent tasks from running indefinitely.
- timeout: Maximum duration (in milliseconds) a task can run.
- throwOnTimeout: Determines whether a timeout results in an error.
const queue = new PQueue({ timeout: 300, throwOnTimeout: true });
queue.add(async () => {
  await delay(400); // Exceeds the timeout
});
// The promise returned by .add() rejects with a timeout error.
Example from Tests:
const queue = new PQueue({ timeout: 300, throwOnTimeout: true });
queue.add(async () => {
  await delay(400);
}).catch(error => {
  console.error(error.message); // Output: 'Promise timed out after 300 milliseconds'
});
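Conceptually, a timeout races the task against a timer. The sketch below shows that idea in plain JavaScript; withTimeout is a hypothetical helper, not PQueue's API, and its error message simply mirrors the one shown above. In this sketch the underlying operation is not cancelled when the timer wins; only the returned promise rejects.

```javascript
// Hypothetical helper, for illustration only (not part of p-queue).
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => {
      reject(new Error(`Promise timed out after ${ms} milliseconds`));
    }, ms);
  });
  // Whichever settles first wins; the timer is always cleaned up.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

const slow = new Promise(resolve => setTimeout(resolve, 400, 'done'));
withTimeout(slow, 300).catch(error => {
  console.error(error.message); // 'Promise timed out after 300 milliseconds'
});
```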
4. Queue State and Monitoring
Status Tracking
Monitor the queue's state using properties and methods:
- size: Number of tasks waiting in the queue.
- pending: Number of tasks currently running.
console.log(queue.size); // Output: Number of queued tasks
console.log(queue.pending); // Output: Number of running tasks
Using .onEmpty()
Resolves when the queue has no tasks waiting to be processed. Tasks that have already started may still be running.
queue.onEmpty().then(() => {
  console.log('Queue is empty');
});
Using .onIdle()
Resolves when the queue is empty and all running tasks have completed.
queue.onIdle().then(() => {
  console.log('Queue is idle');
});
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.add(() => delay(100));
queue.add(() => delay(100));
queue.add(() => delay(100));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
queue.onIdle().then(() => {
  console.log('All tasks completed');
});
Clearing the Queue
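Use .clear() to remove all queued tasks that have not started yet. Tasks that are already running are not affected.
const queue = new PQueue({ autoStart: false }); // Paused, so added tasks stay queued
queue.add(() => delay(1000));
queue.add(() => delay(1000));
console.log(queue.size); // Output: 2
queue.clear();
console.log(queue.size); // Output: 0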
Example from Tests:
const queue = new PQueue({ concurrency: 2 });
queue.add(() => delay(20000));
queue.add(() => delay(20000));
queue.add(() => delay(20000));
console.log(queue.size); // Output: 1
console.log(queue.pending); // Output: 2
queue.clear();
console.log(queue.size); // Output: 0
5. Advanced Features
Abort and Cancellation
Use AbortController and AbortSignal to cancel tasks.
const controller = new AbortController();
queue.add(async ({ signal }) => {
  signal.addEventListener('abort', () => {
    console.log('Task aborted');
  });
  await performAsyncOperation();
}, { signal: controller.signal });
// To abort the task:
controller.abort();
Example from Tests:
const queue = new PQueue();
const controller = new AbortController();
controller.abort();
queue.add(() => 'Task', { signal: controller.signal }).catch(error => {
  console.error(error.name); // Output: 'AbortError'
});
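Passing a signal only helps if the task reacts to it. As a sketch of a cooperative task, the hypothetical abortableDelay helper below (not part of PQueue) rejects as soon as the signal aborts and clears its timer. A task such as async ({ signal }) => abortableDelay(1000, signal) would then stop early instead of running to completion. The sketch assumes a runtime where AbortSignal exposes a reason, such as Node.js 18 or later.

```javascript
// Hypothetical helper, for illustration only (not part of p-queue).
function abortableDelay(ms, signal) {
  return new Promise((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason); // already aborted: fail without starting a timer
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

const controller = new AbortController();
abortableDelay(1000, controller.signal).catch(error => {
  console.error(error.name); // 'AbortError'
});
controller.abort();
```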
Dynamic Configuration Updates
Modify concurrency and timeout at runtime. The throttling settings (interval and intervalCap) are fixed when the queue is created, so create a new queue to change them.
queue.concurrency = 5;
queue.timeout = 500;
Example from Tests:
let concurrency = 5;
const queue = new PQueue({ concurrency });
for (let i = 0; i < 100; i++) {
  queue.add(async () => {
    // Task logic
    if (i % 30 === 0) {
      queue.concurrency = --concurrency;
      console.log(`Concurrency updated to ${queue.concurrency}`);
    }
  });
}
Mixed Task Handling
Combine synchronous and asynchronous tasks within the same queue seamlessly.
queue.add(() => 'Synchronous Task');
queue.add(async () => {
  await performAsyncOperation();
  return 'Asynchronous Task';
});
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.add(() => 'sync 1');
queue.add(async () => delay(1000));
queue.add(() => 'sync 2');
queue.add(() => 'sync 3');
queue.onIdle().then(() => {
  console.log('All tasks completed');
});
6. Error Management
Error Handling in Tasks
Errors thrown within tasks will cause the task's promise to be rejected.
queue.add(async () => {
  throw new Error('Task failed');
}).catch(error => {
  console.error('Task error:', error.message);
});
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.add(async () => {
  throw new Error('broken');
}).catch(error => {
  console.error(error.message); // Output: 'broken'
});
Error Event Emission
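Listen for error events to observe task failures in one place. The promise returned by .add() still rejects, so handle it as well to avoid an unhandled rejection.
queue.on('error', error => {
  console.error('Error event:', error.message);
});
queue.add(async () => {
  throw new Error('Task failed');
}).catch(() => {
  // Already reported by the 'error' listener
});
Example from Tests:
const queue = new PQueue({ concurrency: 1 });
queue.on('error', error => {
  console.error('Error event:', error.message);
});
queue.add(async () => {
  throw new Error('failure');
}).catch(() => {
  // Already reported by the 'error' listener
});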
7. Event-Driven Architecture
Key Events
- add: Emitted when a task is added to the queue.
- active: Emitted when a task starts running.
- next: Emitted after a task completes.
- empty: Emitted when the queue becomes empty (no queued tasks).
- idle: Emitted when all tasks have completed (no running or queued tasks).
- completed: Emitted when a task finishes successfully.
- error: Emitted when a task fails.
Subscribing to Events
queue.on('add', () => {
  console.log('Task added to the queue');
});
queue.on('active', () => {
  console.log('Task started');
});
queue.on('completed', result => {
  console.log('Task completed with result:', result);
});
queue.on('error', error => {
  console.error('Task failed with error:', error.message);
});
queue.on('idle', () => {
  console.log('All tasks have completed');
});
Practical Event Usage
Example from Tests:
const items = [1, 2, 3];
const queue = new PQueue();
let startedCount = 0;
queue.on('active', () => {
  // Fires once per started task, so this counts starts, not tasks currently running
  startedCount++;
  console.log(`Tasks started so far: ${startedCount}`);
});
items.forEach(item => {
  queue.add(() => item);
});
queue.onIdle().then(() => {
  console.log('All tasks completed');
});
8. Configuration and Validation
Validating Configuration Options
Ensure that configuration options are set correctly to avoid runtime errors.
- concurrency: Must be a number greater than or equal to 1.
- interval: Must be a finite non-negative number.
- intervalCap: Must be a number greater than or equal to 1.
try {
  new PQueue({ concurrency: 0 });
} catch (error) {
  console.error(error.message); // Output starts with: 'Expected `concurrency` to be a number from 1 and up'
}
Example from Tests:
try {
  new PQueue({ interval: -1 });
} catch (error) {
  console.error(error.message); // Output starts with: 'Expected `interval` to be a finite number >= 0'
}
Avoiding Common Pitfalls
- Invalid Option Values: Ensure concurrency, interval, and intervalCap are set to valid numbers.
- Unhandled Rejections: Always handle errors in tasks to prevent unhandled promise rejections.
- Blocking Tasks: Avoid long-running synchronous tasks that block the event loop.
9. Examples and Best Practices
Common Use Cases
API Rate Limiting
Control the rate of API calls to comply with external API limits.
const queue = new PQueue({
  interval: 1000, // 1 second interval
  intervalCap: 5, // Max 5 API calls per interval
});
const fetchData = async url => {
  const response = await fetch(url);
  return response.json();
};
const urls = ['https://api.example.com/data1', 'https://api.example.com/data2'];
urls.forEach(url => {
  queue.add(() => fetchData(url)).then(data => {
    console.log(data);
  });
});
Batch Processing
Efficiently process large data sets with controlled concurrency.
const queue = new PQueue({ concurrency: 3 });
const processItem = async item => {
  // Processing logic
};
const items = [/* Large array of items */];
items.forEach(item => {
  queue.add(() => processItem(item));
});
Best Practices
- Handle Errors Properly: Use .catch() or try...catch to manage errors in tasks.
- Monitor Queue State: Utilize events like idle and empty to track the queue.
- Optimize Concurrency: Adjust concurrency based on system capabilities and task nature.
- Avoid Blocking Operations: Ensure tasks are non-blocking to keep the event loop responsive.
- Dynamic Adjustments: Modify queue configurations at runtime to adapt to changing conditions.
10. Appendix
Reference
For detailed information on all methods and options, refer to the [PQueue documentation](https://github.com/sindresorhus/p-queue).
Edge Cases
- Empty Queues: Calling .onEmpty() when no tasks are waiting, or .onIdle() when no tasks are waiting or running, resolves immediately.
- Task Rejection: Rejections in tasks should be handled to prevent unhandled promise rejections.
Troubleshooting
Common Issues
- Tasks Not Executing: Ensure autoStart is true or call queue.start().
- Exceeding Rate Limits: Adjust interval and intervalCap to match desired throughput.
- Unhandled Rejections: Always handle rejections in task promises.
FAQs
- Can I Change Concurrency During Execution?
  - Answer: Yes, you can update queue.concurrency at any time.
- How Do I Cancel a Task?
  - Answer: Use an AbortController and pass its signal in the options of .add().
- What Happens When a Task Times Out?
  - Answer: If throwOnTimeout is true, the task will reject with a timeout error.
This guide provides a comprehensive overview of PQueue, illustrating its features with practical code examples. By leveraging these functionalities, you can efficiently manage asynchronous tasks with precise control over execution flow, concurrency, and error handling.
The documentation is very well written and solved my problem.
I have a question now: What is the difference between these 2 code examples?
To be more precise, I don't understand the difference between intervalCap and concurrency.
Thanks.
const queue = new PQueue({
  interval: 1000, // 1 second interval
  intervalCap: 5, // Max 5 API calls per interval
});
const queue = new PQueue({
  concurrency: 5, // concurrency is 5
  interval: 1000, // 1 second interval
});