nodejs/node

Async Iterable `EventEmitter.on(emitter, "event")`

benjamingr opened this issue · 13 comments

Continuing the discussion from the `once` PR, regarding EventEmitter.on(emitter, "eventName"):

events.on(emitter, name)

  • emitter <EventEmitter>
  • name <string>
  • Returns: <AsyncIterable>

Creates an AsyncIterable that yields promises for the values of the given event name in the given EventEmitter. The iterable rejects the next promise when the EventEmitter emits 'error'. Calling .return on the iterable removes the subscription, which means the iterable unsubscribes automatically when a for await loop is broken out of or returned from.

Here is an example of the API:

const { on } = require('events'); // proposed API
const events = on(emitter, 'item');
events.next();   // Promise for the next event value
events.return(); // unsubscribe
events.throw();  // not sure what we want this to do; we might want to emit 'error' on the emitter

Or with for await syntax:

const foos = on(emitter, 'foo');
for await (const foo of foos) {
  foo; // the next value from the emitter
  break; // this calls `.return` on the async iterable and unsubscribes.
}

cc @mcollina @jasnell @MylesBorins @addaleax @BridgeAR

Addressing @jasnell 's points from that PR:

> Ok, quick investigation... it should be possible to implement an EventEmitter.on(emitter, event) function that is essentially a simplification of the code used for stream.Readable with a few notable differences:

> Unlike stream.Readable, there is no natural end to the event stream, and there is no destroy() method, so cleanup and termination of the for await() {} would be entirely dependent on using break and return as suggested.

This is true with a for await loop. It is also possible to call .return on the async iterable to close it.

> With stream.Readable, when there are multiple synchronous push() calls, there is still just a single 'readable' event, even if multiple read() operations are performed. For an arbitrary EventEmitter, we would need a way of reliably handling synchronous emits.

True, and this is mostly by design. Async iterator libraries like axax have utilities like take that make this easy.

My main concern about this API is that EE does not support backpressure. Essentially the following:

const foos = on(emitter, 'foo');
for await (const foo of foos) {
  await doSomethingRemotely()
}

Will have all events queue up and be processed one at a time. Are we ok with it? I'm struggling to make up my mind. I think some safety measure should be built in (maybe behind an option), like the following:

const foos = on(emitter, 'foo', { maxQueue: 1000 });
// will throw if the number of accumulated events waiting to be processed are
// over 1000.
for await (const foo of foos) {
  await doSomethingRemotely()
}

What do you think?


On another note, we need to make sure the following (or similar) works:

const foos = on(emitter, 'foo');
emitter.on('end', foos.removeListener.bind(foos)); // or similar
for await (const foo of foos) {
  await doSomethingRemotely()
}
// the loop will terminate when 'end' is emitted

> Will have all events queue up and be processed one at a time. Are we ok with it?

I think we are - events that are pushed can't exert backpressure - that's why we have readable streams. We can and should be explicit about this in the documentation. I don't think we have to provide maxQueue, because I can see use cases where we'd be fine with a queue of a few thousand items. I don't feel strongly about this, though, and am ready to be convinced.

> On another note, we need to make sure the following (or similar) works:

Oh interesting, can a listener detect that it is being removed? Are we sure we actually have to support this?

we can make it work by having removeListener clear the queue and putting a done: true into the queue of the async iterable

> we can make it work by having removeListener clear the queue and putting a done: true into the queue of the async iterable

Would it be an issue if removeListener or removeAllListeners was called on the emitter? The iterator would hang, unaware, in that case.

> Would it be an issue if removeListener or removeAllListeners was called on the emitter? The iterator would hang, unaware, in that case.

Yes, but that would happen even with a normal event handler. I don't think it's going to be a problem at all.

A problem that I handled in EventEmitter.once() is the variadic arguments of emit(). The actual usage is going to look like the following:

const foos = on(emitter, 'foo');
process.nextTick(() => {
  emitter.emit('foo', 'a', 'b')
})
for await (const [first, second] of foos) {
  // first === 'a', second === 'b'
  break; // without this, the loop would wait forever for more events
}

As a more friendly syntax, we could support this case...

const foos = on(emitter, 'foo');
emitter.on('end', foos.removeListener.bind(foos)); // or similar
for await (const foo of foos) {
  await doSomethingRemotely()
}

... By declaring terminal events when the async iterator is created...

const foos = on(emitter, 'foo', { until: 'end' });
for await (const foo of foos) {
  await doSomethingRemotely()
}

We also need to consider what happens in the case...

const foos = on(emitter, 'foo');
emitter.removeAllListeners('foo')
for await (const foo of foos) {}

The async iterator needs to be able to clean itself up if the listener it registered has been removed.

In case it's of interest, something very similar is already possible with IxJS:

https://github.com/ReactiveX/IxJS/blob/master/src/asynciterable/fromevent.ts#L31

Once the emitter is wrapped with Ix, it can be used with for await...of or Ix's own chainable operators, depending on one's needs and preferences.

Matteo already has an implementation :D

This idea seems good in isolation, but would it be worth seeing whether Emitter would be a good return value for on before landing this API?