Simplify your pipeline by encapsulating it in a single stream.
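Replace:
const fs = require("fs");
const { Transform } = require("stream");
const JSONStream = require("JSONStream");

fs.createReadStream(`data.json`)
    .pipe(JSONStream.parse("*"))
    .pipe(new Transform({
        objectMode: true,
        transform(chunk, enc, cb) {
            cb(null, chunk);
        }
    }));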
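With:
const fs = require("fs");
const { Transform, pipeline } = require("stream");
const JSONStream = require("JSONStream");
// StreamFlow is the base class provided by stream-workflow.

// Declare the class before using it: class declarations are not hoisted.
class ComplexStream extends StreamFlow {
    constructor() {
        super({
            objectMode: true,
            init(stream) {
                // Define your pipeline.
                // The init function must return the last stream of your pipeline.
                return pipeline(
                    stream, // passthrough stream
                    JSONStream.parse("*"), // Transform 1 in diagram
                    new Transform({ // Transform 2 in diagram
                        objectMode: true,
                        transform(chunk, enc, cb) {
                            cb(null, chunk);
                        }
                    }),
                    error => error && this.emit("error", error) // propagate error
                );
            }
        });
    }
}

fs.createReadStream(`data.json`)
    .pipe(new ComplexStream());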
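To make the idea of encapsulation concrete, here is a minimal sketch that wraps a chain of transforms in one Duplex stream using only Node's core stream module. It is not how stream-workflow is implemented. The names EncapsulatedPipeline and collectResults are hypothetical and exist only for this illustration, and the sketch assumes every stage runs in object mode.

```javascript
const { Duplex, PassThrough, Transform, Readable, pipeline } = require("stream");

// Hypothetical wrapper (not part of stream-workflow): callers see one
// Duplex stream, while the stages are chained internally with pipeline().
class EncapsulatedPipeline extends Duplex {
    constructor(...stages) {
        super({ objectMode: true });
        // Everything written to the wrapper enters the chain through head.
        this.head = new PassThrough({ objectMode: true });
        // pipeline() returns the last stream of the chain.
        this.tail = pipeline(this.head, ...stages, (error) => {
            if (error) this.destroy(error); // propagate error to the wrapper
        });
        // Forward the chain's output to the wrapper's readable side.
        this.tail.on("data", (chunk) => {
            if (!this.push(chunk)) this.tail.pause(); // respect backpressure
        });
        this.tail.on("end", () => this.push(null));
    }
    _write(chunk, encoding, callback) {
        this.head.write(chunk, encoding, callback);
    }
    _final(callback) {
        this.head.end(callback);
    }
    _read() {
        this.tail.resume();
    }
}

// Hypothetical helper: pipes numbers through the wrapper and collects the output.
function collectResults(numbers) {
    const wrapped = new EncapsulatedPipeline(
        new Transform({
            objectMode: true,
            transform(chunk, enc, cb) { cb(null, chunk * 2); }
        }),
        new Transform({
            objectMode: true,
            transform(chunk, enc, cb) { cb(null, chunk + 1); }
        })
    );
    return new Promise((resolve, reject) => {
        const results = [];
        Readable.from(numbers)
            .pipe(wrapped)
            .on("data", (value) => results.push(value))
            .on("end", () => resolve(results))
            .on("error", reject);
    });
}

collectResults([1, 2, 3]).then((results) => console.log(results));
```

The caller only ever pipes into and out of the wrapper, so stages can be added or reordered without touching the calling code.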
To run our tests, clone the stream-workflow repo and install the dependencies.
$ git clone https://github.com/BenoitClaveau/stream-workflow --depth 1
$ cd stream-workflow
$ npm install
$ cd tests
$ node "../node_modules/mocha/bin/mocha" .