How to implement polling using Observables?
lexer opened this issue ยท 20 comments
I've got parametrized rest call with Observable interface:
api.updateAppState(params);
I want to repeat it with delay so I have created delayed version of it:
Observable<AppState> delayedApiCall = Observable.interval(delay, TimeUnit.SECONDS)
.first()
.flatMap(new Func1<Long, Observable<AppState>>() {
@Override
public Observable<AppState> call(Long seconds) {
return lyftApi.updateAppState(params);
}
});
But now I want to have polling observable that will recreate "delayedApiCall" with params and produce continuous results. I also want observable to continue producing results even if error was returned in "delayedApiCall".
pollingObservable.subscribe(new Observer<AppState>() {
onNext(AppState appSte) {
....
},
onError(Throwable e) {
....
}
});
I think about this implementation:
Observable.create(new Observable.OnSubscribeFunc<AppState>() {
public Subscription currentRequest;
@Override
public Subscription onSubscribe(final Observer<? super AppState> observer) {
currentRequest = createDelayedApiCal().subscribe(createRequestObserver(observer));
return new Subscription() {
@Override
public void unsubscribe() {
currentRequest.unsubscribe();
}
};
}
private Observer<AppState> createRequestObserver(final Observer<? super AppState> observer) {
return new Observer<AppState>() {
@Override
public void onCompleted() {
currentRequest = createDelayedApiCal().subscribe(createRequestObserver(observer));
}
@Override
public void onError(Throwable throwable) {
observer.onError(throwable);
}
@Override
public void onNext(AppState appState) {
observer.onNext(appState);
}
};
}
});
But we have recursive subscribe here. Probably this is bad usage of observables. Could you share your opinion?
What about this?
Observable.interval(delay, TimeUnit.SECONDS)
.flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
public Observable<Notification<AppState>> call(Long seconds) {
return lyftApi.updateAppState(params).materialize(); } });
@samuelgruetter interval won't wait until operation will be completed. im also guess that same params will be used for each request in this case.
Yes.
- What will params depend on?
- Somehow you should make sure that if an updateAppState request is lost in the network and you never get any onNext, onComplete or onError, you still keep polling. That's why I suggested the above approach with interval. But you could also combine your recursive approach with the timeout operator.
PS I had to delete one post because I accidentally hit 'comment' too early on my smartphone
Any conclusion on this?
@benjchristensen probably yes. If we will use @samuelgruetter implementation with recently implemented TimeInterval operator.
Observable.interval(delay, TimeUnit.SECONDS).timeInterval()
.flatMap(new Func1<Long, Observable<Notification<AppState>>>() {
public Observable<Notification<AppState>> call(Long seconds) {
return lyftApi.updateAppState(params).materialize(); } });
Also in my case i still have hacky implementation since my delay is dynamic and I need Delay operator to be implemented.
How about something like this?
Observable.create({ observer ->
Schedulers.newThread().schedulePeriodically({
observer.onNext("application-state-from-network");
}, 0, 1000, TimeUnit.MILLISECONDS);
}).take(10).subscribe({ v -> println(v) });
Here is Java using manual recursion instead of schedulePeriodically
:
Observable.create(new OnSubscribeFunc<String>() {
@Override
public Subscription onSubscribe(final Observer<? super String> o) {
return Schedulers.newThread().schedule(0L, new Func2<Scheduler, Long, Subscription>() {
@Override
public Subscription call(Scheduler inner, Long t2) {
o.onNext("data-from-polling");
return inner.schedule(t2, this, 1000, TimeUnit.MILLISECONDS);
}
});
}
}).toBlockingObservable().forEach(new Action1<String>() {
@Override
public void call(String v) {
System.out.println("output: " + v);
}
});
I think primary problem is that schedulePeriodically do not wait till operation is completed.
Java ScheduledExecutorService(http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html) has two different methods:
- scheduleWithFixedDelay
- scheduleAtFixedRate
So I think we need to add scheduleWithFixedDelay analog to Rx scheduler.
@benjchristensen Haven't noticed you second message. Thanks a lot for this manual recursion implementation!
How can scheduling periodically but wait for completion be done today using RxJava? Your code snippet has several things that have been deprecated, so I am wondering what an alternative would look like?
Use Observable.interval()
as the starting point and forget that manual recursion.
Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
.map(time -> pollSource())
.toBlocking()
.subscribe(System.out::println, Throwable::printStackTrace)
The problem with that is that the "interval" operator is dumb. It will continue to pump out a pulse every X seconds without any regards what happens down the line. That is not what I want. The "toBlocking" does nothing to fix this.
The behavior I want is a new tick from the time the poll finished not every 20 seconds:
What your code does:
0 (seconds): tick generated
0: poll started
5: poll finished
20: new tick generated
What I want:
0 (seconds): tick generated
0: poll started
5: poll finished
25: new tick generated
Okay then, how about:
Observable.fromCallable(() -> pollValue())
.repeatWhen(o -> o.concatMap(v -> Observable.timer(20, TimeUnit.SECONDS)));
I did some testing and it looks like it's working as intended. I will have to throw in some conditional check so it doesn't go on forever but it looks like we have a winner!
Thank you so much! I really appreciate your help!
@akarnokd
I maybe be doing something wrong but it looks like unsubscribing to the Observable you suggested won't stop the currently scheduled emission from going out, any way to get around this? Thanks!
@cxzhang2 An unsubscribe should stop that timer. Do you have some code that demonstrates what you experience?
You can use retryWhen
instead of repeatWhen
and in your map()
operator use Rx.Observable.throw(new Error('My Error Message'))
in case you want to retry the request