RubenVerborgh/AsyncIterator

How to .pipe() into array?


When using AsyncIterator as a lighter alternative to streams within a project's internals, how should I approach returning something that can be .pipe()-ed into a stream.Writable? I've tried wrapping iterators via a custom extension of stream.Readable but I wonder if there's a more elegant way...

There's no piping on purpose, because piping comes with two-way congestion control, whereas AsyncIterator is pull-only. (This is how we get the performance gains.)

If your consumer is fast enough, you can always simply src.on('data', d => dest.write(d)), but I assume this is what you want to avoid?
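A minimal sketch of that on-data forwarding, using only Node's built-in modules. The `forward` helper and the hand-driven `EventEmitter` standing in for an AsyncIterator are assumptions made for illustration; neither is part of the library. Note that the return value of `dest.write()` is ignored, so there is no back-pressure: this only works when the consumer keeps up.

```javascript
const { EventEmitter } = require('events');
const { Writable } = require('stream');

// Hypothetical helper: forwards every item emitted by `src` to `dest`.
// The result of dest.write() is deliberately ignored (no back-pressure).
function forward(src, dest) {
  src.on('data', item => dest.write(item));
  src.on('end', () => dest.end());
  return dest;
}

// Stand-in for an AsyncIterator: any emitter of 'data' and 'end' events.
const src = new EventEmitter();

// A trivially fast consumer that collects the items it receives.
const received = [];
const dest = new Writable({
  objectMode: true,
  write(item, _encoding, callback) {
    received.push(item);
    callback();
  },
});

forward(src, dest);
for (const item of ['a', 'b', 'c'])
  src.emit('data', item);
src.emit('end');
```

With a real AsyncIterator as `src`, attaching the data listener is what starts the flow of items, so no manual `emit` calls would be needed.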

Oh yes, I understand where the performance gains come from and that is exactly why I'd rather use AsyncIterator instead of streams internally. However, externally, compatibility with streams often implies supporting .pipe() and I am trying to find the best way to implement that.

Short of coming up with an iterator-specific implementation of .pipe(), .unpipe() and whatever else stream.Readable comes with - which would probably cancel out the performance gains - the only option I could come up with is wrapping the final iterator in a chain of iterators using something like the following:

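const stream = require('stream');

class IteratorStream extends stream.Readable {
  constructor(iterator) {
    super({objectMode: true});
    // Guards against starting a second read loop while one is still active.
    let reading = false;
    const loop = () => {
      // Iterative rather than recursive, so a long run of items cannot overflow the stack.
      while (true) {
        const item = iterator.read();
        if (item === null) {
          // Nothing available right now: resume once the iterator has items again.
          iterator.once('readable', loop);
          return;
        }
        if (!this.push(item)) {
          // The stream's buffer is full: stop until _read is called again.
          reading = false;
          return;
        }
      }
    };
    this._read = () => {
      if (!reading) {
        reading = true;
        loop();
      }
    };
    // Signal end-of-stream once the iterator is exhausted.
    iterator.once('end', () => {
      this.push(null);
    });
  }
}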

Is there a better way to approach this?

The general approach is definitely what you need.

I've looked at your implementation for a while, and I can't poke any holes in it. I like the reading guard you've put in.

Another way to achieve the same thing would be to attach a data listener whenever _read is called, and detach that listener whenever this.push returns false. That would save you from implementing the readable logic (which the data event does for you).

Is this something I should consider adding to the library?

Also CC'ing @rubensworks, who might have needed a similar implementation in the past.

I did think about suggesting that as it would complement the .wrap() method that is already in there. However, this would create a hard dependency on either the native stream module or readable-stream, which would make browser bundles balloon in size quite a bit.

I appreciate you having a look at that. I also understand your suggestion but can't quite tell which of the two would be the better option. I'll work out a few tests to compare them. Thank you for taking the time to chime in! Feel free to close this whenever.

I've managed to use the on-data approach any time I needed to pipe into a Writable. But I agree, pipe-like functionality, or an .unwrap() method, would be nice to have.

Regarding the browser bundle size issue, this functionality could be part of a standalone package.

> this would create a hard dependency on either the native stream module or readable-stream, which would make browser bundles balloon in size quite a bit.

Right, thanks for reminding me of the reason why I haven't done that.

> I also understand your suggestion but can't quite tell which of the two would be the better option.

Typically the one that reuses the most code, since asynciterator is heavily tested. One of the doubts with your implementation might be the synchronous read loop, which could block everything else on the thread for as long as items keep being available.

Quick and untested stab at the code:

class IteratorStream extends stream.Readable {
  constructor (iterator) {
    super({ objectMode: true });
    if (iterator.ended)
      this.push(null);
    else {
      const push = data => this.push(data) || iterator.removeListener('data', push);
      this._read = () => iterator._addSingleListener('data', push);
      iterator.on('end', () => this.push(null));
    }
  }
}
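The listener-based idea can also be sketched with only public EventEmitter methods (`on` / `removeListener`) and an explicit flag, for anyone who would rather not call the underscore-prefixed `_addSingleListener`, which looks like an internal helper. This is an illustrative variant and not the library's API: the `ListenerStream` name, the `highWaterMark` parameter, and the hand-driven `EventEmitter` standing in for an AsyncIterator are all assumptions.

```javascript
const { EventEmitter } = require('events');
const { Readable } = require('stream');

// Hypothetical variant of the wrapper above, using only public emitter methods.
class ListenerStream extends Readable {
  constructor(source, highWaterMark = 16) {
    super({ objectMode: true, highWaterMark });
    // Tracks whether the data listener is currently attached.
    let listening = false;
    const onData = item => {
      // push() returns false once the buffer is full: detach to pause the source.
      if (!this.push(item)) {
        source.removeListener('data', onData);
        listening = false;
      }
    };
    // Called by Readable whenever the consumer wants more items.
    this._read = () => {
      if (!listening) {
        listening = true;
        source.on('data', onData);
      }
    };
    if (source.ended)
      this.push(null);
    else
      source.once('end', () => this.push(null));
  }
}

// Stand-in for an AsyncIterator: a bare emitter whose events are fired by hand.
const source = new EventEmitter();
const out = new ListenerStream(source, 2);

out.read();                 // nothing buffered yet; triggers _read, attaching the listener
source.emit('data', 'a');   // buffered, still below the high-water mark of 2
source.emit('data', 'b');   // buffer is now full, so the listener is detached
const detached = source.listenerCount('data') === 0;
const first = out.read();   // takes one item and triggers _read again
const reattached = source.listenerCount('data') === 1;
```

The hand-fired events only exercise the attach/detach logic. With a real AsyncIterator, the expectation is that the source stops emitting while no data listener is attached, which is what makes detaching act as a pause.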