help with highland configurable consumer
jcdietrich opened this issue · 11 comments
I have the following code with a file test.info
containing some numbers each on their own line. I expect this to output a series of lines Doubled =
then the doubled number. Instead I'm getting the error that follows. It is a little funky, I admit, but I am working with some legacy code and want to be able to conditionally swap in a highland based consumer, so can't switch it all out to highland. Any ideas? (Is there a better spot for questions like these? Slack? IRC?)
const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);
const double = x => x*2;
// Curried consumer: called first with { func, msg } and then with the data to
// wrap in a highland stream. Each value is mapped through func and logged with
// msg; 'done consuming...' is logged once the stream has been consumed.
// NOTE(review): the chain ends in .done(), so this presumably does not return a
// Node stream — confirm against what stream.pipeline expects.
const consumer = _.curry(({ func, msg }, data) => _(data)
.map(func)
.each(x => console.log(msg, x))
.done(()=> console.log('done consuming...'))
);
// Entry point: reads test.info and hands it, together with the configured
// consumer, to the promisified stream.pipeline (Pump), then logs DONE.
const main = async () => {
const fileStream = fs.createReadStream('test.info');
// Only the options object is supplied here, so publisher is the curried
// function still waiting for its data argument.
const publisher = consumer({func: double, msg: 'Doubled = '});
// publisher is passed to pipeline as a function rather than as a stream.
await Pump(fileStream, publisher);
console.log('DONE');
};
// NOTE(review): no .catch() here, so a rejection from main() is unhandled.
main();
(node:42596) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'on' of undefined
at destroyer (internal/streams/pipeline.js:23:10)
at internal/streams/pipeline.js:78:12
at Array.map (<anonymous>)
at pipeline (internal/streams/pipeline.js:75:28)
at internal/util.js:278:30
at new Promise (<anonymous>)
at pipeline (internal/util.js:277:12)
at main (/Users/jdietrich/work/ipsis-standardcsv/src/test.js:18:11)
at Object.<anonymous> (/Users/jdietrich/work/ipsis-standardcsv/src/test.js:23:1)
at Module._compile (internal/modules/cjs/loader.js:959:30)
Can you please create a mini test repo I can use to reproduce it with a package.json? I'm not sure what stream library that is, https://www.npmjs.com/package/stream doesn't seem to have a pipeline function.
Found it! Pipeline is a method on the built-in Stream module: https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback
An isolated reproduction repo would still help a lot here though.
Thanks! That helped me a lot.
// Curried consumer: configure it with { func, msg }, then give it the data
// source. The input is split into lines, each line is mapped through func and
// logged with msg, and the highland stream is converted to a Node stream.
const consumer = _.curry(function (settings, data) {
  const { func, msg } = settings;
  const logValue = function (x) { console.log(msg, x); };
  return _(data)
    .split()
    .map(func)
    .tap(logValue)
    .toNodeStream();
});
- When supplying a function to pipeline, it needs to return a node JS stream
- fs.createReadStream emits whole chunks of text, so we need to use highland's split to emit each line separately
- https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback
- Looking at the history drawer for that function, the pipeline function only supports functions in Node 13+
Even with those changes, I'm still getting the same error. :(
Ack! I should have diffed it to ensure I get all the changes:
const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);
const double = x => x*2;
// Curried consumer: configure it with { func, msg }, then give it the data
// source. The input is split into lines, each line is mapped through func and
// logged with msg, and the highland stream is converted to a Node stream.
const consumer = _.curry(function (settings, data) {
  const { func, msg } = settings;
  const logValue = function (x) { console.log(msg, x); };
  return _(data)
    .split()
    .map(func)
    .tap(logValue)
    .toNodeStream();
});
// Entry point: pipes test.info (read as UTF-8 text) into the configured
// consumer through the promisified stream.pipeline, then logs DONE.
const main = async () => {
  await Pump(
    fs.createReadStream('test.info', { encoding: 'utf8' }),
    consumer({ func: double, msg: 'Doubled = ' })
  );
  console.log('DONE');
};
main();
- I also added
{ encoding: 'utf8' }
to the fs.createReadStream call
If you're still having issues I'll fork your repro repo and set up a CI
Still having issues.. :(
sorry.
Alright, I'll set up a CI and have it run against different versions. What happens if you try node 14?
node 14 works as expected. :(
The CI helped prove that solution will not work on versions before node 14.
https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144114428
The most straightforward solution that works with all 3 node versions:
const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);
const double = x => x*2;
// Reads test.info as UTF-8 text, pipes it into a highland stream, splits it
// into lines, doubles and logs each one, and resolves once every value has
// been collected.
async function main () {
  const source = fs.createReadStream('test.info', { encoding: 'utf8' });
  const doubled = source
    .pipe(_())
    .split()
    .map(double)
    .tap(function (x) { console.log('Doubled = ', x); });
  await doubled.collect().toPromise(Promise);
  console.log('DONE');
}
// Report any failure and exit with a non-zero status.
main().catch(function (err) {
  console.error(err);
  process.exit(1);
});
https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144122089
Finally, there's a solution that works about the way you intended it:
const _ = require('highland');
const fs = require('fs');
const stream = require("stream");
const Pump = require('util').promisify(stream.pipeline);
const double = x => x*2;
// Curried consumer: configure it with { func, msg }, then give it the data
// source. Returns the highland stream that splits the input into lines, maps
// each line through func, and logs it with msg.
const consumer = _.curry(function (settings, data) {
  const { func, msg } = settings;
  const logValue = function (x) { console.log(msg, x); };
  return _(data)
    .split()
    .map(func)
    .tap(logValue);
});
/**
 * Wraps a highland pipeline in a Node.js Transform stream so it can be used
 * with stream.pipeline / .pipe().
 *
 * @param {Function} streamFn - receives a writable highland stream and returns
 *   the highland stream whose values should be emitted by the Transform.
 * @param {Object} [opts] - extra stream.Transform options; they are merged over
 *   the defaults and may override them.
 * @returns {stream.Transform} object-mode Transform backed by the highland pipeline.
 */
function transformStream (streamFn, opts) {
  const input = _();
  const output = streamFn(input);
  let subscribed = false;
  let outputEnded = false;
  let pendingFlush = null;

  // Completes a waiting flush exactly once.
  const finishFlush = () => {
    if (pendingFlush !== null) {
      const done = pendingFlush;
      pendingFlush = null;
      done();
    }
  };

  // Starts consuming the highland output; we only want this subscription once.
  const subscribe = (xf) => {
    if (subscribed) {
      return;
    }
    subscribed = true;
    output
      .errors((err, push) => {
        // end the highland stream
        push(_.nil);
        // destroy outer transform stream with error
        xf.destroy(err);
      })
      .each(x => xf.push(x))
      .done(() => {
        outputEnded = true;
        finishFlush();
      });
  };

  const xfStream = new stream.Transform(Object.assign({
    objectMode: true,
    transform (chunk, encoding, callback) {
      subscribe(this);
      input.write(chunk);
      callback();
    },
    flush (callback) {
      // Subscribe here as well so an input with no chunks is still consumed.
      subscribe(this);
      // The flush callback must be invoked or the Transform never finishes;
      // defer it until the highland output has emitted everything.
      pendingFlush = callback;
      input.end();
      if (outputEnded) {
        finishFlush();
      }
    }
  }, opts));
  return xfStream;
}
// Entry point: pipes test.info (read as UTF-8 text) through the configured
// consumer wrapped by transformStream, using the promisified stream.pipeline,
// then logs DONE.
const main = async () => {
  await Pump(
    fs.createReadStream('test.info', { encoding: 'utf8' }),
    transformStream(consumer({ func: double, msg: 'Doubled = ' }))
  );
  console.log('DONE');
};
// Report any failure and exit with a non-zero status.
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
Which does work on the last 3 node versions as well https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144206922
Thank you!