trello-archive/RxLifecycle

CancellationException: Single::takeUntil(Observable) when pressing home button

Closed this issue · 12 comments

When I use a Single, I get an error when the app is moved to the background.

For example (Kotlin code) I have this in a button onClick:

// Fake a long running call
Single.just("test")
                .delay(2, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .bindToLifecycle(this)
                .subscribeWith {
                    onSuccess { Log.d("test", "onSuccess: $it") }
                    onError { Log.d("test", "onError: $it") }
                }

If I press the Android Home button within the 2 seconds I get this error in the onError: java.util.concurrent.CancellationException: Single::takeUntil(Observable) - Stream was canceled before emitting a terminal event.

The expected result would be that the observable chain is unsubscribed and the onSuccess and onError will never fire...

One other thing I noticed is that in this case:

Single.just("test")
                .delay(2, TimeUnit.SECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .bindToLifecycle(this)
                .doOnUnsubscribe { Log.d("test", "doOnUnsubscribe") }
                .subscribeWith {
                    onSuccess { Log.d("test", "onSuccess: $it") }
                    onError { Log.d("test", "onError: $it") }
                }

The doOnUnsubscribe is never called. In other words, it seems the Single is never unsubscribed.

Additionally I created a test case in the RxLifecycle project:

    @Test
    public void testSingle() {
        BehaviorSubject<com.trello.rxlifecycle.android.FragmentEvent> lifecycle = BehaviorSubject.create();
        TestSubscriber<Object> testSubscriber = new TestSubscriber<>();

        observable.toSingle().compose(RxLifecycle.bindUntilEvent(lifecycle, FragmentEvent.PAUSE).forSingle())
                .subscribe(testSubscriber);

        lifecycle.onNext(FragmentEvent.ATTACH);
        assertFalse(testSubscriber.isUnsubscribed());
        lifecycle.onNext(FragmentEvent.CREATE);
        assertFalse(testSubscriber.isUnsubscribed());
        lifecycle.onNext(FragmentEvent.CREATE_VIEW);
        assertFalse(testSubscriber.isUnsubscribed());
        lifecycle.onNext(FragmentEvent.START);
        assertFalse(testSubscriber.isUnsubscribed());
        lifecycle.onNext(FragmentEvent.RESUME);
        assertFalse(testSubscriber.isUnsubscribed());
        lifecycle.onNext(FragmentEvent.PAUSE);
        testSubscriber.assertCompleted();
        testSubscriber.assertUnsubscribed();
    }

The test case above fails for a Single but succeeds when I use an Observable.

Maybe I'm missing something, but I don't see why the behaviour differs. IMO a Single should behave the same as an Observable, except for the fact that it always emits a single value or an error.

Ah, I think I understand what's going wrong. RxLifecycle works by completing the Observable when the bound lifecycle event happens. Since a Single can't complete without a value, an error is generated instead.

What would be better IMO is to unsubscribe from the Observable / Single when the bound lifecycle event happens.
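To make the point about completion concrete without depending on RxJava, here is a rough analogy using the JDK's CompletableFuture, which, like a Single, can only end with one value or one error. It is only an illustration of the "no empty completion" problem, not how RxLifecycle is implemented; the class and method names are made up for this sketch.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; CompletableFuture stands in for a Single here.
class SingleValueCancellation {

    /** Cancels a pending single-value future and returns the error its callback observes. */
    static Throwable cancelAndObserve() {
        // A future that has not produced its value yet (the "long running call").
        CompletableFuture<String> pending = new CompletableFuture<>();
        AtomicReference<Throwable> seen = new AtomicReference<>();

        // Plays the role of onSuccess/onError: the callback gets a value or an error.
        pending.whenComplete((value, error) -> seen.set(error));

        // Plays the role of the lifecycle event arriving before the value does.
        // There is no "completed without a value" state, so this surfaces as an error.
        pending.cancel(false);
        return seen.get();
    }

    public static void main(String[] args) {
        Throwable error = cancelAndObserve();
        System.out.println(error instanceof CancellationException); // prints "true"
    }
}
```

A multi-value stream could simply end early at this point. A single-value type has no such option, which is why the cancellation shows up in the error callback.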

dlew commented

See: https://github.com/trello/RxLifecycle#unsubscription

There's no way to actually do what you're asking for via compose() for Single, so we make a best effort here. If you need the behavior to be unsubscription, then you will have to do it manually.

Why not?

You could do something like this in the UntilLifecycleSingleTransformer?

final class UntilLifecycleSingleTransformer<T, R> implements Single.Transformer<T, T> {

    final Observable<R> lifecycle;

    public UntilLifecycleSingleTransformer(@Nonnull Observable<R> lifecycle) {
        this.lifecycle = lifecycle;
    }

    @Override
    public Single<T> call(final Single<T> source) {
        return Single.create(new Single.OnSubscribe<T>() {
            @Override
            public void call(SingleSubscriber<? super T> singleSubscriber) {
                final Subscription subscription = source.subscribe(singleSubscriber);
                lifecycle.subscribe(new Action1<R>() {
                    @Override
                    public void call(R r) {
                        // Event reached, unsubscribe from source
                        subscription.unsubscribe();
                    }
                });
            }
        });
    }
}

dlew commented

RxLifecycle used to do something very similar to that but it was pointed out to be inherently unsafe: ReactiveX/RxAndroid#168 (comment)

You can read more about it here: https://github.com/trello/RxLifecycle/blob/1.x/CHANGELOG.md#020

I trust you're right :) But even after reading the comments, I still don't fully understand why it's unsafe to unsubscribe from the source in this fashion (apart from the fact that my code snippet is incomplete).

I recently wrote a similar operator for showing a progress dialog. In this case I used fromEmitter and implemented setCancellation which also unsubscribes from the source if you unsubscribe from the returned Observable or cancel the progress dialog. So I must assume this is unsafe as well?

fun <T> Observable<T>.withHud(activity: BaseActivity, @StringRes title: Int? = null, cancelable: Boolean = false): Observable<T> {
    val source = this

    return Observable.fromEmitter<T>({ emitter ->
        val hud = ProgressHUD.show(activity, cancelable, null)
        val sourceSubscription = source
                .doOnUnsubscribe { hud.dismiss() }
                .subscribe(emitter)

        hud.setOnCancelListener { sourceSubscription.unsubscribe() }
        emitter.setCancellation { sourceSubscription.unsubscribe() }
    }, Emitter.BackpressureMode.BUFFER)
}

dlew commented

To be honest, this stuff makes my head spin, so I sort of defer to Karnok on matters like this.

Maybe you could mention him in this thread so he can help me understand why this is unsafe?

It is unsafe because there is no guarantee that bindToLifecycle is the last operator in the chain, and even if it is, there is no guarantee that a Subscriber doesn't have resources associated with it. Resources are freed by calling unsubscribe() on the Subscriber, which propagates upstream. An unsubscribe somewhere upstream won't propagate downstream and thus may leak resources.

public Observable<String> someAPI() {
   return Observable.just("1").delay(2, TimeUnit.SECONDS)
   .compose(bindToLifecycle(this)); // <-- how convenient?!
}

someAPI()
.observeOn(Schedulers.io())
.flatMap(v -> otherAPICall())
.subscribe(...)

One needs to communicate some terminal event, such as the error in this issue or an onCompleted signal to indicate the sequence is done and trigger the necessary cleanup.

So instead of this type of lifecycle binding, you'd need to have your own custom subscribe() that calls unsubscribe at the very end of the chain:

// Kotlin sketch; addDestroyListener is a placeholder, not a real Activity API
fun <T> Observable<T>.subscribeUntil(bind: Activity, onNext: Action1<T>, onError: Action1<Throwable>, onCompleted: Action0) {
    val s: Subscription = this.subscribe(onNext, onError, onCompleted)

    bind.addDestroyListener { s.unsubscribe() } // or something similar
}

fun <T> Observable<T>.subscribeUntil(bind: Activity, observer: Observer<T>) {
    val s: Subscription = this.subscribe(observer)

    bind.addDestroyListener { s.unsubscribe() } // or something similar
}
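
The propagation argument can be sketched with a toy model in plain Java. Node, leakedAfterUnsubscribing and the stage names are hypothetical stand-ins, not RxJava classes. The only assumption carried over from the explanation is that unsubscribe() releases a stage's own resource and then travels upstream, never downstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of a subscriber chain; none of these types exist in RxJava.
class UnsubscribePropagation {

    /** One stage of the chain, holding a resource until it is unsubscribed. */
    static final class Node {
        final String name;
        final Node upstream;      // null for the source
        boolean released = false; // models a resource owned by this stage

        Node(String name, Node upstream) {
            this.name = name;
            this.upstream = upstream;
        }

        /** Releases this stage's resource, then propagates upstream only. */
        void unsubscribe() {
            released = true;
            if (upstream != null) {
                upstream.unsubscribe();
            }
        }
    }

    /**
     * Builds source <- bind <- flatMap <- subscriber, unsubscribes at the
     * named stage, and returns the stages that still hold their resource.
     */
    static List<String> leakedAfterUnsubscribing(String stage) {
        Node source = new Node("source", null);
        Node bind = new Node("bind", source);
        Node flatMap = new Node("flatMap", bind);
        Node subscriber = new Node("subscriber", flatMap);
        List<Node> chain = Arrays.asList(source, bind, flatMap, subscriber);

        for (Node n : chain) {
            if (n.name.equals(stage)) {
                n.unsubscribe();
            }
        }

        List<String> leaked = new ArrayList<>();
        for (Node n : chain) {
            if (!n.released) {
                leaked.add(n.name);
            }
        }
        return leaked;
    }

    public static void main(String[] args) {
        // Unsubscribing in the middle of the chain leaves downstream stages alive.
        System.out.println(leakedAfterUnsubscribing("bind"));       // [flatMap, subscriber]
        // Unsubscribing at the very end cleans up every stage.
        System.out.println(leakedAfterUnsubscribing("subscriber")); // []
    }
}
```

In this model, the only place from which unsubscribe() reaches every stage is the final subscriber, which is the motivation for a custom subscribe() at the end of the chain.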

Thanks for the explanation @akarnokd! It makes sense that if you unsubscribe halfway in the chain you might get leaks downstream.

Since I work with Singles a lot, it is quite a hassle to take the CancellationException into consideration. So I created the following Kotlin extensions to handle unsubscribing:

fun <T,E> Single<T>.subscribeUntil(provider: com.trello.rxlifecycle.LifecycleProvider<E>, event: E, body: FunctionSingleSubscriberModifier<T>.() -> Unit) =
        subscribeUntil(body, provider.bindUntilEvent<Unit>(event))

fun <T,E> Single<T>.subscribeUntil(provider: com.trello.rxlifecycle.LifecycleProvider<E>, body: FunctionSingleSubscriberModifier<T>.() -> Unit) =
        subscribeUntil(body, provider.bindToLifecycle<Unit>())

fun <T> Single<T>.subscribeUntilDetach(view: View, body: FunctionSingleSubscriberModifier<T>.() -> Unit) = 
        subscribeUntil(body, RxLifecycleAndroid.bindView<Unit>(view))

private fun <T> Single<T>.subscribeUntil(body: FunctionSingleSubscriberModifier<T>.() -> Unit, transformer: Observable.Transformer<Unit, *>): Subscription {
    val sourceSubscription = this.subscribeWith(body)
    val untilSubscription = Observable.create<Unit> {  }.compose(transformer).subscribeWith {
        onCompleted { sourceSubscription.unsubscribe() }
    }
    return CompositeSubscription(sourceSubscription, untilSubscription)
}

Is there a reason this library can't provide some kind of Subscriber "wrapper" that would properly handle unsubscriptions?

I'm thinking instead of this:

public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .compose(bindToLifecycle())
            .subscribe(new Subscriber() {...});
    }
}

We would have something like this:

public class MyActivity extends RxActivity {
    @Override
    public void onResume() {
        super.onResume();
        myObservable
            .subscribe(bindToLifecycle(new Subscriber() {...}));
    }
}

Makes sense?

dlew commented

You could, but then you'd be coupling the Subscriber directly to the Activity. Right now it works with any generic lifecycle stream (which in many cases is provided by the Activity but does not have to be).