CancellationException: Single::takeUntil(Observable) when pressing home button
Closed this issue · 12 comments
When I use a Single, I get an error when the app is switched to the background.
For example (Kotlin code), I have this in a button onClick:
// Fake a long running call
Single.just("test")
    .delay(2, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .bindToLifecycle(this)
    .subscribeWith {
        onSuccess { Log.d("test", "onSuccess: $it") }
        onError { Log.d("test", "onError: $it") }
    }
If I press the Android Home button within the 2 seconds, I get this error in the onError: java.util.concurrent.CancellationException: Single::takeUntil(Observable) - Stream was canceled before emitting a terminal event.
The expected result would be that the observable chain is unsubscribed and the onSuccess and onError will never fire...
One other thing I noticed is that in this case:
Single.just("test")
    .delay(2, TimeUnit.SECONDS)
    .observeOn(AndroidSchedulers.mainThread())
    .bindToLifecycle(this)
    .doOnUnsubscribe { Log.d("test", "doOnUnsubscribe") }
    .subscribeWith {
        onSuccess { Log.d("test", "onSuccess: $it") }
        onError { Log.d("test", "onError: $it") }
    }
The doOnUnsubscribe is never called. In other words, it seems the Single is never unsubscribed.
Additionally I created a test case in the RxLifecycle project:
@Test
public void testSingle() {
    BehaviorSubject<com.trello.rxlifecycle.android.FragmentEvent> lifecycle = BehaviorSubject.create();
    TestSubscriber<Object> testSubscriber = new TestSubscriber<>();
    observable.toSingle().compose(RxLifecycle.bindUntilEvent(lifecycle, FragmentEvent.PAUSE).forSingle())
        .subscribe(testSubscriber);
    lifecycle.onNext(FragmentEvent.ATTACH);
    assertFalse(testSubscriber.isUnsubscribed());
    lifecycle.onNext(FragmentEvent.CREATE);
    assertFalse(testSubscriber.isUnsubscribed());
    lifecycle.onNext(FragmentEvent.CREATE_VIEW);
    assertFalse(testSubscriber.isUnsubscribed());
    lifecycle.onNext(FragmentEvent.START);
    assertFalse(testSubscriber.isUnsubscribed());
    lifecycle.onNext(FragmentEvent.RESUME);
    assertFalse(testSubscriber.isUnsubscribed());
    lifecycle.onNext(FragmentEvent.PAUSE);
    testSubscriber.assertCompleted();
    testSubscriber.assertUnsubscribed();
}
The above test case fails for a Single but succeeds when I use an Observable.
Maybe I'm missing something, but I don't see why the behaviour differs. IMO a Single should behave the same as an Observable, except for the fact that it always emits a single value or an error.
Ah I think I understand what's going wrong. RxLifecycle works in such a way that it completes an Observable when the bound lifecycle event happens. Since a Single can't complete without a value an error is generated.
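The difference in terminal events can be pictured with a small model. The sketch below is plain Java and does not use RxJava or RxLifecycle; `StreamObserver`, `SingleObserver`, and the two `end...Early` helpers are hypothetical stand-ins invented for illustration. It assumes only what the comment above states: an Observable-like contract allows an empty completion, while a Single-like contract has no such signal, so ending it early leaves an error as the only option.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;

// Simplified model only: these interfaces are hypothetical stand-ins,
// not RxJava or RxLifecycle types.
final class TerminalEventSketch {

    /** Observable-like contract: any number of values, then onCompleted or onError. */
    interface StreamObserver<T> {
        void onNext(T value);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Single-like contract: exactly one of onSuccess or onError; no bare completion. */
    interface SingleObserver<T> {
        void onSuccess(T value);
        void onError(Throwable error);
    }

    /** Ending an Observable-like stream early: an empty completion is a valid terminal event. */
    static <T> void endStreamEarly(StreamObserver<T> downstream) {
        downstream.onCompleted();
    }

    /** Ending a Single-like stream early: with no value to deliver, only an error is left. */
    static <T> void endSingleEarly(SingleObserver<T> downstream) {
        downstream.onError(new CancellationException("ended before a value was emitted"));
    }

    /** Records which terminal event each kind of observer receives. */
    static List<String> demo() {
        final List<String> log = new ArrayList<>();
        endStreamEarly(new StreamObserver<String>() {
            @Override public void onNext(String value) { log.add("stream: onNext " + value); }
            @Override public void onCompleted() { log.add("stream: onCompleted"); }
            @Override public void onError(Throwable error) { log.add("stream: onError"); }
        });
        endSingleEarly(new SingleObserver<String>() {
            @Override public void onSuccess(String value) { log.add("single: onSuccess " + value); }
            @Override public void onError(Throwable error) {
                log.add("single: onError " + error.getClass().getSimpleName());
            }
        });
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this model the stream observer sees `onCompleted`, while the single observer sees `onError` with a `CancellationException`, which mirrors the behaviour reported at the top of this issue.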
What would be better IMO is to unsubscribe from the Observable / Single when the bound lifecycle event happens.
See: https://github.com/trello/RxLifecycle#unsubscription
There's no way to actually do what you're asking for via compose() for Single, so we make a best effort here. If you need the behavior to be unsubscription, then you will have to do it manually.
Why not?
You could do something like this in the UntilLifecycleSingleTransformer?
final class UntilLifecycleSingleTransformer<T, R> implements Single.Transformer<T, T> {
    final Observable<R> lifecycle;
    public UntilLifecycleSingleTransformer(@Nonnull Observable<R> lifecycle) {
        this.lifecycle = lifecycle;
    }
    @Override
    public Single<T> call(final Single<T> source) {
        return Single.create(new Single.OnSubscribe<T>() {
            @Override
            public void call(SingleSubscriber<? super T> singleSubscriber) {
                final Subscription subscription = source.subscribe(singleSubscriber);
                lifecycle.subscribe(new Action1<R>() {
                    @Override
                    public void call(R r) {
                        // Event reached, unsubscribe from source
                        subscription.unsubscribe();
                    }
                });
            }
        });
    }
}
RxLifecycle used to do something very similar to that but it was pointed out to be inherently unsafe: ReactiveX/RxAndroid#168 (comment)
You can read more about it here: https://github.com/trello/RxLifecycle/blob/1.x/CHANGELOG.md#020
I trust you're right :) But even after reading the comments, I still don't fully understand why it's unsafe to unsubscribe from the source in this fashion (apart from the fact that my code snippet is incomplete).
I recently wrote a similar operator for showing a progress dialog. In this case I used fromEmitter and implemented setCancellation, which also unsubscribes from the source if you unsubscribe from the returned Observable or cancel the progress dialog. So I must assume this is unsafe as well?
fun <T> Observable<T>.withHud(activity: BaseActivity, @StringRes title: Int? = null, cancelable: Boolean = false): Observable<T> {
    val source = this
    return Observable.fromEmitter<T>({ emitter ->
        val hud = ProgressHUD.show(activity, cancelable, null)
        val sourceSubscription = source
            .doOnUnsubscribe { hud.dismiss() }
            .subscribe(emitter)
        hud.setOnCancelListener { sourceSubscription.unsubscribe() }
        emitter.setCancellation { sourceSubscription.unsubscribe() }
    }, Emitter.BackpressureMode.BUFFER)
}
To be honest, this stuff makes my head spin, so I sort of defer to Karnok on matters like this.
Maybe you could mention him in this thread so he can help me understand why this is unsafe?
It is unsafe because there is no guarantee that bindToLifecycle is the last operator in the chain, and even if it is, there is no guarantee that a Subscriber doesn't have resources associated with it. Resources are freed by calling unsubscribe() on the Subscriber, which propagates upstream. An unsubscribe somewhere upstream won't propagate down and thus may leak resources.
public Observable<String> someAPI() {
    return Observable.just("1").delay(2, TimeUnit.SECONDS)
        .compose(bindToLifecycle(this)); // <-- how convenient?!
}
someAPI()
    .observeOn(Schedulers.io())
    .flatMap(v -> otherAPICall())
    .subscribe(...)
One needs to communicate some terminal event, such as the error in this issue or an onCompleted signal to indicate the sequence is done and trigger the necessary cleanup.
So instead of this type of lifecycle binding, you'd need to have your own custom subscribe() that calls unsubscribe at the very end of the chain:
fun <T> Observable<T>.subscribeUntil(bind: Activity, onNext: Action1<T>, onError: Action1<Throwable>, onCompleted: Action0) {
    val s = this.subscribe(onNext, onError, onCompleted)
    bind.addDestroyListener { s.unsubscribe() } // or something similar
}
fun <T> Observable<T>.subscribeUntil(bind: Activity, observer: Observer<T>) {
    val s = this.subscribe(observer)
    bind.addDestroyListener { s.unsubscribe() } // or something similar
}
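The leak described above can be pictured with a toy model. The sketch below is plain Java with no RxJava types; the `Stage` class and the stage names are hypothetical stand-ins chosen to mirror the `someAPI()` chain. It assumes only what the explanation states: `unsubscribe()` releases a stage's own resources and propagates upstream, never downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: Stage is a hypothetical stand-in for one operator
// (or the final subscriber) in a chain. It is not RxJava code.
final class UnsubscribePropagationSketch {

    static final class Stage {
        final String name;
        final Stage upstream; // null for the source of the chain
        boolean released;     // true once this stage's resources are freed

        Stage(String name, Stage upstream) {
            this.name = name;
            this.upstream = upstream;
        }

        /** Frees this stage's resources, then propagates upstream only. */
        void unsubscribe() {
            released = true;
            if (upstream != null) {
                upstream.unsubscribe();
            }
        }
    }

    /** Builds the chain, cancels at the named stage, and returns the stages never released. */
    static List<String> unreleasedAfter(String cancelAt) {
        Stage source = new Stage("delay", null);
        Stage bind = new Stage("bindToLifecycle", source);
        Stage flatMap = new Stage("flatMap", bind);
        Stage subscriber = new Stage("subscriber", flatMap);
        List<Stage> chain = Arrays.asList(source, bind, flatMap, subscriber);

        for (Stage stage : chain) {
            if (stage.name.equals(cancelAt)) {
                stage.unsubscribe();
            }
        }

        List<String> leaked = new ArrayList<>();
        for (Stage stage : chain) {
            if (!stage.released) {
                leaked.add(stage.name);
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        System.out.println("cancel at bindToLifecycle: " + unreleasedAfter("bindToLifecycle"));
        System.out.println("cancel at subscriber: " + unreleasedAfter("subscriber"));
    }
}
```

In this model, cancelling at the `bindToLifecycle` stage leaves `flatMap` and the subscriber unreleased, while cancelling at the subscriber releases the whole chain. That is the motivation for unsubscribing at the very end of the chain.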
Thanks for the explanation @akarnokd! It makes sense that if you unsubscribe halfway in the chain you might get leaks downstream.
Since I work with Singles a lot, it is quite a hassle to take the CancellationException into consideration... So I created the following Kotlin extensions to handle unsubscribing:
fun <T,E> Single<T>.subscribeUntil(provider: com.trello.rxlifecycle.LifecycleProvider<E>, event: E, body: FunctionSingleSubscriberModifier<T>.() -> Unit) =
    subscribeUntil(body, provider.bindUntilEvent<Unit>(event))
fun <T,E> Single<T>.subscribeUntil(provider: com.trello.rxlifecycle.LifecycleProvider<E>, body: FunctionSingleSubscriberModifier<T>.() -> Unit) =
    subscribeUntil(body, provider.bindToLifecycle<Unit>())
fun <T> Single<T>.subscribeUntilDetach(view: View, body: FunctionSingleSubscriberModifier<T>.() -> Unit) =
    subscribeUntil(body, RxLifecycleAndroid.bindView<Unit>(view))
private fun <T> Single<T>.subscribeUntil(body: FunctionSingleSubscriberModifier<T>.() -> Unit, transformer: Observable.Transformer<Unit, *>): Subscription {
    val sourceSubscription = this.subscribeWith(body)
    val untilSubscription = Observable.create<Unit> { }.compose(transformer).subscribeWith {
        onCompleted { sourceSubscription.unsubscribe() }
    }
    return CompositeSubscription(sourceSubscription, untilSubscription)
}
Is there a reason this library can't provide some kind of Subscriber "wrapper" that would properly handle unsubscriptions?
I'm thinking instead of this:
public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(bindToLifecycle())
            .subscribe(new Subscriber() {...});
    }
}
We would have something like this:
public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .subscribe(bindToLifecycle(new Subscriber() {...}));
    }
}
Makes sense?
You could, but then you'd be coupling the Subscriber directly to the Activity. Right now it works with any generic lifecycle stream (which in many cases is provided by the Activity but does not have to be).