caolan/highland

Handling errors inside stream of streams

itayadler opened this issue · 2 comments

Hey guys,

I'm trying to figure out how to properly catch errors that are emitted from a stream that I return from a flatMap, so the new stream the flatMap returns, will retrieve the error.
How would one do this?

Here's a code example show casing the issue I'm facing, which is I never reach the .errors handler with the error I emitted from the inside stream.

        var nodeStreamInsideStreams = _([1]);
        setTimeout(function(){
            nodeStreamInsideStreams.emit('error', new Error('Error inside stream of streams'));
        }, 1);
        _([10])
        .flatMap(function(value) {
            return _(nodeStreamInsideStreams);
        })
        .errors(function(err){
            console.log('we dont reach this place :()');
        })

Thanks

vqvu commented

Generally, you can catch errors from flatMap the way you would expect. It's just that your example makes a couple of wrong assumptions.

First, Highland streams aren't the same as Node streams, so nodeStreamInsideStreams is misnamed. Furthermore, while the emit method exists on Highland streams, you practically never want to use it, since it doesn't play well with the rest of the API. You probably wanted a real node stream.

const nodeStreamInsideStreams = new stream.Transform({
  transform(data, encoding, callback) {
    callback(null, data);
  }
});

Second, Highland streams are lazy. Nothing happens until you consume them by calling one of the Consumption methods. In your example, you didn't consume the Highland stream that you created, so your handler would never have run. Here, you can just add each to consume the stream.

_([10])
    .flatMap((value) => _(nodeStreamInsideStreams))
    .errors((err) => console.log('We should see this.', err))
    .each((x) => console.log('Value: ', x));

The following should work

const stream = require('stream');
const _ = require('highland');

const nodeStreamInsideStreams = new stream.Transform({
  transform(data, encoding, callback) {
    callback(null, data);
  }
});

setTimeout(function(){
    nodeStreamInsideStreams.emit('error', new Error('Error inside stream of streams'));
}, 1);

_([10])
    .flatMap((value) => _(nodeStreamInsideStreams))
    .errors((err) => console.log('We should see this now.', err))
    .each((x) => console.log('Value: ', x));

That said, when interoping with Node streams, you want to wrap them in Highland streams as soon as possible (usually when the Node stream is created), since you don't know when an event may come through. Wrapping them in a Highland stream will make sure that the correct listeners are registered before any events are sent; otherwise, you risk losing data.

Furthermore, since Highland streams are lazy, you typically want to construct thing exactly when you need them and not before. In this case, that means you should construct the node stream inside the flatMap function and not outside.

So in actual code, you'll want something more like this

_([10])
    .flatMap((value) => {
      const nodeStreamInsideStreams = new stream.Transform({
        transform(data, encoding, callback) {
          callback(null, data);
        }
      });

      setTimeout(function(){
        nodeStreamInsideStreams.emit('error', new Error('Error inside stream of streams'));
      }, 1);

      return _(nodeStreamInsideStreams);
    })
    .errors((err) => console.log('We should see this now.', err))
    .each((x) => console.log('Value: ', x));

Thanks for the great reply.

  1. Oops I forgot in the example to thunk :)
  2. Yeah I changed it from a node stream to highland since I had the same issue in both cases, but I think that's because I didn't thunk.
  3. I found a solution with your help:
const nodeStreamInsideStreams = _(streamify([1])).through(through())

This seems to propagate the error properly.