caolan/highland

help with highland configurable consumer

jcdietrich opened this issue · 11 comments

I have thee following code with a file test.info containing some numbers each on their own line. I expect this to output a series of lines Doubled = then the doubled number. Instead I'm getting the error that follows. It is a little funky, I admit, but I am working with some legacy code and want to be able to conditionally swap in a highland based consumer, so can't switch it all out to highland. Any ideas? (Is there a better spot for questions like these? Slack? IRC?)

const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);

const double = x => x*2;

const consumer = _.curry(({ func, msg }, data) => _(data)
    .map(func)
    .each(x => console.log(msg, x))
    .done(()=> console.log('done consuming...'))
);

const main = async () => {
    const fileStream = fs.createReadStream('test.info');
    const publisher = consumer({func: double, msg: 'Doubled = '});

    await Pump(fileStream, publisher);

    console.log('DONE');
};

main();
(node:42596) UnhandledPromiseRejectionWarning: TypeError: Cannot read property 'on' of undefined
    at destroyer (internal/streams/pipeline.js:23:10)
    at internal/streams/pipeline.js:78:12
    at Array.map (<anonymous>)
    at pipeline (internal/streams/pipeline.js:75:28)
    at internal/util.js:278:30
    at new Promise (<anonymous>)
    at pipeline (internal/util.js:277:12)
    at main (/Users/jdietrich/work/ipsis-standardcsv/src/test.js:18:11)
    at Object.<anonymous> (/Users/jdietrich/work/ipsis-standardcsv/src/test.js:23:1)
    at Module._compile (internal/modules/cjs/loader.js:959:30)

Can you please create a mini test repo I can use to reproduce it with a package.json? I'm not sure what stream library that is, https://www.npmjs.com/package/stream doesn't seem to have a pipeline function.

Found it! Pipeline is a method on the built-in Stream module: https://nodejs.org/api/stream.html#stream_stream_pipeline_source_transforms_destination_callback

An isolated reproduction repo would still help a lot here though.

Thanks! That helped me a lot.

const consumer = _.curry(({ func, msg }, data) => _(data)
    .split()
    .map(func)
    .tap(x => console.log(msg, x))
    .toNodeStream()
);

Even with those changes, I'm still getting the same error. :(

Ack! I should have diffed it to ensure I get all the changes:

const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);

const double = x => x*2;

const consumer = _.curry(({ func, msg }, data) => _(data)
    .split()
    .map(func)
    .tap(x => console.log(msg, x))
    .toNodeStream()
);

const main = async () => {
    const fileStream = fs.createReadStream('test.info', { encoding:  'utf8' });
    const publisher = consumer({ func: double, msg: 'Doubled = ' });

    await Pump(fileStream, publisher);

    console.log('DONE');
};

main();
  • I also added { encoding: 'utf8' } to the fs.createReadStream call

If you're still having issues I'll fork your repro repo and setup a CI

Still having issues.. :(
sorry.

Alright, I'll set up a CI and have it run against different versions. What happens if you try node 14?

node 14 works as expected. :(

The CI helped prove that solution will not work on versions before node 14.

https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144114428

The most straight forward solution that works with all 3 node versions:

const _ = require('highland');
const fs = require('fs');
const Pump = require('util').promisify(require('stream').pipeline);

const double = x => x*2;

async function main () {
    await fs.createReadStream('test.info', { encoding: 'utf8' })
        .pipe(_())
        .split()
        .map(double)
        .tap(x => console.log('Doubled = ', x))
        .collect()
        .toPromise(Promise)

    console.log('DONE');
}

main()
    .catch(err => {
        console.error(err);
        process.exit(1);
    });

https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144122089

Finally, there's a solution that works about the way you intended it:

const _ = require('highland');
const fs = require('fs');
const stream = require("stream");
const Pump = require('util').promisify(stream.pipeline);

const double = x => x*2;

const consumer = _.curry(({ func, msg }, data) => _(data)
    .split()
    .map(func)
    .tap(x => console.log(msg, x))
);

function transformStream (streamFn, opts) {
    const input = _();
    const output = streamFn(input);
    let subscribed = false;

    const xfStream = new stream.Transform(Object.assign({
        objectMode: true,
        transform (chunk, encoding, callback) {
            // we only want to create this subscription once
            if (!subscribed) {
                output
                  .errors((err, push) => {
                      // end the highland stream
                      push(_.nil);
                      // destroy outer transform stream with error
                      this.destroy(err);
                  })
                  .each(x => this.push(x))
                subscribed = true;
            }
            input.write(chunk);
            callback();
        },

        flush (callback) {
            input.end();
        }
    }, opts))

    return xfStream;
}

const main = async () => {
    const fileStream = fs.createReadStream('test.info', { encoding:  'utf8' });
    const publisher = consumer({ func: double, msg: 'Doubled = ' });

    await Pump(fileStream, transformStream(publisher));

    console.log('DONE');
};


main()
    .catch(err => {
        console.error(err);
        process.exit(1);
    });

Which does work on the last 3 node versions as well https://github.com/eccentric-j/highland-pipline-repro/actions/runs/144206922

Thank you!