RxJava 2 introduction
What is RxJava 2?
A library for composing asynchronous and event-based programs using observable sequences for the Java VM. It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
Getting started
- Observer pattern
- Observable objects are data. In RxJava 2, class
Observable
represents the source of data. - Observer listening to the observable objects.
- An
Observable
emit items, an observer consumes these items. - The consumer consumes the items in the method
subscribe()
.
Notice: In RxJava documentation, emission, emits, item, event, signal, data and message are considered synonyms and represent the object traveling along the dataflow.
Hello world
Observable<String> helloWorld = Observable.just("Hello World!"); // create a Observable
helloWorld.subscribe(
str -> System.out.println(str) // Subscriber consuming items.
);
Observable
Observable
is the source of data. It can emit any number of item(including zero item). It can also terminate either successfully or with an error.
- A
Observable
can have any number of consumer(subscriber, observer). onNext()
method is called on each subscriber if a new item is emitted from theObservable
onComplete()
method is called on each subscriber if theObservable
finishes its dataflow successfully.onError()
method is called on each subscriber if theObservable
finishes its dataflow with an error.
Observable<String> helloWorld = Observable.just("Hello World!"); // create a Observable
helloWorld.subscribe(
str -> System.out.println(str), // onNext()
e -> System.out.println(e), // onError()
() -> System.out.println("completed") // onComplete()
);
Creating data
Observable.just()
: create anObservable
that emits a particular item.Observable.fromArray()
: create anObservable
that contains an array of items.Observable.interval()
: create anObservable
that generates an infinite, ever increasing numbers(Long
type of default vale0L
).Observable.range()
: create anObservable
that generates a sequences of integer number).Observable.empty()
: create anObservable
with empty item.Observable.timer()
: After a specified time, thisObservable
signals a single0L
.Observable.fromCallable()
: Create anObservable
that emits a value (or throw exception) returned by a givenjava.util.concurrent.Callable
.
More creation operators could be found here
Transformation operators
Observable.map()
: change the type of the emitted items by applying the given function.Observable.flatMap()
: convert each item emitted by a reactive source to aObservable
by applying the given function, then merge and emit the result.Observable.concatMap()
: convert each item emitted by a reactive source to aObservable
by applying the given function, then concat and emit the result.Observable.switchMap()
: convert each item emitted by a reactive source to aObservable
by applying the given function, then emit only the most recent result.Observable.buffer()
: gather a set of items into a buffer and emit them all at once.Observable.groupBy()
: group the items according to a specified criterion, and emits these grouped items as aGroupedObservable
orGroupedFlowable
More transformation operators could be found here
flatMap()
, concatMap()
and switchMap()
?
What is the difference between flatMap()
does NOT care about orderconcatMap()
cares about orderswitchMap()
emits the most recent item.
More operators...
More operators could be found here. Some common operators are:
- Combining operators
- Conditional & Boolean Operators
- Mathematical and aggregate operators
- Filtering operators
Async
Observer consumes the items on the subscribe()
method, which is by default running on main thread. Observable
generates dataflow on the main thread as well. We hope, Observable
could do its job on a background thread and the observer could consume the data on another thread. RxJava makes it easy. We need subscribeOn()
and observerOn()
operators.
subscribeOn()
tells the Observable
which thread to generate and emit items. It does not matter where you put this method in your observable chain of operators. All operators from upstream to downstream are on the same thread.
observerOn()
switches the working thread for all remaining operators.
Schedulers
The observerOn()
and subscribeOn()
methods take an arguments of type Scheduler
. RxJava has already created a set of Scheduler
for the common case.
Schedulers.io()
: for I/O intensive workSchedulers.computation()
: for CUP intensive workSchedulers.newThread()
: creates a new thread for each scheduled unit work.Schedulers.trampoline()
: execute works in a FIFO manner on the current thread.Schedulers.single()
: a default, shared, single-thread-backed Scheduler instance for work requiring strongly-sequential execution on the same background thread.
Hot and cold observables
Two type of observables: hot and cold
Cold observables
Cold observables are observables that emit items only if they are subscribed to. A observables could have multiple subscriber, all subscriber receive the same dataflow no matter when the items created and when it is subscribed to.
For example: just()
, range()
, from()
, interval()
, timer()
are cold observables.
Hot observables
Hot observables emit the items independent of individual subscriptions. It has its own timeline and emits items on matter how many subscriber (0 or more) is listening to. All subscribers receive the same items from the host observables.
Convert cold observables to hot observables
The easiest way to convert a cold observables to hot observables is using the publish()
operators. publish()
operator returns a ConnectableObservable
object, once we call ConnectableObservable.connect()
, the host observables begin to emit items to its subscribers.
Reference: Hot and Cold observables
Single, Completable, Maybe
Like Observable
, RxJava provides several base classes which are able to generate a dataflow, emit items and be subscribed to.
Single
emits always either one value or an error.Completable
emits no item but a completion or an error. It focus on whether the job is done or not. The subscriber receives no item.Maybe
emits a single value, empty value or an error.
Unlike Observable
, which provides onNext()
, onError()
, onComplete()
method for subscriber, these base classes have different method for subscriber.
Single
class provides onlyonSuccess()
andonError()
method.Completable
class provides onlyonComplete()
andonError()
method.Maybe
class providesonSuccess()
,onError()
andonComplete()
Flowable
Simply put, Flowable
is a backpressure-aware Observable
.
Backpressure
Simply put, Backpressure is when in an Flowable
processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down.
What is the problem?
In the following example, we emit one item every milliseconds, but subscriber could only consume one item per second. The rest 999 items are added to the Buffer every second. In the early RxJava, the buffer is unbounded. As time goes on, it might cause OOM.
Observable.interval(1, TimeUnit.MILLISECONDS)
.observeOn(Schedulers.computation()) // async. In a different thread(scheduler)
.subscribe(l -> {
Thread.sleep(1000);
System.out.println(l);
});
Another example:
Observable.create(emitter -> {
for(int i = 0 ; ; i++){
emitter.onNext(i);
}
}).observeOn(Schedulers.computation()) // async. In a different thread(scheduler)
.subscribe(l -> {
Thread.sleep(1000);
Log.d(TAG, "onCreate: " + l);
});
Solution
Backpressure which is a form of flow control where the operators can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.
In RxJava 2, the dedicated Flowable
class is designated to support backpressure and Observable
is dedicated for the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single
, Maybe
and Completable
don't support backpressure nor should they; there is always room to store one item temporarily.
The difference between Observable
and Flowable
is that Flowable
supports backpressure and Observable
does not support backpressure.
In modern RxJava, most asynchronous operators of a Flowable
now have a bounded internal buffer (defalut 128 elements), like observeOn()
, and any attempt to overflow this buffer will terminate the whole sequence with MissingBackpressureException
.
Observable
does not support backpressure and has an unbounded internal buffer.Flowable
supports backpressure and has a bounded internal buffer.
Backpressure strategy
Most developers encounter backpressure when their application fails with MissingBackpressureException
and the exception usually points to the observeOn
operator. The actual cause is usually the non-backpressured use of PublishProcessor
, timer()
or interval()
or custom operators created via create()
.
There are several ways of dealing with such situations.
-
Increasing buffer. Temporary fix the problem, overflow can still happen if the source produces over the buffer size.
-
onBackpressureBuffer()
: reintroduces an unbounded buffer. -
onBackpressureBuffer(int capacity)
: specify a new size of buffer, throw aBufferOverflowError
if buffer reaches the given capacity. -
onBackpressureBuffer(int capacity,Action onOverflow)
: this overload calls a (shared) action in case an overflow happens. -
onBackpressureBuffer(int capacity,Action onOverflow, BackpressureOverflowStrategy strategy)
: define what to do in case the capacity has been reached. There are 3 strateties:ERROR
: this is the default behavior of the previous two overloads, signaling a BufferOverflowException.LATEST
: if an overflow would happen, the current value will be simply ignored and only the old values will be delivered once the downstream requests.OLDEST
: drops the oldest element in the buffer and adds the current value to it.
-
onBackpressureDrop()
: whenever the downstream is not ready to receive values, this operator will drop that elemenet from the sequence. -
onBackpressureLatest()
: the final operator keeps only the latest value and practically overwrites older, undelivered values.