ReactiveX/rxjs

Implement Symbol.asyncIterator on Observable

benlesh opened this issue · 3 comments

Discussed in #6779

Originally posted by benlesh January 21, 2022
Thinking about the backpressure-related use cases for interop between async iterables and observables, I think I'd consider it an improvement to ergonomics to implement Symbol.asyncIterator on Observable. See this comment here about handling backpressure. There are some really cool/easy/clever things that can be done with this functionality.

Here are a few things to consider:

  1. rxjs-for-await has a good amount of usage already.
  2. concatMap has exactly the same issue where users need to "understand there is buffering" in some cases, and so far, I haven't seen many people trip over that. In fact, many tutorials and documents steer people towards concatMap for this buffered, one-at-a-time behavior more often than not.
  3. for await isn't going away any time soon, and — other than callbacks — is the only real native way to iterate async values (one-at-a-time like concatMap, of course).
  4. Subscribing to an observable with for await is obviously non-cancellable, as there's no subscription or even an opportunity to pass a signal or the like, so it's unlikely to "replace" calling subscribe in the hearts and minds of users.
  5. Provides even better interop with IxJS and JavaScript in general.

For those new to this, here is what is being proposed (roughly):

for await (const value of someObservable$) {
   await sleep(1000);
   const subValue = await getAnotherValue(value);
   doSomething(subValue);
}

Which would roughly map 1-to-1 with this RxJS behavior:

someObservable$.pipe(
  concatMap(async (value) => {
    await sleep(1000);
    const subValue = await getAnotherValue(value);
    doSomething(subValue);
  })
)
.subscribe();
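
For readers wondering where the buffering comes from, here is a minimal sketch of one way such an adapter could work. It is not the RxJS implementation: `SubscribableLike`, `ObserverLike`, and `toAsyncIterable` are hypothetical names, and the structural interfaces stand in for Observable so the snippet runs without RxJS installed. Every notification is queued and handed to the loop one at a time, so a slow loop body makes the queue grow.

```typescript
// Hypothetical structural stand-ins for Observable/Observer (assumptions, not RxJS types).
interface ObserverLike<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}
interface SubscribableLike<T> {
  subscribe(observer: ObserverLike<T>): { unsubscribe(): void };
}

type Signal<T> =
  | { kind: 'next'; value: T }
  | { kind: 'error'; error: unknown }
  | { kind: 'complete' };

// Hypothetical helper: queues every notification and yields values one at a time.
async function* toAsyncIterable<T>(
  source: SubscribableLike<T>
): AsyncGenerator<T, void, undefined> {
  const buffer: Signal<T>[] = [];
  let wake: (() => void) | undefined;

  const push = (signal: Signal<T>): void => {
    buffer.push(signal);
    // Wake the consumer if it is currently waiting on an empty queue.
    wake?.();
    wake = undefined;
  };

  const subscription = source.subscribe({
    next: (value) => push({ kind: 'next', value }),
    error: (error) => push({ kind: 'error', error }),
    complete: () => push({ kind: 'complete' }),
  });

  try {
    while (true) {
      if (buffer.length === 0) {
        // Nothing queued yet: wait until the producer pushes something.
        await new Promise<void>((resolve) => {
          wake = resolve;
        });
      }
      const signal = buffer.shift()!;
      if (signal.kind === 'complete') return;
      if (signal.kind === 'error') throw signal.error;
      yield signal.value;
    }
  } finally {
    // Runs on completion, on error, and when the consumer leaves the loop early.
    subscription.unsubscribe();
  }
}
```

The queue is unbounded in this sketch, which is the same trade-off concatMap makes when the source outpaces the inner work.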

cc @cartant @kwonoj

Core Team: silently approved for 8.x

Subscribing to an observable with for await is obviously non-cancellable, as there's no subscription or even an opportunity to pass a signal or the like, so it's unlikely to "replace" calling subscribe in the hearts and minds of users.

In a for await loop, the return function is called on the AsyncIterator when exiting the loop early, whether via return, break, or an exception thrown inside the loop body. That gives the iterator a place to unsubscribe from the source.
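
The early-exit behavior can be checked with a hand-written async iterator that records which protocol methods the loop invokes. This is a sketch for illustration only: `makeProbe`, `exitWithBreak`, and `exitWithThrow` are hypothetical names and nothing here is RxJS API.

```typescript
// Hypothetical probe: an endless async iterable that records which
// iterator-protocol methods are invoked on it.
function makeProbe(): { iterable: AsyncIterable<number>; calls: string[] } {
  const calls: string[] = [];
  let n = 0;
  const iterable: AsyncIterable<number> = {
    [Symbol.asyncIterator](): AsyncIterator<number> {
      return {
        async next(): Promise<IteratorResult<number>> {
          calls.push('next');
          return { done: false, value: n++ };
        },
        async return(): Promise<IteratorResult<number>> {
          calls.push('return'); // an Observable adapter would unsubscribe here
          return { done: true, value: undefined };
        },
        async throw(err?: unknown): Promise<IteratorResult<number>> {
          calls.push('throw');
          throw err;
        },
      };
    },
  };
  return { iterable, calls };
}

// Leaves the loop with break once the second value arrives.
async function exitWithBreak(): Promise<string[]> {
  const { iterable, calls } = makeProbe();
  for await (const value of iterable) {
    if (value === 1) break;
  }
  return calls; // ['next', 'next', 'return']
}

// Leaves the loop because the body throws on the first value.
async function exitWithThrow(): Promise<string[]> {
  const { iterable, calls } = makeProbe();
  try {
    for await (const value of iterable) {
      throw new Error(`failed on ${value}`);
    }
  } catch (_err) {
    // The error from the loop body is swallowed; only the recorded calls matter.
  }
  return calls; // ['next', 'return']
}
```

In both cases the loop closes the iterator through return(); the iterator's throw() is not invoked by the loop itself, so cleanup such as unsubscribing belongs in return().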

And if we want it to be cancellable from outside the for loop, we could somewhat easily stick a takeUntil on the observable.

This could even be all wrapped up neatly with (rough proof of concept):

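import { BehaviorSubject, Observable, filter, takeUntil, tap } from 'rxjs';

export interface CancelToken {
  cancel(): void;
  readonly isCancelled: boolean;
}

export type Cancellable<T> = Observable<T> & CancelToken;

export function cancellable<T>(source: Observable<T>): Cancellable<T> {
  // Holds the cancellation state; flips to true when cancel() is called.
  const cancelSubject = new BehaviorSubject(false);

  // Emits only once cancellation has been requested.
  const canceller$ = cancelSubject.pipe(filter((cancelled) => cancelled));

  const result = source.pipe(
    takeUntil(canceller$),
    // Complete the subject when the source completes, errors, or is unsubscribed.
    tap({ finalize: () => cancelSubject.complete() })
  );

  // Attach the CancelToken members to the piped observable.
  return Object.defineProperties(result, {
    cancel: { value: () => cancelSubject.next(true) },
    isCancelled: { get: () => cancelSubject.value },
  }) as Cancellable<T>;
}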

The cancellable (as a CancelToken) can be provided within the loop for more advanced scenarios.
This is inspired by https://gist.github.com/andrewcourtice/ef1b8f14935b409cfe94901558ba5594
I tried to get something that could integrate well with https://github.com/tc39/proposal-cancellation or #6442 when these move forward. (Although I guess it is part of those issues and might be a minefield we don't want to engage with at the moment.)

My point is, while it's not currently natively cancellable, there are already ways to work around that right now, I guess...

I believe this will make code confusing, especially for those who never used an Observable as an iterable.

Consider the following:

for await (const a of $myObservable) {
    // set of operations
}

It would not be a typical way to use an Observable, but rather a unique one. So when someone sees such code, they will be confused about what is happening underneath.

Considering that such functionality won't be used often, a dedicated method is better, like toAsyncIterable(), or the shorter iterate():

for await (const a of $myObservable.iterate()) {
    // set of operations
}

Now we know, from this code signature, that we are working with an iterable here, no more mystery or confusion.


Also, by making Observable an AsyncIterable, you effectively endorse iteration as a good usage pattern for observables, which it is not. It is a convenience to be used where necessary, not something to be abused, and rest assured, it will be. Please do not introduce bad usage patterns; add an explicit iterate() method instead, as above.

If you ignore my advice, I guarantee that lots of junior developers will end up writing terrible code by using observables as iterables everywhere. It will create a disaster. With an iterate() method, a senior developer will be able to easily find all such use cases during code reviews.

Also, without a dedicated method, I can easily foresee linting libraries adding a new rule to forbid implicit use of observables as iterables. That is another reason to avoid such a practice.