sindresorhus/p-queue

This documentation looks very legit. It's AI generated from the tests, and some prompting.....


PQueue Guide


1. Introduction to PQueue

Overview

PQueue is a powerful promise queue library that provides concurrency control for managing asynchronous tasks in JavaScript. It allows developers to:

  • Limit the number of concurrent tasks.
  • Throttle tasks over specified intervals.
  • Prioritize tasks based on importance.
  • Cancel tasks using AbortController.

Benefits:

  • Resource Management: Prevents overwhelming systems by controlling task execution rates.
  • Flexibility: Supports dynamic configuration changes at runtime.
  • Error Handling: Offers robust mechanisms for managing task errors and timeouts.

Use Cases:

  • API Rate Limiting: Complying with external API limits by controlling request rates.
  • Batch Processing: Managing large sets of data processing tasks efficiently.
  • Task Scheduling: Organizing tasks based on priority and available resources.

Getting Started

Installation

Install PQueue using npm:

npm install p-queue

Or using yarn:

yarn add p-queue

Basic Example

Create a queue and add tasks:

import PQueue from 'p-queue';

const queue = new PQueue();

const task = async () => {
  // Your asynchronous operation
  return 'Task result';
};

queue.add(task).then(result => {
  console.log(result); // Output: 'Task result'
});

In this example, a new PQueue instance is created and a task is added with .add(), which returns a promise for the task's result. Because the default concurrency is Infinity, the task starts as soon as it is added.


2. Core Functionality

Task Management

Adding Tasks with .add()

Use .add() to include a single task in the queue. The task is a function that returns either a promise (asynchronous) or a plain value (synchronous). .add() itself returns a promise that settles with the task's result.

queue.add(async () => {
  // Asynchronous task
  await performAsyncOperation();
  return 'Async result';
});

queue.add(() => {
  // Synchronous task
  return 'Sync result';
});

Adding Multiple Tasks with .addAll()

To add multiple tasks simultaneously, use .addAll() with an array of task functions.

const tasks = [
  async () => {
    await performAsyncOperation();
    return 'Task 1';
  },
  () => 'Task 2',
];

queue.addAll(tasks).then(results => {
  console.log(results); // Output: ['Task 1', 'Task 2']
});

Supported Task Types

  • Asynchronous Tasks: Functions that return a promise.
  • Synchronous Tasks: Functions that return a value immediately.

Concurrency and Throttling

Managing Concurrency

Set the concurrency option to limit the number of tasks running concurrently.

const queue = new PQueue({ concurrency: 2 });

queue.add(async () => {
  // Task 1
});

queue.add(async () => {
  // Task 2
});

queue.add(async () => {
  // Task 3
});

console.log(queue.size);    // Output: 1 (queued tasks)
console.log(queue.pending); // Output: 2 (running tasks)

Example from Tests:

const queue = new PQueue({ concurrency: 2 });
queue.add(async () => 'Result 1');
queue.add(async () => {
  await delay(100);
  return 'Result 2';
});
queue.add(async () => 'Result 3');

console.log(queue.size);    // Output: 1
console.log(queue.pending); // Output: 2
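The bookkeeping behind size and pending can be sketched with a tiny hand-rolled queue. TinyQueue, its waiting array, and startNext are hypothetical names used only for illustration; this is a teaching sketch under those assumptions, not p-queue's implementation.

```javascript
// Hypothetical TinyQueue: a teaching sketch, not p-queue's source.
class TinyQueue {
  constructor(concurrency) {
    this.concurrency = concurrency;
    this.waiting = []; // tasks that have not started yet
    this.pending = 0;  // tasks currently running
  }

  get size() {
    return this.waiting.length;
  }

  add(task) {
    return new Promise((resolve, reject) => {
      this.waiting.push(async () => {
        this.pending++;
        try {
          resolve(await task());
        } catch (error) {
          reject(error);
        } finally {
          this.pending--;
          this.startNext(); // a slot is free again
        }
      });
      this.startNext();
    });
  }

  startNext() {
    if (this.pending < this.concurrency && this.waiting.length > 0) {
      const run = this.waiting.shift();
      run();
    }
  }
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

const tiny = new TinyQueue(2);
const results = Promise.all([
  tiny.add(() => sleep(10).then(() => 'one')),
  tiny.add(() => sleep(10).then(() => 'two')),
  tiny.add(() => 'three'),
]);

// Two tasks occupy the two slots; the third waits.
const snapshot = { size: tiny.size, pending: tiny.pending };
console.log(snapshot); // { size: 1, pending: 2 }
```

The third task only starts once one of the first two finishes, which is why size is 1 and pending is 2 right after the three calls.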

Throttling with interval and intervalCap

  • intervalCap: Maximum number of tasks to execute per interval.
  • interval: Time frame for the interval in milliseconds.

const queue = new PQueue({
  intervalCap: 5, // Max 5 tasks
  interval: 1000, // Per 1000 milliseconds
});
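The difference between concurrency and intervalCap is easy to blur: concurrency limits how many tasks run at the same time, while intervalCap limits how many tasks start within one interval. The following is a simplified model, not p-queue's implementation; simulateStartTimes and its parameters are hypothetical, and it assumes every task takes the same fixed time.

```javascript
// Simplified model, not p-queue's source. Assumes every task takes `duration` ms,
// time advances in 1 ms steps, and `concurrency` and `intervalCap` are at least 1.
function simulateStartTimes({ taskCount, duration, concurrency = Infinity, interval = 0, intervalCap = Infinity }) {
  // Throttling only applies when both an interval and a finite cap are set.
  const capActive = interval > 0 && Number.isFinite(intervalCap);
  const startTimes = [];
  let endTimes = [];         // finish times of tasks that are still running
  let startedInInterval = 0; // starts counted against intervalCap
  let intervalStart = 0;

  for (let now = 0; startTimes.length < taskCount; now++) {
    endTimes = endTimes.filter(end => end > now);
    if (capActive && now - intervalStart >= interval) {
      intervalStart = now;
      startedInInterval = 0;
    }
    while (
      startTimes.length < taskCount &&
      endTimes.length < concurrency &&
      (!capActive || startedInInterval < intervalCap)
    ) {
      startTimes.push(now);
      endTimes.push(now + duration);
      startedInInterval++;
    }
  }
  return startTimes;
}

// 12 tasks of 100 ms each, at most 5 starts per 1000 ms.
const throttled = simulateStartTimes({ taskCount: 12, duration: 100, interval: 1000, intervalCap: 5 });
console.log(throttled); // [0, 0, 0, 0, 0, 1000, 1000, 1000, 1000, 1000, 2000, 2000]

// 12 tasks of 100 ms each, at most 5 running at once, no intervalCap.
const limited = simulateStartTimes({ taskCount: 12, duration: 100, concurrency: 5, interval: 1000 });
console.log(limited); // [0, 0, 0, 0, 0, 100, 100, 100, 100, 100, 200, 200]
```

In this model, an interval without an intervalCap does no throttling, because the cap defaults to Infinity. With only concurrency set, a new task starts as soon as a slot frees up, regardless of the clock.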

Example from Tests:

const queue = new PQueue({
  concurrency: 5,
  intervalCap: 10,
  interval: 1000,
});

for (let i = 0; i < 13; i++) {
  queue.add(async () => {
    await delay(300);
    console.log(`Task ${i} completed`);
  });
}

Managing Unfinished Tasks with carryoverConcurrencyCount

When carryoverConcurrencyCount is true, unfinished tasks are counted towards the intervalCap in the next interval.

const queue = new PQueue({
  intervalCap: 1,
  interval: 500,
  carryoverConcurrencyCount: true,
});

queue.add(async () => {
  await delay(600); // Exceeds the interval of 500ms
  console.log('Task completed');
});
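The effect of carrying unfinished tasks into the next interval can be sketched with a small simulation. This is a simplified model under stated assumptions (tasks only start at interval boundaries, concurrency is unlimited, and every task takes the same time); simulateIntervals is a hypothetical helper, not part of p-queue.

```javascript
// Simplified model, not p-queue's source. At each interval boundary the counter
// restarts either at 0 or at the number of tasks that are still running.
function simulateIntervals({ taskCount, duration, interval, intervalCap, carryover }) {
  const startTimes = [];
  let endTimes = []; // finish times of tasks that are still running

  for (let now = 0; startTimes.length < taskCount; now += interval) {
    endTimes = endTimes.filter(end => end > now);
    let intervalCount = carryover ? endTimes.length : 0;
    while (startTimes.length < taskCount && intervalCount < intervalCap) {
      startTimes.push(now);
      endTimes.push(now + duration);
      intervalCount++;
    }
  }
  return startTimes;
}

const options = { taskCount: 3, duration: 600, interval: 500, intervalCap: 1 };

const withoutCarryover = simulateIntervals({ ...options, carryover: false });
console.log(withoutCarryover); // [0, 500, 1000]

const withCarryover = simulateIntervals({ ...options, carryover: true });
console.log(withCarryover); // [0, 1000, 2000]
```

With carryover enabled, the 600 ms task is still running at the 500 ms boundary, so it uses up that interval's single slot and the next task waits until 1000 ms.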

3. Execution Flow Control

Prioritization

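Assign priorities to tasks to control their execution order. Higher priority tasks run before lower priority ones. Priority only orders tasks that are waiting: a task added while a concurrency slot is free starts immediately, whatever its priority.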

queue.add(() => console.log('Task 1'), { priority: 1 });
queue.add(() => console.log('Task 2'), { priority: 0 });
queue.add(() => console.log('Task 3'), { priority: 2 });

Example from Tests:

const result = [];
const queue = new PQueue({ concurrency: 1 });

queue.add(() => result.push('A'), { priority: 1 });
queue.add(() => result.push('B'), { priority: 0 });
queue.add(() => result.push('C'), { priority: 1 });
queue.add(() => result.push('D'), { priority: 2 });

queue.onIdle().then(() => {
  console.log(result); // Output: ['A', 'D', 'C', 'B'] ('A' starts immediately; the rest run by priority)
});
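How waiting tasks end up ordered can be sketched with a sorted insert. The enqueue helper and the task names below are hypothetical; this shows the general technique (highest priority first, insertion order kept among equal priorities), not p-queue's internal code.

```javascript
// Hypothetical helper: insert so the array stays sorted by descending priority,
// keeping insertion order among entries with the same priority.
function enqueue(waiting, name, priority = 0) {
  let index = waiting.length;
  while (index > 0 && waiting[index - 1].priority < priority) {
    index--;
  }
  waiting.splice(index, 0, { name, priority });
}

const waiting = [];
enqueue(waiting, 'low', 0);
enqueue(waiting, 'high', 2);
enqueue(waiting, 'mid-first', 1);
enqueue(waiting, 'mid-second', 1);

// Tasks would be taken from the front of the array.
const order = waiting.map(item => item.name);
console.log(order); // ['high', 'mid-first', 'mid-second', 'low']
```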

Auto-Start

By default, the queue starts processing tasks immediately. Set autoStart to false to manually control task execution.

const queue = new PQueue({ autoStart: false });

queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true

queue.start();
console.log(queue.isPaused); // Output: false

Example from Tests:

const queue = new PQueue({ concurrency: 2, autoStart: false });

queue.add(() => delay(1000));
console.log(queue.size);     // Output: 1
console.log(queue.pending);  // Output: 0
console.log(queue.isPaused); // Output: true

queue.start();
console.log(queue.isPaused); // Output: false

Pause and Resume

Use .pause() to stop task execution and .start() to resume it.

queue.pause();
queue.add(() => 'Task');
console.log(queue.isPaused); // Output: true

queue.start();
console.log(queue.isPaused); // Output: false

Example from Tests:

const queue = new PQueue({ concurrency: 2 });

queue.pause();
queue.add(() => delay(1000));
queue.add(() => delay(1000));

console.log(queue.size);     // Output: 2
console.log(queue.pending);  // Output: 0
console.log(queue.isPaused); // Output: true

queue.start();
console.log(queue.isPaused); // Output: false

Timeouts

Set global or per-task timeouts to prevent tasks from running indefinitely.

  • timeout: Maximum duration (in milliseconds) a task can run.
  • throwOnTimeout: Whether a timed-out task rejects with an error (default: false).

const queue = new PQueue({ timeout: 300, throwOnTimeout: true });

queue.add(async () => {
  await delay(400); // Exceeds the timeout
});
// The promise returned by .add() rejects with a timeout error.

Example from Tests:

const queue = new PQueue({ timeout: 300, throwOnTimeout: true });

queue.add(async () => {
  await delay(400);
}).catch(error => {
  console.error(error.message); // Output: 'Promise timed out after 300 milliseconds'
});
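The general idea behind a promise timeout can be sketched with Promise.race. withTimeout and sleep are hypothetical helpers written for this illustration; the sketch is not how p-queue implements its timeout option, and its error message differs from the one shown above.

```javascript
// Hypothetical helper: reject if `promise` has not settled within `ms` milliseconds.
function withTimeout(promise, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever settles first wins; always clear the timer afterwards.
  return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Takes 50 ms but is only allowed 10 ms, so it rejects.
const slow = withTimeout(sleep(50).then(() => 'done'), 10).catch(error => error.message);

// Takes 5 ms and is allowed 50 ms, so it resolves normally.
const fast = withTimeout(sleep(5).then(() => 'done'), 50);

slow.then(message => console.log(message)); // 'Timed out after 10 ms'
fast.then(value => console.log(value));     // 'done'
```

In this sketch the timeout only stops waiting for the result; the underlying work keeps running unless it is cancelled by other means, such as an AbortSignal.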

4. Queue State and Monitoring

Status Tracking

Monitor the queue's state using properties and methods:

  • size: Number of tasks waiting in the queue.
  • pending: Number of tasks currently running.

console.log(queue.size);    // Output: Number of queued tasks
console.log(queue.pending); // Output: Number of running tasks

Using .onEmpty()

Resolves when the queue has no tasks waiting to be processed.

queue.onEmpty().then(() => {
  console.log('Queue is empty');
});

Using .onIdle()

Resolves when the queue is empty and all running tasks have finished, that is, when size and pending are both 0.

queue.onIdle().then(() => {
  console.log('Queue is idle');
});

Example from Tests:

const queue = new PQueue({ concurrency: 2 });

queue.add(() => delay(100));
queue.add(() => delay(100));
queue.add(() => delay(100));

console.log(queue.size);    // Output: 1
console.log(queue.pending); // Output: 2

queue.onIdle().then(() => {
  console.log('All tasks completed');
});

Clearing the Queue

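Use .clear() to remove all queued tasks that have not started yet. Tasks that are already running are not affected.

// Paused queue, so both tasks stay queued instead of starting right away
const queue = new PQueue({ autoStart: false });

queue.add(() => delay(1000));
queue.add(() => delay(1000));

console.log(queue.size); // Output: 2

queue.clear();

console.log(queue.size); // Output: 0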

Example from Tests:

const queue = new PQueue({ concurrency: 2 });

queue.add(() => delay(20000));
queue.add(() => delay(20000));
queue.add(() => delay(20000));

console.log(queue.size);    // Output: 1
console.log(queue.pending); // Output: 2

queue.clear();

console.log(queue.size); // Output: 0

5. Advanced Features

Abort and Cancellation

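Use AbortController and AbortSignal to cancel tasks. Pass the signal in the options to .add(). A task whose signal is aborted before it starts is rejected instead of run; a task that is already running must observe the signal itself.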

const controller = new AbortController();

queue.add(async ({ signal }) => {
  signal.addEventListener('abort', () => {
    console.log('Task aborted');
  });

  await performAsyncOperation();
}, { signal: controller.signal });

// To abort the task:
controller.abort();

Example from Tests:

const queue = new PQueue();
const controller = new AbortController();

controller.abort();

queue.add(() => 'Task', { signal: controller.signal }).catch(error => {
  console.error(error.name); // Output: 'AbortError'
});
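For a task that is already running, cancellation is cooperative: the task has to react to the signal. The sketch below shows one way a task body might do that using only standard APIs. abortableSleep is a hypothetical helper, and the error is built by hand here as an assumption to keep the example self-contained.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds unless `signal` aborts first.
function abortableSleep(ms, signal) {
  return new Promise((resolve, reject) => {
    let timer;
    const onAbort = () => {
      clearTimeout(timer); // stop the pending work
      const error = new Error('The operation was aborted');
      error.name = 'AbortError';
      reject(error);
    };

    if (signal.aborted) {
      onAbort();
      return;
    }

    timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve('finished');
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

const controller = new AbortController();
const outcome = abortableSleep(1000, controller.signal).catch(error => error.name);

// Abort long before the 1000 ms sleep would finish.
setTimeout(() => controller.abort(), 5);

outcome.then(value => console.log(value)); // 'AbortError'
```

A function like this could be used inside a task that receives { signal } as its argument, so that aborting the controller actually stops the work rather than only logging a message.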

Dynamic Configuration Updates

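Adjust concurrency and timeout at runtime by assigning to the corresponding properties. The throttling options (interval and intervalCap) are constructor options and are not exposed as settable properties, so create a new queue if you need different values.

queue.concurrency = 5;
queue.timeout = 500;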

Example from Tests:

let concurrency = 5;
const queue = new PQueue({ concurrency });

for (let i = 0; i < 100; i++) {
  queue.add(async () => {
    // Task logic

    if (i % 30 === 0) {
      queue.concurrency = --concurrency;
      console.log(`Concurrency updated to ${queue.concurrency}`);
    }
  });
}

Mixed Task Handling

Combine synchronous and asynchronous tasks within the same queue seamlessly.

queue.add(() => 'Synchronous Task');
queue.add(async () => {
  await performAsyncOperation();
  return 'Asynchronous Task';
});

Example from Tests:

const queue = new PQueue({ concurrency: 1 });

queue.add(() => 'sync 1');
queue.add(async () => delay(1000));
queue.add(() => 'sync 2');
queue.add(() => 'sync 3');

queue.onIdle().then(() => {
  console.log('All tasks completed');
});

6. Error Management

Error Handling in Tasks

Errors thrown within tasks will cause the task's promise to be rejected.

queue.add(async () => {
  throw new Error('Task failed');
}).catch(error => {
  console.error('Task error:', error.message);
});

Example from Tests:

const queue = new PQueue({ concurrency: 1 });

queue.add(async () => {
  throw new Error('broken');
}).catch(error => {
  console.error(error.message); // Output: 'broken'
});

Error Event Emission

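The queue emits an error event whenever a task rejects. The promise returned by .add() still rejects as well, so handle it too to avoid an unhandled rejection.

queue.on('error', error => {
  console.error('Error event:', error.message);
});

queue.add(async () => {
  throw new Error('Task failed');
}).catch(() => {
  // Already reported by the 'error' listener
});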

Example from Tests:

const queue = new PQueue({ concurrency: 1 });

queue.on('error', error => {
  console.error('Error event:', error.message);
});

queue.add(async () => {
  throw new Error('failure');
});

7. Event-Driven Architecture

Key Events

  • add: Emitted when a task is added to the queue.
  • active: Emitted when a task starts running.
  • next: Emitted after a task completes, whether it succeeded or failed.
  • empty: Emitted when the queue becomes empty (no queued tasks).
  • idle: Emitted when all tasks have completed (no running or queued tasks).
  • completed: Emitted when a task finishes successfully.
  • error: Emitted when a task fails.

Subscribing to Events

queue.on('add', () => {
  console.log('Task added to the queue');
});

queue.on('active', () => {
  console.log('Task started');
});

queue.on('completed', result => {
  console.log('Task completed with result:', result);
});

queue.on('error', error => {
  console.error('Task failed with error:', error.message);
});

queue.on('idle', () => {
  console.log('All tasks have completed');
});

Practical Event Usage

Example from Tests:

const items = [1, 2, 3];
const queue = new PQueue();

let startedCount = 0;
queue.on('active', () => {
  startedCount++;
  console.log(`Tasks started so far: ${startedCount}`);
});

items.forEach(item => {
  queue.add(() => item);
});

queue.onIdle().then(() => {
  console.log('All tasks completed');
});

8. Configuration and Validation

Validating Configuration Options

Ensure that configuration options are set correctly to avoid runtime errors.

  • concurrency: Must be a number from 1 and up.
  • interval: Must be a finite number greater than or equal to 0.
  • intervalCap: Must be a number from 1 and up.

try {
  const queue = new PQueue({ concurrency: 0 });
} catch (error) {
  console.error(error.message); // Output begins with: 'Expected `concurrency` to be a number from 1 and up'
}

Example from Tests:

try {
  new PQueue({ interval: -1 });
} catch (error) {
  console.error(error.message); // Output begins with: 'Expected `interval` to be a finite number >= 0'
}

Avoiding Common Pitfalls

  • Invalid Option Values: Ensure concurrency, interval, and intervalCap are set to valid numbers.
  • Unhandled Rejections: Always handle errors in tasks to prevent unhandled promise rejections.
  • Blocking Work: Avoid long-running synchronous tasks; they block the event loop no matter how the queue is configured.

9. Examples and Best Practices

Common Use Cases

API Rate Limiting

Control the rate of API calls to comply with external API limits.

const queue = new PQueue({
  interval: 1000,     // 1 second interval
  intervalCap: 5,     // Max 5 API calls per interval
});

const fetchData = async url => {
  const response = await fetch(url);
  return response.json();
};

const urls = ['https://api.example.com/data1', 'https://api.example.com/data2'];

urls.forEach(url => {
  queue.add(() => fetchData(url)).then(data => {
    console.log(data);
  });
});

Batch Processing

Efficiently process large data sets with controlled concurrency.

const queue = new PQueue({ concurrency: 3 });

const processItem = async item => {
  // Processing logic
};

const items = [/* Large array of items */];

items.forEach(item => {
  queue.add(() => processItem(item));
});

Best Practices

  • Handle Errors Properly: Use .catch() or try...catch to manage errors in tasks.
  • Monitor Queue State: Utilize events like idle and empty to track the queue.
  • Optimize Concurrency: Adjust concurrency based on system capabilities and task nature.
  • Avoid Blocking Operations: Ensure tasks are non-blocking to keep the event loop responsive.
  • Dynamic Adjustments: Modify queue configurations at runtime to adapt to changing conditions.

10. Appendix

Reference

For detailed information on all methods and options, refer to the [PQueue documentation](https://github.com/sindresorhus/p-queue).

Edge Cases

  • Empty Queues: Calling .onEmpty() when no tasks are waiting, or .onIdle() when no tasks are waiting or running, resolves immediately.
  • Task Rejection: Rejections in tasks should be handled to prevent unhandled promise rejections.

Troubleshooting

Common Issues

  • Tasks Not Executing: Ensure autoStart is true or call queue.start().
  • Exceeding Rate Limits: Adjust interval and intervalCap to match desired throughput.
  • Unhandled Rejections: Always handle rejections in task promises.

FAQs

  • Can I Change Concurrency During Execution?

    • Answer: Yes, you can update queue.concurrency at any time.
  • How Do I Cancel a Task?

    • Answer: Create an AbortController and pass its signal in the options to .add(), then call controller.abort().
  • What Happens When a Task Times Out?

    • Answer: If throwOnTimeout is true, the promise returned by .add() rejects with a timeout error. Otherwise it resolves with undefined.

This guide provides a comprehensive overview of PQueue, illustrating its features with practical code examples. By leveraging these functionalities, you can efficiently manage asynchronous tasks with precise control over execution flow, concurrency, and error handling.

@arturohernandez10

The documentation is very well written and solved my problem.

I have a question now: What is the difference between these 2 code examples?

To be more precise, I don't understand the difference between intervalCap and concurrency.

Thanks.

const queue = new PQueue({
  interval: 1000,     // 1 second interval
  intervalCap: 5,     // Max 5 API calls per interval
});

const queue = new PQueue({
  concurrency: 5,     // concurrency is 5
  interval: 1000,     // 1 second interval
});