/Bonsai

A minimal reactive streams Android library

Primary LanguageKotlinApache License 2.0Apache-2.0

Bonsai

A miniature reactive Android library.

Branch Build Status
master Build Status
dev Build Status
Test Coverage
codecov
Latest Version
Download

Why

RxJava is a great library that introduces the concept of reactive programming to your Android code. However, to newcomers, it can be a daunting library with a high learning curve. You have to learn about Schedulers, Observables, Singles, Subscribers, Subscriptions, and many more paradigms that can be confusing to someone new to reactive programming. Additionally, for the Android developer, it can be confusing as Rx is primarily a Java library, and there are some things on Android that are functionally different (e.g. the main thread). To use Rx in an Android app properly, you also need to use RxAndroid. The relationship between the two can be confusing. The aim of Bonsai is to help people interested in using reactive Java in their Android code be introduced to some of the basic concepts of Rx. Bonsai mirrors some of the basic parts of Rx in a way that users of the library can understand. Since there is very little source code compared to the more functional Rx library, it will be easier for users to dive in and see how something works. Additionally, Bonsai has built in support for the Android main thread, which should help users adopt functional programming into their code. Hopefully some of you find this library helpful. This is not meant to be an Rx replacement, but rather a taste of what the reactive world has to offer. In case you are interested, I am currently using this in Lightning Browser.

Usage

include from jcenter

  • compile 'com.anthonycr.bonsai:bonsai:1.1.0'

include from submodule

  • compile project(':library')

The API

  • Completable: The simplest of the reactive API, this observable emits only three events, onStart, onComplete, and onError.
  • Single: A sort of extension of the Completable, this observable emits the same three events as well as an onItem event that only emits a single item.
  • Stream: A more complex extension of the Completable, this observable emits the same three events as well as an onNext event that can be called multiple times to emit multiple items.
  • CompletableAction, SingleAction, StreamAction: Work that you wish to perform when the user subscribes to your observable. Action.onSubscribe is called when the user subscribes to the observable. The work is run on the Scheduler specified by the observable.subscribeOn method.
  • CompletableSubscriber, SingleSubscriber, StreamSubscriber/CompletableOnSubscribe, SingleOnSubscribe, StreamOnSubscribe: The consumer of the observable is the Subscriber. There are two classes describing this in order to separate communication between the subscriber thread (where the work is done) and the observer thread (where it is emitted). Subscriber is used by the subscriber thread to pass events to the OnSubscribe implementation (provided by ths subscriber), which is called on the observe thread.
    • onStart(): Always called when the observable starts (called internally)
    • onNext(T item): Called by the observable if it has an item to pass to you
    • onComplete(): Should be called by the observable when it is done emitting items, this will release the resources used by the observable/subscription unless they are held elsewhere. Not calling this indicates that the observable has more items to emit.
    • onError(Exception exception): The observable should call this if an error occurs. If this method is called, onComplete or onNext should not be called. This method will also be called automatically if your onSubscribe method throws an exception.
  • Scheduler: A thin wrapper around a thread that schedules work to be done.
  • Schedulers: A utility class that creates Scheduler instances for you. See below for a list of provided ones:
    • io(): A single thread that you should reserve for talking to disk. Android disk IO is single threaded for write operations, which is why one thread is used here.
    • main(): A reference to the main thread of your application. All work will be posted to the main message queue.
    • worker(): Runs work on one thread out of a pool.
    • newSingleThreadScheduler(): Creates a new single thread scheduler (like the io() scheduler).
    • immediate(): Runs work immediately and synchronously on the current thread.
    • from(Executor): Allows you to construct a scheduler backed by an Executor of your choice.
  • Subscription: This is returned when you subscribe to an observable. It allows you to unsubscribe from the work. If you unsubscribe, you will no longer receive events. This is especially helpful in Android if we are observing long lasting work in an Activity, we want to unsubscribe in Activity.onDestroy() in order to avoid leaking the Activity.

How to use

Basic example
Stream.create(new StreamAction<String>() {
    @Override
    public void onSubscribe(@NonNull StreamSubscriber<String> subscriber) {
        subscriber.onNext("string 1");
        subscriber.onNext("string 2");
        subscriber.onNext("string 3");
        subscriber.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(Schedulers.main())
  .subscribe(new StreamOnSubscribe<String>() {
        @Override
        public void onNext(String item) {
            Log.d(TAG, "Asynchronously received this string: " + item);
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "Finished receiving strings");
        }
  });
Unsubscribe example
private Subscription subscription;

/**
 * An observable that emits sequential Fibonacci numbers as long
 * as the Subscriber is subscribed to it. Fibonacci numbers are
 * Emitted every half a second.
 */
private Stream<Integer> allFibonacciNumbersStream() {
    return Stream.create(new StreamAction<Integer>() {
        @Override
        public void onSubscribe(@NonNull StreamSubscriber<Integer> subscriber) {
            int firstNumber = 0;
            int secondNumber = 1;
            int temp;
            subscriber.onNext(secondNumber);
            while(!subscriber.isUnsubscribed()) {
                temp = secondNumber;
                secondNumber = secondNumber + firstNumber;
                firstNumber = temp;
                subscriber.onNext(secondNumber);
                try {
                    Thread.sleep(500);
                } catch(InterruptedException exception) {
                    subscriber.onError(exception);
                    return;
                }
            }
            subscriber.onComplete();
        }
    });
}

private void doWorkOnMainThread() {
    subscription = allFibonacciNumbersStream()
        .subscribeOn(Schedulers.worker())
        .observeOn(Schedulers.main())
        .subscribe(new StreamOnSubscribe<Integer>() {
            @Override
            public void onStart() {
                Log.d(TAG, "Started receiving numbers");
            }

            @Override
            public void onNext(Integer item) {
                Log.d(TAG, "Asynchronously received this fibonacci number: " + item);
            }

            @Override
            public void onError(Exception error) {
                Log.d(TAG, "Error occurred while receiving numbers", error);
            }
        });
}

@Override
public void onDestroy() {
    super.onDestroy();
    if (subscription != null) {
        subscription.unsubscribe();
    }
}
List Example
final List<String> list = new ArrayList();
Stream.create(new StreamAction<List<String>>() {
    @Override
    public void onSubscribe(@NonNull StreamSubscriber<List<String>> subscriber) {
        List<String> stringList = new ArrayList<>();
        stringList.add("string 1");
        stringList.add("string 2");
        stringList.add("string 3");
        subscriber.onNext(stringList);
        subscriber.onComplete();
    }
}).subscribeOn(Schedulers.immediate())
  .observeOn(Schedulers.immediate())
  .subscribe(new StreamOnSubscribe<List<String>>() {
        @Override
        public void onNext(List<String> item) {
            list.addAll(item);
        }
        
        @Override
        public void onComplete() {
            Log.d(TAG, "We're done!");
        }
  });
  
for(String string : list) {
    Log.d(TAG, "Received: " + string);
}
Completable Example
Completable.create(new CompletableAction() {
    @Override
    public void onSubscribe(@NonNull CompletableSubscriber subscriber) {
        // Do some work
        subscriber.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(Schedulers.main())
  .subscribe(new CompletableOnSubscribe() {
        @Override
        public void onComplete() {
            Log.d(TAG, "Work is done");
        }
  });
Single Example
Single.create(new SingleAction<Integer>() {
    @Override
    public void onSubscribe(@NonNull SingleSubscriber<Integer> subscriber) {
        subscriber.onItem(1);
        subscriber.onComplete();
    }
}).subscribeOn(Schedulers.io())
  .observeOn(Schedulers.main())
  .subscribe(new SingleOnSubscribe<Integer>() {
        @Override
        public void onItem(@Nullable Integer item) {
            Log.d(TAG, "Only item received: " + item);
        }
  });

What's missing

  • Backpressure. If an observable emits events faster than the subscriber can consume them, the subscriber thread will get backed up with a queue of unhandled items. This is an important ReactiveX feature that is not supported by this minimal implementation.
  • Replaying events. Related to backpressure, events are not cached and are not able to be replayed.
  • Multiple subscribers. Only one subscriber per observable instance is supported.
  • Transformers. There is currently no way to transform or re-map a stream of items.

License

Copyright 2016 Anthony Restaino

Licensed under the Apache License, Version 2.0 (the "License"); you may 
not use this file except in compliance with the License. You may obtain 
a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 
License for the specific language governing permissions and limitations 
under the License.