[Possible bug] Empty iterator does not work after `await`
jeswr opened this issue · 19 comments
The `empty` iterator does not emit events if something has been `await`ed after its instantiation; for instance, the following tests time out:
```ts
import { EventEmitter } from 'events';
import { empty } from 'asynciterator';

// Resolves when the emitter ends, rejects when it errors
async function promisifyEventEmitter(eventEmitter: EventEmitter): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    eventEmitter.on('end', resolve);
    eventEmitter.on('error', reject);
  });
}

it('empty test awaiting promise', async () => {
  const e = empty();
  await Promise.resolve();
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});

it('empty test undefined', async () => {
  const e = empty();
  await undefined;
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});
```
whilst the following works:
```ts
it('empty test without awaiting', async () => {
  const e = empty();
  await expect(promisifyEventEmitter(e)).resolves.toBeFalsy();
});
```
Discovered in comunica/comunica#904
I've added some more tests with the behavior I would expect here, but I haven't yet had the time to find the root cause of the issue (and unfortunately I'm not sure I will anytime soon).
All of the tests on this branch pass with `npm run test:immediate`.
node: v16.10.0
npm: 7.24.0
I suspect you're running into an autoStart issue (see some of the closed issues here).
By default, asynciterators will start automatically after creation.
That means that if the iterator is empty, it will immediately end, so end-events may be emitted before listeners are attached.
There is an issue to change this behaviour: #25.
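To illustrate why the listener is missed, here is a minimal sketch using a plain Node.js `EventEmitter` rather than the actual asynciterator classes. It assumes, for illustration only, that an auto-starting empty iterator queues its `end` event on the microtask queue as soon as it is constructed; `EagerEmpty`, `seesEnd` and `demo` are hypothetical names.

```typescript
import { EventEmitter } from 'events';

// Hypothetical stand-in for an auto-starting empty iterator:
// it queues its 'end' event on the microtask queue during construction.
class EagerEmpty extends EventEmitter {
  constructor() {
    super();
    queueMicrotask(() => this.emit('end'));
  }
}

// Resolves to true if 'end' is observed within `ms` milliseconds, else false.
function seesEnd(emitter: EventEmitter, ms = 50): Promise<boolean> {
  return new Promise<boolean>(resolve => {
    const timer = setTimeout(() => resolve(false), ms);
    emitter.once('end', () => {
      clearTimeout(timer);
      resolve(true);
    });
  });
}

async function demo(): Promise<[boolean, boolean]> {
  // Listener attached synchronously: the queued 'end' is caught.
  const sawEndSync = await seesEnd(new EagerEmpty());

  // One await in between: the queued 'end' fires before any listener exists.
  const late = new EagerEmpty();
  await Promise.resolve();
  const sawEndAfterAwait = await seesEnd(late);

  return [sawEndSync, sawEndAfterAwait];
}

demo().then(([sawEndSync, sawEndAfterAwait]) => console.log({ sawEndSync, sawEndAfterAwait }));
// logs { sawEndSync: true, sawEndAfterAwait: false }
```

A single `await` is enough because the continuation after it is queued behind the already-scheduled `end` emission.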
While working on #36, I've discovered that this issue is not entirely resolved by changing `autoStart` to be `false` by default, since the `EmptyIterator` does not actually have an `autoStart` parameter.
In particular, the bottom two of the following tests fail:
```ts
import { expect } from 'chai';
import { EmptyIterator } from 'asynciterator';

// promisifyEventEmitter as defined in the first example
describe('An EmptyIterator should not emit until read from', () => {
  it('no awaiting', async () => {
    const iterator = new EmptyIterator();
    expect(await promisifyEventEmitter(iterator)).to.be.undefined;
  });

  it('awaiting undefined', async () => {
    const iterator = new EmptyIterator();
    await undefined;
    expect(await promisifyEventEmitter(iterator)).to.be.undefined;
  });

  it('awaiting promise', async () => {
    const iterator = new EmptyIterator();
    await Promise.resolve();
    expect(await promisifyEventEmitter(iterator)).to.be.undefined;
  });
});
```
I think there are two possible solutions here:
- (1) Add a `waitForEndListener`, similar to the `waitForDataListener`.
- (2) Inside `AsyncIterator`, add the following methods, and call `_start` whenever `_sourceStarted` is set to true. However, this could still cause the `end` event to be emitted early if there is `await`ing between the addition of the `data` listener and the `end` listener.
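```ts
// Members added inside the AsyncIterator class;
// `taskScheduler` is the module-level scheduler already used by AsyncIterator
protected _sourceStarted = false;
protected taskBuffer: (() => void)[] = [];

// Mark the source as started and schedule every task buffered so far
protected _start() {
  if (!this._sourceStarted) {
    this._sourceStarted = true;
    this.taskBuffer.forEach(task => taskScheduler(task));
    this.taskBuffer = [];
  }
}

// Schedule the task right away once started; otherwise buffer it
protected _taskScheduler(task: () => void) {
  if (this._sourceStarted)
    taskScheduler(task);
  else
    this.taskBuffer.push(task);
}
```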
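Option (1) could look roughly like the following sketch. It is written against a plain `EventEmitter` rather than the real `AsyncIterator`, and `DeferredEndEmitter`, `end()` and `waitForEndListener` are hypothetical names (the last one only mirrors the proposal). It relies on Node's `'newListener'` event, which fires before the new listener is actually registered, so the deferred `end` is emitted on the next microtask.

```typescript
import { EventEmitter } from 'events';

// Hypothetical sketch of option (1): hold back 'end' until an 'end' listener exists.
class DeferredEndEmitter extends EventEmitter {
  private endRequested = false;

  // Called when the source is exhausted; ends at most once.
  end(): void {
    if (this.endRequested)
      return;
    this.endRequested = true;
    if (this.listenerCount('end') > 0)
      queueMicrotask(() => this.emit('end'));
    else
      this.waitForEndListener();
  }

  // Emit 'end' once the first 'end' listener has been attached.
  private waitForEndListener(): void {
    const onNewListener = (event: string | symbol) => {
      if (event !== 'end')
        return;
      this.removeListener('newListener', onNewListener);
      // 'newListener' fires before the listener is added, so defer the emit.
      queueMicrotask(() => this.emit('end'));
    };
    this.on('newListener', onNewListener);
  }
}

async function demo(): Promise<boolean> {
  const emitter = new DeferredEndEmitter();
  emitter.end();            // nobody is listening yet
  await Promise.resolve();  // the kind of await that broke the tests above
  return new Promise<boolean>(resolve => emitter.once('end', () => resolve(true)));
}

demo().then(seen => console.log('end received:', seen));
// logs "end received: true"
```

Unlike option (2), this only delays the final event and does not need to buffer scheduled tasks.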
I'm fine with either option.
@RubenVerborgh what option do you prefer?
I now consider the default `autoStart` behavior to be incorrect, given https://nodejs.org/api/stream.html#readable-streams:

> All Readable streams begin in paused mode but can be switched to flowing mode in one of the following ways:
> - Adding a 'data' event handler.

Hence, we must not emit an `end` (or `data`) event before a `data` listener is attached.
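The Node.js behavior quoted above can be checked with a plain `Readable` from the built-in `stream` module (`emptyReadable` and `endsWithin` are illustrative helpers, not asynciterator code). An empty stream that nobody consumes never emits `end`, even after an `await`; attaching a `data` listener switches it to flowing mode and lets `end` through.

```typescript
import { Readable } from 'stream';

// An empty Readable: the first read immediately signals end-of-stream.
function emptyReadable(): Readable {
  return new Readable({
    read() {
      this.push(null);
    },
  });
}

// Resolves to true if 'end' is emitted within `ms` milliseconds, else false.
async function endsWithin(attachData: boolean, ms = 50): Promise<boolean> {
  const stream = emptyReadable();
  await Promise.resolve(); // unrelated await between creation and listening
  return new Promise<boolean>(resolve => {
    const timer = setTimeout(() => resolve(false), ms);
    stream.once('end', () => {
      clearTimeout(timer);
      resolve(true);
    });
    if (attachData)
      stream.on('data', () => { /* an empty stream yields no chunks */ });
  });
}

Promise.all([endsWithin(false), endsWithin(true)])
  .then(([withoutData, withData]) => console.log({ withoutData, withData }));
// logs { withoutData: false, withData: true }
```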
The `autoStart` functionality (as can be seen in the comments) was originally meant to indicate whether or not a stream should already start buffering. However, the above `end` mistake was made, causing empty streams created with `autoStart: true` (the default) to show the unexpected behavior of closing faster than you can catch them.
Hence it also makes no sense to have `autoStart` on the `ArrayIterator`. Because of the above confusion, `autoStart` became synonymous with automatically ending empty streams, and was hence also applied to `ArrayIterator` (and several others, where it wasn't even implemented).
So what I think @rubensworks wants is not for streams to not buffer, but for empty streams to (oh irony) not auto end.
This might shed a very different light on #36.
@rubensworks and @jeswr, would everything work as expected for you if:
- the `end` event cannot be fired before a `data` listener is attached, and
- `autoStart` is replaced by `prebuffer` and only applies to the `BufferedIterator` and its derived classes?
That sounds like a good solution with the expected outcomes, @RubenVerborgh. I'll let you know if I have any more thoughts while modifying #36.
This should only introduce a few breaking changes in Comunica 2.x, again related to the use of `on('metadata', ...)`, @rubensworks.
I think Comunica needs to get the stream going by attaching an `on('data')` itself. We could also (like Node.js) introduce a `.resume` method (which I would implement basically by adding a `.on('data', noop)`).
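A minimal sketch of both ideas on a plain `EventEmitter`: an iterator that holds back `end` until a `data` listener is attached, plus a `resume()` that simply attaches a no-op `data` listener as suggested. `LazyEndIterator`, `finish()` and `maybeEnd()` are hypothetical names; this is not the asynciterator implementation.

```typescript
import { EventEmitter } from 'events';

const noop = () => { /* intentionally empty */ };

// Hypothetical iterator that never emits 'end' before a 'data' listener exists.
class LazyEndIterator extends EventEmitter {
  private sourceDone = false;
  private ended = false;

  constructor() {
    super();
    // 'newListener' fires before registration, so re-check on the next microtask.
    this.on('newListener', (event: string | symbol) => {
      if (event === 'data')
        queueMicrotask(() => this.maybeEnd());
    });
  }

  // Called by the (hypothetical) source once it has no items left.
  finish(): void {
    this.sourceDone = true;
    this.maybeEnd();
  }

  // Like Node.js streams: start flowing by attaching a no-op 'data' listener.
  resume(): this {
    this.on('data', noop);
    return this;
  }

  private maybeEnd(): void {
    if (this.sourceDone && !this.ended && this.listenerCount('data') > 0) {
      this.ended = true;
      this.emit('end');
    }
  }
}

async function demo(): Promise<string[]> {
  const log: string[] = [];
  const iterator = new LazyEndIterator();
  iterator.on('end', () => log.push('end'));
  iterator.finish();          // the source is empty, but nobody consumes yet
  await Promise.resolve();    // 'end' is not lost across this await
  log.push('after await');
  iterator.resume();          // attach the no-op 'data' listener
  await new Promise(resolve => setTimeout(resolve, 10));
  return log;
}

demo().then(log => console.log(log));
// logs [ 'after await', 'end' ]
```

With this design, a consumer such as Comunica that only cares about `end` (or other events) can call `resume()` to get the stream going.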
> So what I think @rubensworks wants is not for streams to not buffer, but for empty streams to (oh irony) not auto end.

Exactly. Buffering itself is no problem.
> I think Comunica needs to get the stream going by attaching an `on('data')` itself. We could also (like Node.js) introduce a `.resume` method (which I would implement basically by adding a `.on('data', noop)`).

I think this would be very valuable!
@RubenVerborgh I made a start on it yesterday and will try to do the rest today.
Amazing; really grateful to have you on this.
Still working on this; I just wanted to check that I'm on track with some of the decisions I have made regarding other slight changes in behaviour that result from this (I've done my best to keep it in line with the Readable API behaviour):
- `.readable` is true when data is available. This means that the 'end' event can be emitted after `.readable` is false.
- `.ended` becomes true directly after the `end` event has been emitted (I've introduced an `ENDING` state to handle the period in which we wait to emit the `end` event).
- The iterator does not perform its `ENDED`/`DESTROYED` state clean-up until the `end` (or `error`) event is emitted. This means that if all of the iterator's data has been retrieved by calling `.read()` directly, the iterator will not perform that clean-up until after a `data` listener is subscribed or `.resume()` is called.
> `.readable` is true when data is available.

…when data might be available. So I think even the `EmptyIterator` would now start with `readable: true`; but the moment you attach a `data` listener, no `data` event ever fires, only `end`.
> This means that the 'end' event can be emitted after `.readable` is false.

Correct.
> `.ended` becomes true directly after the end event has been emitted

Likely, but I don't think it needs to be this strong. Currently, an iterator is allowed to have `ended: true` before all listeners have received the `end` event; however, this only happens in `EmptyIterator`, and that case will be removed. So I don't think we will need that state.
Even more, it seems like we can eliminate the `eventAsync` parameter from `_changeState`.
Are you introducing it for the case where an iterator has no items left, but no `data` listener is attached? I would do this, for example, by just calling `_end` from the `read` method.
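A sketch of that suggestion, assuming a simple array-backed iterator: when `read()` finds no items left, it ends the iterator itself, so a consumer that only pulls with `read()` still receives `end` without ever attaching a `data` listener. `PullIterator` and its `_end` are hypothetical simplifications, not the real asynciterator classes.

```typescript
import { EventEmitter } from 'events';

// Hypothetical array-backed iterator that is consumed by pulling with read().
class PullIterator<T> extends EventEmitter {
  private readonly items: T[];
  private ending = false;

  constructor(items: T[]) {
    super();
    this.items = [...items];
  }

  // Returns the next item, or null once exhausted (which also ends the iterator).
  read(): T | null {
    if (this.items.length > 0)
      return this.items.shift() as T;
    this._end();
    return null;
  }

  // Ends at most once; 'end' is emitted on a microtask so that a listener
  // attached in the same synchronous block as the final read() still fires.
  protected _end(): void {
    if (!this.ending) {
      this.ending = true;
      queueMicrotask(() => this.emit('end'));
    }
  }
}

const iterator = new PullIterator([1, 2]);
const seen: (number | null)[] = [];
iterator.on('end', () => console.log('end after', seen));
seen.push(iterator.read(), iterator.read(), iterator.read());
// logs "end after [ 1, 2, null ]"
```

Because the pulling consumer is what triggers the end here, no separate state is needed to wait for a `data` listener in this case.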
So summarizing: I do think your statement is true / can be made true.
> (I've introduced an `ENDING` state to handle the period in which we wait to emit the `end` event)

So I'm not sure if there is a necessity for that, but happy to hear arguments.
> The iterator does not perform its `ENDED`/`DESTROYED` state clean-up until the `end` (or `error`) event is emitted.

Why is that? This is internal state; the iterator can do as it wishes.
TL;DR: I agree with all the feedback; I now have a lot more clarity on how to finish this off.
> Are you introducing it for the case where an iterator has no items left, but no `data` listener is attached?

Yes
> Even more, it seems like we can eliminate the `eventAsync` parameter from `_changeState`.

Yes, I've done this; it only broke one test in the `EmptyIterator` unit tests, and that test can be deleted anyway.
> I would do this, for example, by just calling `_end` from the `read` method.

Good thought; I think this is cleaner than the direction I was starting to take with the `ENDING` state.
> (I've introduced an `ENDING` state to handle the period in which we wait to emit the `end` event)

I was making modifications so that, in the `_end` call, `this.emit('end')` was delayed if there were no `data` listeners at that point in time, using a set of functions similar to `onData` and `waitForDataListener`. This may be made redundant by your above suggestion; or it could possibly even be left in the `CLOSED` state until `this.emit('end')` is called.
> Why is that? This is internal state; the iterator can do as it wishes.

Sorry, I wasn't too clear here; I was referring to this, which was previously called when changing the state to `DESTROYED` or `ENDED`:
```ts
this._readable = false;
this.removeAllListeners('readable');
this.removeAllListeners('data');
this.removeAllListeners('end');
```
> I was making modifications so that in the `_end` call the `this.emit('end')` was delayed if there were no data listeners at that point in time

I found that the Node.js documentation (https://nodejs.org/api/stream.html) says:

> For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream.

So I think we can just do the same.
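The quoted Node.js rule can be checked directly with a standard `Readable` (only the built-in `stream` module is used; `probe` is an illustrative helper). Once a `data` listener has put the stream into flowing mode, removing it leaves `readableFlowing` at `true`, so an empty stream still goes on to emit `end`.

```typescript
import { Readable } from 'stream';

async function probe(): Promise<{ flowingAfterRemoval: boolean | null; ended: boolean }> {
  // An empty Readable: the first read immediately signals end-of-stream.
  const stream = new Readable({ read() { this.push(null); } });
  const onData = () => { /* no chunks expected */ };

  stream.on('data', onData);             // switches the stream to flowing mode
  stream.removeListener('data', onData); // does not pause the stream again
  const flowingAfterRemoval = stream.readableFlowing;

  const ended = await new Promise<boolean>(resolve => {
    const timer = setTimeout(() => resolve(false), 50);
    stream.once('end', () => {
      clearTimeout(timer);
      resolve(true);
    });
  });
  return { flowingAfterRemoval, ended };
}

probe().then(result => console.log(result));
// logs { flowingAfterRemoval: true, ended: true }
```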
> This may be made redundant by your above suggestion; or it could possibly even be left in the `CLOSED` state until `this.emit('end')` is called

Yes, let's do that!
> Sorry, I wasn't too clear here; I was referring to this, which was previously called when changing the state to `DESTROYED` or `ENDED`

That's correct; those can only happen after emitting the final `end` event.
@RubenVerborgh Apologies for dragging this out; another place where I just realised the original implementation was going against the `Readable` spec is the following:
> Calling `readable.pause()`, `readable.unpipe()`, or receiving backpressure will cause the `readable.readableFlowing` to be set as `false`, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the `'data'` event will not switch `readable.readableFlowing` to `true`.

Similar to the change around "For backward compatibility reasons, removing 'data' event handlers will not automatically pause the stream.", this breaks some more tests around re-attaching `data` listeners (though for the most part, this could be resolved by replacing those re-attachments with calls to `.resume()`).
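The quoted rule, and why `.resume()` is needed in those tests, can be seen with a standard `Readable` (built-in `stream` module only; `flowingStates` is an illustrative helper): after `pause()`, attaching a `data` listener leaves `readableFlowing` at `false`, and only an explicit `resume()` sets it back to `true`.

```typescript
import { Readable } from 'stream';

// Records readableFlowing at each step of pause / attach / resume.
function flowingStates(): (boolean | null)[] {
  const stream = Readable.from(['a', 'b']);
  const states: (boolean | null)[] = [stream.readableFlowing]; // null: no consumer yet

  stream.pause();
  states.push(stream.readableFlowing); // false: explicitly paused

  stream.on('data', () => { /* consume chunks */ });
  states.push(stream.readableFlowing); // still false: 'data' does not resume

  stream.resume();
  states.push(stream.readableFlowing); // true: flowing again
  return states;
}

console.log(flowingStates());
// logs [ null, false, false, true ]
```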
Should I go ahead and make this change to align with the `Readable` spec?
@rubensworks Do you imagine this will cause many problems downstream in Comunica (I shouldn't think so, since it didn't have access to `.pause` or `.readableFlowing` in the original version)?
> @rubensworks Do you imagine this will cause many problems downstream in Comunica (I shouldn't think so, since it didn't have access to `.pause` or `.readableFlowing` in the original version)?

Indeed, shouldn't be a problem for Comunica.
> I just realised the original implementation was going against the `Readable` spec is the following:
>
> > Calling `readable.pause()`, `readable.unpipe()`, or receiving backpressure will cause the `readable.readableFlowing` to be set as `false`, temporarily halting the flowing of events but not halting the generation of data. While in this state, attaching a listener for the `'data'` event will not switch `readable.readableFlowing` to `true`.

But the original implementation did not do `pause`, right?
> this breaks some more tests around re-attaching `data` listeners (though for the most part, this could be resolved by replacing those re-attachments with calls to `.resume()`).
>
> Should I go ahead and make this change to align with the `Readable` spec?

Fine with me. The boatload of tests was more to ensure consistent behavior with respect to race conditions; what exactly the behavior is can be determined for every major version, as long as it is self-consistent.