pipeline-pipe

Parallel transform and some utilities for Node Object Stream lovers

Why

Install

npm install pipeline-pipe

pipe(fn, opts)

Example usage:

// Example: fetch posts, extract the title of each post's HTML, and store the titles in a DB.
// getPost, parseHTML and storeInDB are placeholders for your own functions.

const { pipeline, Readable } = require('stream');
const pipe = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),

    // Request posts asynchronously, up to 16 in parallel
    pipe(async postId => {
      const json = await getPost(postId);
      return json;
    }, 16),

    // Synchronous transformation, like Array.prototype.map
    pipe(json => parseHTML(json.postBody).document.title),

    // Synchronous filtering, like Array.prototype.filter (returning null drops the chunk)
    pipe(title => title.includes('important') ? title : null),

    // Store asynchronously, up to 4 in parallel
    pipe(async title => {
      const result = await storeInDB(title);
      console.info(result);
    }, 4),

    (err) => err ? console.error(err) : console.info('All done!')
);

Types:

import { Transform, TransformOptions } from 'stream';

type ParallelTransformOptions =
  | number
  | TransformOptions & { maxParallel?: number, ordered?: boolean };

export default function pipe(
    fn: (data: any) => Promise<any> | any,
    opts?: ParallelTransformOptions,
): Transform;

| Option property | Default value | Description |
| --- | --- | --- |
| maxParallel | 10 | Maximum number of parallel executions. |
| ordered | true | Whether to preserve the order of streaming chunks. |
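
To make the two options more concrete, here is a minimal sketch of what "at most `maxParallel` calls in flight, results in input order" can look like. `mapOrdered`, `sleep` and `demo` are hypothetical names used only for illustration; this is not how pipeline-pipe is implemented, and it works on async iterables rather than Transform streams.

```javascript
// Hypothetical helper, NOT pipeline-pipe's implementation: maps `fn` over an
// (async) iterable with at most `maxParallel` calls in flight and yields the
// results in input order, similar in spirit to `ordered: true`.
async function* mapOrdered(source, fn, maxParallel = 10) {
  const inFlight = []; // pending promises, oldest first
  for await (const item of source) {
    const task = Promise.resolve().then(() => fn(item));
    task.catch(() => {}); // avoid an unhandled-rejection crash; the error still surfaces on await
    inFlight.push(task);
    if (inFlight.length >= maxParallel) {
      yield await inFlight.shift(); // wait for the oldest task to keep the order
    }
  }
  while (inFlight.length > 0) {
    yield await inFlight.shift(); // drain what is still running
  }
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

async function demo() {
  // Later items finish sooner, yet the output keeps the input order.
  const slowDouble = async (n) => {
    await sleep(30 - n * 10);
    return n * 2;
  };
  const results = [];
  for await (const value of mapOrdered([1, 2, 3], slowDouble, 2)) {
    results.push(value);
  }
  return results;
}

demo().then((results) => console.log(results)); // [ 2, 4, 6 ]
```

With `ordered: false` one would presumably emit each result as soon as it is ready instead of waiting for the oldest task.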

A number can also be passed as opts: pipe(fn, 20) is the same as pipe(fn, {maxParallel: 20}).
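
The shorthand and the documented defaults can be pictured with a small sketch. `normalizeOpts` is a hypothetical helper written for this illustration; it is an assumption about how the two forms relate, not a function exported by pipeline-pipe.

```javascript
// Hypothetical helper, not part of pipeline-pipe: it only illustrates how the
// numeric shorthand maps onto the options object and the documented defaults.
function normalizeOpts(opts) {
  const defaults = { maxParallel: 10, ordered: true };
  if (typeof opts === 'number') {
    return { ...defaults, maxParallel: opts };
  }
  return { ...defaults, ...opts };
}

console.log(normalizeOpts(20));                 // { maxParallel: 20, ordered: true }
console.log(normalizeOpts({ ordered: false })); // { maxParallel: 10, ordered: false }
console.log(normalizeOpts());                   // { maxParallel: 10, ordered: true }
```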

Some utility functions

pipeline(stream, stream, ...)

A promisified version of require('stream').pipeline. Equivalent to:

const { promisify } = require('util');
const { pipeline: _pipeline } = require('stream');
const pipeline = promisify(_pipeline);

Example:

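const { pipeline, pipe } = require('pipeline-pipe');

// `readable` is any Readable stream; in CommonJS, await must run inside an async function
async function main() {
    await pipeline(
        readable,
        pipe(chunk => chunk.replace('a', 'z')),
        pipe(chunk => storeInDB(chunk)),
    );
    console.log('All done!');
}

main().catch(console.error);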
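
For reference, the promisified form can be tried with Node's standard library alone. The sketch below uses plain `Transform` and `Writable` streams in place of `pipe()`, and `run` and `runFailing` are names made up for this example. It shows the two things the promise gives you: `await` resolves once the last stream finishes, and an error in any stage rejects the promise.

```javascript
const { promisify } = require('util');
const { pipeline: callbackPipeline, Readable, Transform, Writable } = require('stream');

const pipeline = promisify(callbackPipeline);

// Resolves with the chunks seen by the last stage once the pipeline has finished.
async function run() {
  const seen = [];
  await pipeline(
    Readable.from(['apple', 'banana']),
    new Transform({
      objectMode: true,
      transform(chunk, _encoding, callback) {
        callback(null, chunk.replace('a', 'z')); // replaces the first 'a' only
      },
    }),
    new Writable({
      objectMode: true,
      write(chunk, _encoding, callback) {
        seen.push(chunk);
        callback();
      },
    }),
  );
  return seen;
}

// An error raised in any stage rejects the promise, so try/catch works.
async function runFailing() {
  try {
    await pipeline(
      Readable.from([1]),
      new Writable({
        objectMode: true,
        write(_chunk, _encoding, callback) {
          callback(new Error('boom'));
        },
      }),
    );
    return 'no error';
  } catch (err) {
    return err.message;
  }
}

run().then((seen) => console.log(seen));          // [ 'zpple', 'bznana' ]
runFailing().then((message) => console.log(message)); // boom
```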

concat(size)

Concatenates sequential chunks into arrays of the specified size. This is useful when you want to send data in batches, as with the Elasticsearch Bulk API.

Example:

const { pipeline, Readable } = require('stream');
const { concat, pipe } = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3, 4, 5]),
    concat(2),
    pipe(console.log),  // [ 1, 2 ]
                        // [ 3, 4 ]
                        // [ 5 ]
    (err) => err ? console.error(err) : console.info('All done!'),
);
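
If it helps to see the batching behaviour spelled out, the following is a rough stand-in built on a plain `Transform`. `batch` and `collect` are hypothetical names for this sketch, and the library's own `concat()` may be implemented differently; the point is only that a full buffer is emitted as one array and the remainder is flushed when the input ends.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for illustration only; pipeline-pipe's concat() may differ.
function batch(size) {
  let buffer = [];
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      buffer.push(chunk);
      if (buffer.length >= size) {
        this.push(buffer); // emit a full batch as a single array chunk
        buffer = [];
      }
      callback();
    },
    flush(callback) {
      if (buffer.length > 0) this.push(buffer); // emit the final, possibly shorter batch
      callback();
    },
  });
}

// Small helper to gather every chunk of a stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

collect(Readable.from([1, 2, 3, 4, 5]).pipe(batch(2)))
  .then((batches) => console.log(batches)); // [ [ 1, 2 ], [ 3, 4 ], [ 5 ] ]
```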

split()

Creates a Transform that splits each incoming Array chunk into its elements and passes them one by one to the subsequent streams.

const { pipeline, Readable } = require('stream');
const { split, pipe } = require('pipeline-pipe');

pipeline(
    Readable.from([1, 2, 3]),
    pipe(page => getPostsByPage(page)),
    pipe(json => json.posts),               // Returns an array of posts
    split(),                                // Splits the array into individual posts
    pipe(post => storeInDB(post.title)),    // Now the argument is a single post
    (err) => err ? console.error(err) : console.info('All done!')
);
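
As a rough picture of what such a Transform does, here is a sketch using only Node's `stream` module. `flatten` and `collect` are hypothetical names for this illustration, and the real `split()` may differ in details.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical stand-in for illustration only; pipeline-pipe's split() may differ.
function flatten() {
  return new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      // Push every element of the incoming array as its own chunk.
      // Note: pushing null would end the stream, so elements must not be null.
      for (const item of chunk) this.push(item);
      callback();
    },
  });
}

// Small helper to gather every chunk of a stream into an array.
async function collect(stream) {
  const chunks = [];
  for await (const chunk of stream) chunks.push(chunk);
  return chunks;
}

collect(Readable.from([[1, 2], [3]]).pipe(flatten()))
  .then((items) => console.log(items)); // [ 1, 2, 3 ]
```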

License

MIT