This turns an async function into a parallel transform for require('stream').pipeline in Node.
- Accepts async functions
- Fixes mafintosh/parallel-transform/issues/4 so that it works with require('stream').pipeline
- Ships TypeScript definitions (it is a pure TypeScript reimplementation)
- Adds tests
- Adds a few utility functions
- The blog post
npm install pipeline-pipe
Example usage:
// Example: scrape HTML pages and store their titles in a DB
const {pipeline, Readable} = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),
    // Request HTML asynchronously, up to 16 in parallel
    pipe(async postId => {
        const json = await getPost(postId);
        return json;
    }, 16),
    // Synchronous transformation, like Array.prototype.map
    pipe(json => parseHTML(json.postBody).document.title),
    // Synchronous filtering, like Array.prototype.filter
    pipe(title => title.includes('important') ? title : null),
    // Asynchronous, up to 4 in parallel
    pipe(async title => {
        const result = await storeInDB(title);
        console.info(result);
    }, 4),
    (err) => console.info('All done!')
);
Types:
import { Transform, TransformOptions } from 'stream';
type ParallelTransformOptions =
    | number
    | TransformOptions & { maxParallel?: number, ordered?: boolean };
export default function pipe(
fn: (data: any) => Promise<any> | any,
opts?: ParallelTransformOptions,
): Transform;
Option property | Default value | Description |
---|---|---|
maxParallel | 10 | Maximum number of parallel executions. |
ordered | true | Whether to preserve the order of streaming chunks. |
A number can be passed as opts: pipe(fn, 20) is the same as pipe(fn, {maxParallel: 20}).
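To make maxParallel and ordered more concrete, here is a minimal sketch of bounded concurrency with order-preserving results. It uses plain promises rather than streams, and `mapWithLimit` is a hypothetical helper written for illustration, not part of pipeline-pipe and not necessarily how the library works internally.

```javascript
// Hypothetical helper, not part of pipeline-pipe: maps `items` through an
// async `fn`, keeping at most `maxParallel` calls in flight and returning
// results in input order (roughly what `ordered: true` describes).
async function mapWithLimit(items, fn, maxParallel = 10) {
  const results = new Array(items.length);
  let next = 0; // index of the next item to start

  // Each worker pulls the next unstarted item until none are left.
  async function worker() {
    while (next < items.length) {
      const index = next++;
      results[index] = await fn(items[index]);
    }
  }

  const workerCount = Math.min(maxParallel, items.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// Demo: later items finish sooner, yet results are stored by input index.
const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

mapWithLimit([30, 20, 10], async (ms) => {
  await delay(ms);
  return ms * 2;
}, 2).then((doubled) => console.log(doubled));
```

With ordered set to false, one would instead expect results to be passed on as soon as each call completes, regardless of input position.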
The pipeline exported by pipeline-pipe is just a promisified version of require('stream').pipeline. It requires Node v10+ and is equivalent to:
const {promisify} = require('util');
const {pipeline: _pipeline} = require('stream');
const pipeline = promisify(_pipeline);
Example:
const {pipeline, pipe} = require('pipeline-pipe');
await pipeline(
    readable,
    pipe(chunk => chunk.replace('a', 'z')),
    pipe(chunk => storeInDB(chunk)),
);
console.log('All done!');
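The promisified form can also be tried with Node built-ins alone. The following is a sketch under that assumption: it swaps pipe for hand-written Transform and Writable streams, and `run` is a hypothetical wrapper that exists only because await needs an async function in CommonJS modules.

```javascript
const { promisify } = require('util');
const { pipeline: _pipeline, Readable, Transform, Writable } = require('stream');

const pipeline = promisify(_pipeline);

// Hypothetical demo: upper-cases each chunk and collects the results.
async function run() {
  const collected = [];
  await pipeline(
    Readable.from(['a', 'b', 'c']),
    new Transform({
      objectMode: true,
      transform(chunk, _encoding, callback) {
        callback(null, chunk.toUpperCase());
      },
    }),
    new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        collected.push(chunk);
        callback();
      },
    }),
  );
  return collected;
}

run().then(
  (collected) => console.log('All done!', collected),
  (err) => console.error('Pipeline failed:', err),
);
```

Because the promise rejects when any stream in the chain fails, errors can be handled with try/catch or a rejection handler instead of a trailing callback.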
concat(size) groups sequential chunks into arrays of the specified size. This is useful when you post data in batches, as the Elasticsearch Bulk API expects.
Example:
const {pipeline, Readable} = require('stream');
const {concat, pipe} = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3, 4, 5]),
    concat(2),
    pipe(console.log),  // [ 1, 2 ]
                        // [ 3, 4 ]
                        // [ 5 ]
    (err) => console.info('All done!'),
);
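For intuition, the batching behaviour can be approximated with a plain Transform from Node's stream module. This is only a sketch: `batch` and `collectBatches` are hypothetical names, and the library's own concat may be implemented differently.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical stand-in for concat(size): groups chunks into arrays of `size`.
function batch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        this.push(buffer);
        buffer = [];
      }
      callback();
    },
    // Emit the last, possibly shorter, batch when the input ends.
    flush(callback) {
      if (buffer.length > 0) this.push(buffer);
      callback();
    },
  });
}

// Runs the pipeline and resolves with every batch that reached the sink.
function collectBatches(items, size) {
  return new Promise((resolve, reject) => {
    const batches = [];
    pipeline(
      Readable.from(items),
      batch(size),
      new Writable({
        objectMode: true,
        write(chunk, _encoding, callback) {
          batches.push(chunk);
          callback();
        },
      }),
      (err) => (err ? reject(err) : resolve(batches)),
    );
  });
}

collectBatches([1, 2, 3, 4, 5], 2).then((batches) => console.log(batches));
```

The flush step matters: without it, a trailing partial batch such as [ 5 ] would be silently dropped when the source ends.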
split() creates a Transform that splits each incoming Array chunk into its elements and passes them one by one to subsequent streams.
const {pipeline, Readable} = require('stream');
const {split, pipe} = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),
    pipe(page => getPostsByPage(page)),
    pipe(json => json.posts),  // Returns an array of posts
    split(),                   // Splits the array into individual posts
    pipe(post => storeInDB(post.title)),  // Now the argument is a single post
    (err) => console.info('All done!')
);
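The splitting step can likewise be sketched with a plain Transform. `flattenArrays` and `collectItems` are hypothetical names used for illustration, the library's split may differ, and the sketch assumes array elements are never null (pushing null would end the stream).

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical stand-in for split(): emits each element of an incoming array.
// Assumes elements are never null, since pushing null signals end of stream.
function flattenArrays() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      for (const item of chunk) this.push(item);
      callback();
    },
  });
}

// Runs the pipeline and resolves with every item that reached the sink.
function collectItems(arrays) {
  return new Promise((resolve, reject) => {
    const items = [];
    pipeline(
      Readable.from(arrays),
      flattenArrays(),
      new Writable({
        objectMode: true,
        write(item, _encoding, callback) {
          items.push(item);
          callback();
        },
      }),
      (err) => (err ? reject(err) : resolve(items)),
    );
  });
}

collectItems([[1, 2], [3], [4, 5, 6]]).then((items) => console.log(items));
```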
MIT