RubenVerborgh/AsyncIterator

[Possible bug] Empty iterator does not work after `await`

jeswr opened this issue · 19 comments

jeswr commented

The empty iterator does not emit events if something has been awaited after its instantiation; for instance, the following tests time out:

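import { EventEmitter } from 'events';
import { empty } from 'asynciterator';

// Resolves when the emitter emits 'end', rejects when it emits 'error'
function promisifyEventEmitter(eventEmitter: EventEmitter): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    eventEmitter.on('end', resolve);
    eventEmitter.on('error', reject);
  });
}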

it('empty test awaiting promise', async () => {
  const e = empty();
  await Promise.resolve();
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});

it('empty test undefined', async () => {
  const e = empty();
  await undefined;
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});

whilst the following works:

it('empty test without awaiting', async () => {
  const e = empty();
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});

Discovered in comunica/comunica#904

jeswr commented

I've added some more tests with the behavior I would expect here, but haven't yet had the time to find the root cause of the issue (and I'm not sure I will any time soon, unfortunately).

All of the tests on this branch pass npm run test:immediate

node: v16.10.0
npm: 7.24.0

I suspect you're running into an autoStart issue (see some of the closed issues here).
By default, asynciterators will start automatically after creation.
That means that if the iterator is empty, it will immediately end, so end-events may be emitted before listeners are attached.
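The race can be reproduced without AsyncIterator at all. The sketch below assumes, as a simplification rather than a description of AsyncIterator's actual code, that an auto-started empty iterator schedules its 'end' event on the microtask queue when it is constructed; `EagerEmptyIterator`, `endsWithin` and `demo` are hypothetical names.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical stand-in for an auto-starting empty iterator:
// it queues its 'end' event as soon as it is constructed.
class EagerEmptyIterator extends EventEmitter {
  constructor() {
    super();
    queueMicrotask(() => this.emit('end'));
  }
}

// Resolves true if 'end' arrives within `ms` milliseconds, false otherwise.
function endsWithin(emitter: EventEmitter, ms: number): Promise<boolean> {
  return new Promise(resolve => {
    const timer = setTimeout(() => resolve(false), ms);
    emitter.once('end', () => {
      clearTimeout(timer);
      resolve(true);
    });
  });
}

async function demo(): Promise<[boolean, boolean]> {
  // Listener attached synchronously: the queued 'end' has not run yet.
  const caught = endsWithin(new EagerEmptyIterator(), 50);

  // Listener attached after an await: the 'end' microtask has already run.
  const late = new EagerEmptyIterator();
  await Promise.resolve();
  const missed = endsWithin(late, 50);

  return [await caught, await missed];
}

demo().then(([caught, missed]) => console.log({ caught, missed }));
// logs { caught: true, missed: false }
```

This mirrors the reported symptom: without an await the listener wins the race, while a single `await Promise.resolve()` (or `await undefined`) lets the queued end event fire first.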

There is an issue to change this behaviour: #25.

jeswr commented

So while working on #36, I've discovered that this issue is not entirely resolved by making autoStart false by default, since the EmptyIterator does not actually have an autoStart parameter.

In particular, the bottom two of the following tests fail:

  describe('An EmptyIterator should not emit until read from', () => {
    it('no awaiting', async () => {
      const iterator = new EmptyIterator();
      expect(await promisifyEventEmitter(iterator)).to.be.undefined;
    });

    it('awaiting undefined', async () => {
      const iterator = new EmptyIterator();
      await undefined;
      expect(await promisifyEventEmitter(iterator)).to.be.undefined;
    });

    it('awaiting promise', async () => {
      const iterator = new EmptyIterator();
      await Promise.resolve();
      expect(await promisifyEventEmitter(iterator)).to.be.undefined;
    });
  });
jeswr commented

I'm thinking there are two solutions here:

  • (1) Add a waitForEndListener similar to the waitForDataListener
  • (2) Inside AsyncIterator, add the following methods, and call _start wherever _sourceStarted is currently set to true. However, this could still mean that the end event is emitted early if there is awaiting between the addition of the data listener and the end listener.
  protected _sourceStarted = false;
  protected taskBuffer: (() => void)[] = [];

  // Mark the source as started and flush every task buffered so far
  protected _start() {
    if (!this._sourceStarted) {
      this._sourceStarted = true;
      this.taskBuffer.forEach(task => taskScheduler(task));
      this.taskBuffer = [];
    }
  }

  // Schedule the task right away once started; otherwise buffer it
  protected _taskScheduler(task: () => void) {
    if (this._sourceStarted)
      taskScheduler(task);
    else
      this.taskBuffer.push(task);
  }

I'm fine with either option.

@RubenVerborgh what option do you prefer?
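Option (1) could look roughly like the following sketch. It assumes that a `waitForEndListener` would watch Node's 'newListener' event, in the same spirit as `waitForDataListener` does for data; `EndOnListenIterator`, `LazyEmptyIterator` and `demo` are hypothetical names, not AsyncIterator's actual code.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical base class: 'end' is only emitted once somebody listens for it.
class EndOnListenIterator extends EventEmitter {
  protected _end(): void {
    if (this.listenerCount('end') > 0) {
      queueMicrotask(() => this.emit('end'));
      return;
    }
    const waitForEndListener = (event: string | symbol) => {
      if (event === 'end') {
        this.removeListener('newListener', waitForEndListener);
        // 'newListener' fires before the listener is registered, so defer the emit.
        queueMicrotask(() => this.emit('end'));
      }
    };
    this.on('newListener', waitForEndListener);
  }
}

// Hypothetical empty iterator: it ends straight away, but the end cannot be missed.
class LazyEmptyIterator extends EndOnListenIterator {
  constructor() {
    super();
    this._end();
  }
}

async function demo(): Promise<boolean> {
  const iterator = new LazyEmptyIterator();
  await Promise.resolve(); // the kind of await that broke the original tests
  return new Promise<boolean>(resolve => iterator.once('end', () => resolve(true)));
}

demo().then(ended => console.log(ended)); // true
```

Unlike option (2), the order in which 'data' and 'end' listeners are attached does not matter here, since only the 'end' listener is waited for.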

I now consider the default autoStart behavior to be incorrect, given https://nodejs.org/api/stream.html#readable-streams:

All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:
Adding a 'data' event handler.

Hence, we must not emit an end (or data) event before a data listener is attached.
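Node.js's own Readable follows this rule, which can be checked with the standard `node:stream` module alone: an empty stream with only an 'end' listener stays silent, and ends as soon as a 'data' listener is attached. `tick` and `demo` are local helper names.

```typescript
import { Readable } from 'node:stream';

// Waits for one turn of the event loop.
const tick = () => new Promise<void>(resolve => setImmediate(resolve));

async function demo(): Promise<[boolean, boolean]> {
  // An empty Readable: its first _read() immediately signals end-of-stream.
  const stream = new Readable({ read() { this.push(null); } });

  let ended = false;
  stream.on('end', () => { ended = true; });

  // Nobody consumes the stream, so it stays paused and does not end.
  await tick();
  await tick();
  const endedWhilePaused = ended;

  // A 'data' listener switches the stream to flowing mode; only now does it end.
  stream.on('data', () => {});
  await tick();
  await tick();
  return [endedWhilePaused, ended];
}

demo().then(console.log); // [ false, true ]
```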

The autoStart functionality (as can be seen in the comments) was originally meant to indicate whether or not a stream should already start buffering. However, the mistake described above was made, so empty streams created with autoStart: true (the default) had the unexpected behavior of closing faster than you can catch them.

Hence it is also nonsensical to have autoStart on the ArrayIterator. Because of the above confusion, autoStart became synonymous with automatically ending empty streams, and hence also applied to ArrayIterator (and several others, where it wasn't even implemented).

So what I think @rubensworks wants is not for streams to not buffer, but for empty streams to (oh irony) not auto end.

This might shed a very different light on #36.


@rubensworks and @jeswr, would everything work as expected for you if:

  1. the end event cannot be fired before a data listener is attached
  2. autoStart is replaced by prebuffer and only applies to the BufferedIterator and its derived classes
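Rule 1 could be enforced roughly as follows. This is a hypothetical sketch on a plain EventEmitter (`FlowGatedIterator`, `GatedEmptyIterator`, `maybeEmitEnd` and `demo` are made-up names), not the change that was eventually made to AsyncIterator: `_end()` only records that the source is exhausted, and 'end' is emitted once a 'data' listener has been attached. The flowing flag is never reset, in line with Node.js, where removing 'data' handlers does not pause a stream.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical sketch: 'end' is held back until a 'data' listener has been attached.
class FlowGatedIterator extends EventEmitter {
  private flowing = false;
  private done = false;
  private endEmitted = false;

  constructor() {
    super();
    this.on('newListener', (event: string | symbol) => {
      if (event === 'data' && !this.flowing) {
        this.flowing = true;
        // Defer, so the listener is registered before anything is emitted.
        queueMicrotask(() => this.maybeEmitEnd());
      }
    });
  }

  // Marks the source as exhausted; 'end' follows once the iterator is flowing.
  protected _end(): void {
    this.done = true;
    queueMicrotask(() => this.maybeEmitEnd());
  }

  private maybeEmitEnd(): void {
    if (this.done && this.flowing && !this.endEmitted) {
      this.endEmitted = true;
      this.emit('end');
    }
  }
}

// Hypothetical empty iterator: exhausted from the start.
class GatedEmptyIterator extends FlowGatedIterator {
  constructor() {
    super();
    this._end();
  }
}

async function demo(): Promise<[boolean, boolean]> {
  const tick = () => new Promise<void>(resolve => setImmediate(resolve));
  const iterator = new GatedEmptyIterator();
  let ended = false;
  await tick(); // the window in which the original EmptyIterator already ended
  iterator.on('end', () => { ended = true; });
  await tick();
  const endedWithoutData = ended; // still false: nobody is consuming
  iterator.on('data', () => {});
  await tick();
  return [endedWithoutData, ended];
}

demo().then(console.log); // [ false, true ]
```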
jeswr commented

That sounds like a good solution with the expected outcomes @RubenVerborgh. I'll let you know if I have any more thoughts while modifying #36.

This should only introduce a few breaking changes in Comunica 2.x, again related to the use of .on('metadata', ...) @rubensworks

I think Comunica needs to get the stream going by attaching an on('data') itself. We could also (like Node.js) introduce a .resume method (which I would implement basically by adding a .on('data', noop)).
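For reference, Node.js already offers both forms: `readable.resume()` and attaching a no-op 'data' listener each switch a stream to flowing mode, so an empty stream ends either way. The check below uses only `node:stream`; `noop`, `ended` and `demo` are local helper names.

```typescript
import { Readable } from 'node:stream';

const noop = () => {};

// Resolves once the stream emits 'end', rejects on 'error'.
function ended(stream: Readable): Promise<void> {
  return new Promise((resolve, reject) => {
    stream.once('end', resolve);
    stream.once('error', reject);
  });
}

async function demo(): Promise<(boolean | null)[]> {
  const viaListener = Readable.from([]);
  const viaResume = Readable.from([]);

  // Two ways of starting a stream without caring about its items.
  viaListener.on('data', noop);
  viaResume.resume();

  const flowing = [viaListener.readableFlowing, viaResume.readableFlowing];
  await Promise.all([ended(viaListener), ended(viaResume)]);
  return flowing;
}

demo().then(console.log); // [ true, true ]
```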

So what I think @rubensworks wants is not for streams to not buffer, but for empty streams to (oh irony) not auto end.

Exactly. Buffering itself is no problem.

I think Comunica needs to get the stream going by attaching an on('data') itself. We could also (like Node.js) introduce a .resume method (which I would implement basically by adding a .on('data', noop)).

I think this would be very valuable!

@jeswr Would you be up for the challenge? It would replace #36. (Sorry about that; but it's your work there that gave me the insights!)

jeswr commented

@RubenVerborgh I made a start on it yesterday and will try to do the rest today

Amazing; really grateful to have you on this.

jeswr commented

Still working on this, I just wanted to check I'm on track with some of the decisions I have made in terms of other slight changes in behaviour as a result of this (I've done my best to keep it in line with the Readable API behaviour).

  • .readable is true when data is available. This means that the 'end' event can be emitted after .readable is false.

  • .ended becomes true directly after the end event has been emitted (I've introduced an ENDING state to handle the period in which we wait to emit the end event)

  • the iterator does not perform its ENDED/DESTROYED state clean-up until the end (or error) event is emitted. This means that if all of the iterator's data has been retrieved by calling .read() directly, the iterator will not perform that clean-up until a data listener is subscribed or .resume() is called

  • .readable is true when data is available.

…when data might be available. So I think even the EmptyIterator would start with readable: true now; but the moment you attach a data listener, it never fires. Only end.

This means that the 'end' event can be emitted after .readable is false.

Correct.

  • .ended becomes true directly after the end event has been emitted

Likely, but I don't think it needs to be this strong.

Currently, it is allowed for an iterator to have ended: true before all listeners have received the end event;
however, this only happens in EmptyIterator, and that case will be removed.
So I don't think we will need that state.
Even more, it seems like we can eliminate the eventAsync parameter from _changeState.

Are you introducing it for the case where an iterator has no items left, but no data listener is attached?
I would do this, for example, by just calling _end from the read method.
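A hypothetical sketch of that suggestion (`ReadDrivenIterator` and `demo` are made-up names, and an array stands in for the source): when `read()` finds nothing left, it calls `_end()` itself, so an iterator drained purely through `read()` still ends without a 'data' listener, much like a Node.js Readable emits 'end' once `read()` has drained it.

```typescript
import { EventEmitter } from 'node:events';

// Hypothetical array-backed iterator whose read() triggers _end() on exhaustion.
class ReadDrivenIterator<T> extends EventEmitter {
  private readonly items: T[];
  private index = 0;
  private _ended = false;

  constructor(items: T[]) {
    super();
    this.items = items;
  }

  get ended(): boolean {
    return this._ended;
  }

  read(): T | null {
    if (this.index < this.items.length)
      return this.items[this.index++];
    this._end(); // nothing left: end, even if no 'data' listener was ever attached
    return null;
  }

  protected _end(): void {
    if (!this._ended) {
      this._ended = true;
      // Emit asynchronously so the caller of read() can still attach listeners.
      queueMicrotask(() => this.emit('end'));
    }
  }
}

async function demo() {
  const iterator = new ReadDrivenIterator(['a']);
  let endEvents = 0;
  iterator.on('end', () => { endEvents++; });
  const first = iterator.read();  // 'a'
  const second = iterator.read(); // null, and _end() is called
  await Promise.resolve();
  return { first, second, ended: iterator.ended, endEvents };
}

demo().then(console.log); // { first: 'a', second: null, ended: true, endEvents: 1 }
```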

So summarizing: I do think your statement is true / can be made true.

(I've introduced an ENDING state to handle the period in which we wait to emit the end event)

So I'm not sure if there is a necessity for that, but happy to hear arguments.

  • the iterator does not perform its ENDED/DESTROYED state clean up until the end (or error) event is emitted.

Why is that? This is internal state; the iterator can do as it wishes.

jeswr commented

TLDR; I agree with all the feedback; I have a lot more clarity on how to finish this off.

Are you introducing it for the case where an iterator has no items left, but no data listener is attached?

Yes

Even more, it seems like we can eliminate the eventAsync parameter from _changeState.

Yes, I've done this; it only broke one test in the EmptyIterator unit tests, and that test can be deleted anyway

I would do this, for example, by just calling _end from the read method.

Good thought - I think this is cleaner than the direction I was starting to head with the ENDING state.

(I've introduced an ENDING state to handle the period in which we wait to emit the end event)

I was making modifications so that, in the _end call, this.emit('end') was delayed if there were no data listeners at that point in time, using a set of functions similar to onData and waitForDataListener. This may be made redundant by your above suggestion; or the iterator could possibly even be left in the CLOSED state until this.emit('end') is called

Why is that? This is internal state; the iterator can do as it wishes.

Sorry, I wasn't too clear here; I was referring to the following, which was previously called when changing the state to DESTROYED or ENDED:

this._readable = false;
this.removeAllListeners('readable');
this.removeAllListeners('data');
this.removeAllListeners('end');

I was making modifications so that in the _end call the this.emit('end') was delayed if there were no data listeners at that point in time

I found that

For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream.

https://nodejs.org/api/stream.html

so I think we can just do the same.
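The Node.js behaviour quoted above can be checked directly with `node:stream`: once a 'data' listener has switched a stream to flowing mode, removing that listener leaves `readableFlowing` at `true`, so the remaining chunks are emitted with nobody listening. `received` and `onData` are local names for the check.

```typescript
import { Readable } from 'node:stream';

const stream = Readable.from(['a', 'b', 'c']);
let received = 0;
const onData = () => { received++; };

stream.on('data', onData);             // switches the stream to flowing mode
stream.removeListener('data', onData); // but does not pause it again
const flowingAfterRemoval = stream.readableFlowing;

stream.on('end', () => {
  // The stream kept flowing: every chunk was emitted with no listener attached.
  console.log({ flowingAfterRemoval, received }); // { flowingAfterRemoval: true, received: 0 }
});
```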

This may be made redundant by your above suggestion; or the iterator could possibly even be left in the CLOSED state until this.emit('end') is called

Yes, let's do that!

Sorry - I wasn't too clear here; I was referring to this; which was previously called when changing the state to DESTROYED or ENDED

That's correct; those can only happen after emitting the final end event.

jeswr commented

@RubenVerborgh Apologies for dragging this out; another place where I just realised the original implementation goes against the Readable spec is the following:

Calling readable.pause(), readable.unpipe(), or receiving backpressure will cause the readable.readableFlowing to be set as false, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the 'data' event will not switch readable.readableFlowing to true.

Similar to the changes around "For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream.", this breaks more tests around re-attaching data listeners (though for the most part this could be resolved by replacing those re-attachments with calls to .resume).

Should I go ahead and make this change to align with the Readable spec?
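The pause() rule can likewise be checked against `node:stream` alone: after `pause()`, attaching a 'data' listener leaves `readableFlowing` at `false`, and only an explicit `resume()` restarts the flow. `states` is a local name for the check.

```typescript
import { Readable } from 'node:stream';

const stream = Readable.from(['a', 'b']);
const states: (boolean | null)[] = [stream.readableFlowing]; // null: no consumer yet

stream.pause();
states.push(stream.readableFlowing); // false

stream.on('data', () => {});         // does NOT switch readableFlowing back to true
states.push(stream.readableFlowing); // false

stream.resume();                     // an explicit resume() is required
states.push(stream.readableFlowing); // true

console.log(states); // [ null, false, false, true ]
```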

@rubensworks Do you imagine this will cause many problems downstream in Comunica (I shouldn't think so since it didn't have access to .pause or .readableFlowing in the original version)?

@rubensworks Do you imagine this will cause many problems downstream in Comunica (I shouldn't think so since it didn't have access to .pause or .readableFlowing in the original version)?

Indeed, shouldn't be a problem for Comunica.

I just realised the original implementation goes against the Readable spec in the following:

Calling readable.pause(), readable.unpipe(), or receiving backpressure will cause the readable.readableFlowing to be set as false, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the 'data' event will not switch readable.readableFlowing to true.

But the original implementation did not do pause, right?

this breaks more tests around re-attaching data listeners (though for the most part this could be resolved by replacing those re-attachments with calls to .resume).

Should I go ahead and make this change to align with the Readable spec?

Fine with me. The boatload of tests was more to ensure consistent behavior wrt race conditions; what exactly the behavior is, can be determined for every major version—as long as it is self-consistent.