Primary language: JavaScript. License: MIT.

StreamFlow

Simplify your pipeline by encapsulating it in a single stream.

Replace

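const fs = require("fs");
const { Transform } = require("stream");
const JSONStream = require("JSONStream");

fs.createReadStream(`data.json`)
    .pipe(JSONStream.parse("*"))      // emit each top-level element of the JSON document
    .pipe(new Transform({
        objectMode: true,
        transform(chunk, enc, cb) {
            cb(null, chunk);          // pass each parsed item through unchanged
        }
    }));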

With

const fs = require("fs");
const { Transform, pipeline } = require("stream");
const JSONStream = require("JSONStream");
// StreamFlow comes from the stream-workflow package.

class ComplexStream extends StreamFlow {
    constructor() {
        super({
            objectMode: true,
            init(stream) {
                // Define your pipeline.
                // The init function must return the last stream of your pipeline.
                return pipeline(
                    stream,                 // passthrough stream (pipeline entry)
                    JSONStream.parse("*"),  // Transform 1 in diagram
                    new Transform({         // Transform 2 in diagram
                        objectMode: true,
                        transform(chunk, enc, cb) {
                            cb(null, chunk);
                        }
                    }),
                    error => error && this.emit("error", error) // propagate error
                );
            }
        });
    }
}

fs.createReadStream(`data.json`)
    .pipe(new ComplexStream());

Classic pipeline

[Diagram: Transform stream]

Stream workflow pipeline

[Diagram: StreamWorkflow]
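
Encapsulating a pipeline in one stream, as ComplexStream does above, can also be sketched with Node core streams alone. The snippet below is only an illustration of the idea and is not how stream-workflow is implemented: `encapsulate` is a hypothetical helper, and the two number transforms stand in for real pipeline stages.

```javascript
const { Duplex, PassThrough, Transform, pipeline } = require("stream");

// Hypothetical helper, not part of stream-workflow: exposes a chain of
// streams as a single Duplex stream.
function encapsulate(streams, options = {}) {
    // Entry point of the inner pipeline; receives what is written to the wrapper.
    const entry = new PassThrough(options);
    const wrapper = new Duplex({
        ...options,
        write(chunk, enc, cb) { entry.write(chunk, enc, cb); }, // forward writes
        final(cb) { entry.end(cb); },                           // forward end()
        read() { exit.resume(); }                               // consumer wants more data
    });
    // pipeline() returns the last stream; any inner error destroys the wrapper.
    const exit = pipeline(entry, ...streams, error => {
        if (error) wrapper.destroy(error);
    });
    // Forward output and pause the inner pipeline when the wrapper's buffer is full.
    exit.on("data", chunk => { if (!wrapper.push(chunk)) exit.pause(); });
    exit.on("end", () => wrapper.push(null));
    return wrapper;
}

// Two illustrative object-mode stages.
const double = () => new Transform({
    objectMode: true,
    transform(n, enc, cb) { cb(null, n * 2); }
});
const addOne = () => new Transform({
    objectMode: true,
    transform(n, enc, cb) { cb(null, n + 1); }
});

const flow = encapsulate([double(), addOne()], { objectMode: true });
const results = [];
const done = new Promise((resolve, reject) => {
    flow.on("data", value => results.push(value));
    flow.on("end", () => resolve(results));
    flow.on("error", reject);
});

[1, 2, 3].forEach(n => flow.write(n));
flow.end();

done.then(values => console.log(values)); // prints [ 3, 5, 7 ]
```

From the outside, `flow` behaves like any other stream: it can be written to, piped into, or piped from, while the stages inside stay hidden.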

Test

To run our tests, clone the stream-workflow repo and install the dependencies.

$ git clone https://github.com/BenoitClaveau/stream-workflow --depth 1
$ cd stream-workflow
$ npm install
$ cd tests
$ node "../node_modules/mocha/bin/mocha" .