Synchronous emit to `switchMap` during the processing of previous emit prevents the first from unsubscribing
atscott opened this issue · 4 comments
Describe the bug
I think this may be a duplicate of #7230 but not entirely sure.
When a `switchMap` is in the process of subscribing to one `next` notification, it will not unsubscribe if another `next` happens synchronously.
Expected behavior
`switchMap` should always unsubscribe from previous subscriptions if a new value comes in.
Reproduction code
https://stackblitz.com/edit/rxjs-a21xi5?devtoolsheight=60&file=index.ts

    import { switchMap, Subject, finalize, from, filter } from 'rxjs';

    const sub = new Subject<number>();
    const finalizations = new Subject<number>();

    sub
      .pipe(
        switchMap((id) => {
          return from(Promise.resolve(id)).pipe(
            finalize(() => {
              console.log(`finalize for ${id}`);
              finalizations.next(id);
            })
          );
        })
      )
      .subscribe((id) => {
        console.log(`next emit for ${id}`);
      });

    finalizations.pipe(filter((id) => id === 1)).subscribe(() => {
      sub.next(3);
    });

    sub.next(1);
    sub.next(2);

This outputs the following:

    finalize for 1
    next emit for 3
    finalize for 3
    next emit for 2
    finalize for 2

I would expect it to not produce "next emit for 2"
Reproduction URL
No response
Version
8.0.0-alpha.12
Environment
No response
Additional context
This is not a regression. I'm seeing this as far back as rxjs 6.
This tricked me for a few minutes. This isn't a bug, it's expected behavior.
`finalize` will ALWAYS be called whether the source is completed, errored, or unsubscribed. In this case, the `finalize` callback is called synchronously during the unsubscription from the first observable, causing value `3` to fall through.
What you're looking for is `tap({ complete: () => { /* do stuff */ } })`, not `finalize`... `finalize` is like a "finally" block. It's GUARANTEED to be called no matter what. This is useful for tearing things down or logging.
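
To make the distinction concrete, here is a minimal, dependency-free sketch of the two hooks. It does not use RxJS: `createInner` and `Hooks` are hypothetical names invented for illustration. The only assumption is the behavior described above: a completion callback runs only when the source finishes on its own, while a finalize-style callback also runs on unsubscribe.

```typescript
// Toy subscription, NOT an RxJS type. `Hooks` and `createInner` are
// hypothetical names used only for this illustration.
interface Hooks {
  complete?: () => void; // runs only when the source completes on its own
  finalize?: () => void; // runs on completion AND on unsubscribe
}

const events: string[] = [];

function createInner(hooks: Hooks) {
  let closed = false;
  // Returns true only the first time, so teardown runs at most once.
  const close = (): boolean => {
    if (closed) return false;
    closed = true;
    return true;
  };
  return {
    complete(): void {
      if (!close()) return;
      hooks.complete?.();
      hooks.finalize?.();
    },
    unsubscribe(): void {
      if (!close()) return;
      hooks.finalize?.(); // no complete callback on this path
    },
  };
}

const a = createInner({
  complete: () => events.push('a: complete'),
  finalize: () => events.push('a: finalize'),
});
const b = createInner({
  complete: () => events.push('b: complete'),
  finalize: () => events.push('b: finalize'),
});

a.complete();    // the source finished normally
b.unsubscribe(); // the consumer dropped it early, as a switch would

console.log(events);
// [ 'a: complete', 'a: finalize', 'b: finalize' ]
```

In the reproduction, the first inner observable takes the `b` path: it is unsubscribed, so only the finalize-style hook fires, and that hook is what pushes the extra value into the source.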
Hi @benlesh thanks for the response. I think I'm a bit confused or not totally following the explanation. I do expect the `finalize` for `2` to emit. What I did not expect was that the `2` value comes out of the pipe to the subscriber and logs `next emit for 2`. Shouldn't a new value coming into the `switchMap` cause the previous inner subscriber to always unsubscribe?

    const sub = new Subject<number>();
    const finalizations = new Subject<number>();

    sub
      .pipe(
        switchMap((id) => {
          return from(Promise.resolve(id)).pipe(
            finalize(() => {
              console.log(`finalize for ${id}`);
              finalizations.next(id);
            })
          );
        })
      )
      .subscribe((id) => {
        console.log(`next emit for ${id}`);
      });

    finalizations.pipe(filter((id) => id === 1)).subscribe(() => {
      sub.next(3);
    });

    sub.next(1);
    sub.next(2);

1. `sub.next(1)`
2. Synchronously call `switchMap`'s mapping function with `1`.
3. Which maps into the `from(Promise.resolve(1)).pipe(finalize(STUFF))` observable, we'll call that `innerObservable1`. At this point the promise has already scheduled a microtask to emit on.
4. `switchMap` synchronously subscribes to `innerObservable1`. Nothing else happens, as we have to wait for the promise to resolve.
5. `sub.next(2)`, again synchronously. (We don't get back to this point for a bit)
6. We unsubscribe from the subscription to `innerObservable1`. This triggers the `finalize` callback!
7. The `finalize` callback logs `finalize for 1`, then calls `finalizations.next(1)` synchronously.
8. The value 1 passes the `filter` in our second observable, and then synchronously hits the next handler in the subscription, calling `sub.next(3)`.
9. We don't have to unsubscribe any inner subscriptions, because we already did that on 6.
10. Synchronously call the `switchMap`'s mapping function with `3`.
11. Which maps into the `from(Promise.resolve(3)).pipe(finalize(STUFF))` observable, we'll call that `innerObservable3`. At this point the promise has already scheduled a microtask to emit on.
12. `switchMap` sends the value `3` from the resolved promise inside of `innerObservable3` to the consumer, logging `next emit for 3`.
13. `innerObservable3` completes, triggering the `finalize`. Which logs `finalize for 3`, then calls `finalizations.next(3)`.
14. `3` doesn't pass the `filter` in the other subscription, so nothing is done there.
15. Back to the step after 5 above, `innerObservable3` has cleaned up, so there's no inner subscription to unsubscribe from, and we call `switchMap`'s mapping function with the value `2`.
16. Which maps into the `from(Promise.resolve(2)).pipe(finalize(STUFF))` observable, which we'll call `innerObservable2`. At this point the promise has already scheduled a microtask to emit on.
17. We synchronously subscribe to `innerObservable2`.
18. When the promise resolves, we emit `2` to the consumer, logging `next emit for 2`.
19. Then `innerObservable2` completes, calling `finalize`'s callback.
20. The `finalize` callback synchronously logs `finalize for 2`, then calls `finalizations.next(2)`.
21. `2` does not pass the filter so nothing happens in the other observable.
Moral of the story: Don't do too much synchronous stuff in observables, it's really weird. :P
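
The re-entrancy in the walkthrough can be reproduced with a small toy model that does not depend on RxJS. This is only a sketch under stated assumptions, not RxJS source: `MiniSwitch`, `Inner`, `microtasks` and `flushMicrotasks` are hypothetical names, the explicit queue stands in for promise microtasks so the run is deterministic, and the model assumes a switch-style operator tears down the previous inner subscription first and only then stores the new one. Under that assumption, the inner subscription created by the re-entrant call is overwritten rather than unsubscribed, which is one way to arrive at the logged order.

```typescript
// Toy model of a switch-style operator, NOT RxJS source. All names here
// (Inner, MiniSwitch, microtasks, flushMicrotasks) are hypothetical.
type Inner = { id: number; active: boolean };

const log: string[] = [];

// Stand-in for the promise microtask queue, so the run is deterministic.
const microtasks: Array<() => void> = [];
function flushMicrotasks(): void {
  while (microtasks.length > 0) {
    const task = microtasks.shift();
    if (task) task();
  }
}

class MiniSwitch {
  private current: Inner | null = null;

  // onFinalize plays the role of the `finalizations` subject in the repro.
  constructor(private readonly onFinalize: (id: number) => void) {}

  next(id: number): void {
    // Step A: tear down the previous inner subscription if still active.
    const previous = this.current;
    if (previous !== null && previous.active) {
      this.finalizeInner(previous); // may re-enter next() synchronously
    }
    // Step B: store the new inner. This overwrites whatever a re-entrant
    // next() call stored in `current` while step A was running.
    const inner: Inner = { id, active: true };
    this.current = inner;
    // The inner emits later, like from(Promise.resolve(id)) would.
    microtasks.push(() => {
      if (!inner.active) return; // unsubscribed before the value arrived
      log.push(`next emit for ${id}`);
      this.finalizeInner(inner); // completion also runs the finalizer
    });
  }

  private finalizeInner(inner: Inner): void {
    inner.active = false;
    log.push(`finalize for ${inner.id}`);
    this.onFinalize(inner.id);
  }
}

const sw: MiniSwitch = new MiniSwitch((id) => {
  if (id === 1) sw.next(3); // mirrors the filter + sub.next(3) subscription
});

sw.next(1);
sw.next(2);
flushMicrotasks();

console.log(log.join('\n'));
// finalize for 1
// next emit for 3
// finalize for 3
// next emit for 2
// finalize for 2
```

In this model the inner for `3` is never torn down because the outer `next(2)` call replaces the stored reference after the re-entrant call returns, so both `3` and `2` reach the consumer.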