RxKotlin

Kotlin Extensions for RxJava

RxKotlin is a lightweight library that adds convenient extension functions to RxJava. You can use RxJava with Kotlin out-of-the-box, but Kotlin has language features (such as extension functions) that can streamline usage of RxJava even more. RxKotlin aims to conservatively collect these conveniences in one centralized library, and standardize conventions for using RxJava with Kotlin.

import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toObservable

fun main(args: Array<String>) {

    val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

    list.toObservable() // extension function for Iterables
            .filter { it.length >= 5 }
            .subscribeBy(  // named arguments for lambda Subscribers
                    onNext = { println(it) },
                    onError =  { it.printStackTrace() },
                    onComplete = { println("Done!") }
            )

}

Resources

Learning RxJava Packt Book

Chapter 12 of Learning RxJava covers RxKotlin and Kotlin idioms with RxJava.

Reactive Programming in Kotlin Packt Book

The book Reactive Programming in Kotlin mainly focuses on RxKotlin, as well as learning reactive programming with Kotlin.

Kotlin Slack Channel

Join us on the #rx channel in Kotlin Slack!

https://kotlinlang.slack.com/messages/rx

Support for RxJava 1.x and RxJava 2.x

Use RxKotlin 1.x versions to target RxJava 1.x.

Use RxKotlin 2.x versions to target RxJava 2.x.

The maintainers do not update the RxJava dependency version for every RxJava release, so you should explicitly add the desired RxJava dependency version to your pom.xml or build.gradle.
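As a rough sketch of what pinning the RxJava version could look like, the fragment below uses the Gradle Kotlin DSL (a `build.gradle.kts` file, which is an assumption; the same idea applies to a Groovy `build.gradle` or a `pom.xml`). The `2.x.y` versions are placeholders, not real release numbers.

```kotlin
// build.gradle.kts (Gradle Kotlin DSL), shown for an RxJava 2.x project
dependencies {
    // RxKotlin 2.x; replace 2.x.y with the release you want
    implementation("io.reactivex.rxjava2:rxkotlin:2.x.y")

    // Declare the RxJava 2.x version explicitly instead of relying on
    // whichever version RxKotlin brings in transitively
    implementation("io.reactivex.rxjava2:rxjava:2.x.y")
}
```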

Binaries

Binaries and dependency information for Maven, Ivy, Gradle and others can be found at http://search.maven.org.

RxKotlin 1.x

Example for Maven:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>1.x.y</version>
</dependency>

and for Gradle:

implementation 'io.reactivex:rxkotlin:1.x.y'

RxKotlin 2.x

Example for Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>2.x.y</version>
</dependency>

and for Gradle:

implementation 'io.reactivex.rxjava2:rxkotlin:2.x.y'

Building with JitPack

You can also use Gradle or Maven with JitPack to build directly off a snapshot, branch, or commit of this repository.

For example, to build off the 1.x branch, use this setup for Gradle:

repositories {
    maven { url 'https://jitpack.io' }
}

dependencies {
    implementation 'com.github.ReactiveX:RxKotlin:1.x-SNAPSHOT'
}

Use this setup for Maven:

<repositories>
    <repository>
        <id>jitpack.io</id>
        <url>https://jitpack.io</url>
    </repository>
</repositories>

<dependency>
    <groupId>com.github.ReactiveX</groupId>
    <artifactId>RxKotlin</artifactId>
    <version>1.x-SNAPSHOT</version>
</dependency>

Learn more about building this project with JitPack at https://jitpack.io.

Extensions

| Target Type | Method | Return Type | Description |
|---|---|---|---|
| BooleanArray | toObservable() | Observable | Turns a Boolean array into an Observable |
| ByteArray | toObservable() | Observable | Turns a Byte array into an Observable |
| ShortArray | toObservable() | Observable | Turns a Short array into an Observable |
| IntArray | toObservable() | Observable | Turns an Int array into an Observable |
| LongArray | toObservable() | Observable | Turns a Long array into an Observable |
| FloatArray | toObservable() | Observable | Turns a Float array into an Observable |
| DoubleArray | toObservable() | Observable | Turns a Double array into an Observable |
| Array | toObservable() | Observable | Turns a T array into an Observable |
| IntProgression | toObservable() | Observable | Turns an IntProgression into an Observable |
| Iterable | toObservable() | Observable | Turns an Iterable<T> into an Observable |
| Iterator | toObservable() | Observable | Turns an Iterator<T> into an Observable |
| Observable | flatMapSequence() | Observable | Flat maps each T emission to a Sequence<R> |
| Observable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
| Observable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
| Observable<Observable> | mergeAll() | Observable | Merges all Observables emitted from an Observable |
| Observable<Observable> | concatAll() | Observable | Concatenates all Observables emitted from an Observable |
| Observable<Observable> | switchLatest() | Observable | Emits from the last emitted Observable |
| Observable<*> | cast() | Observable | Casts all emissions to the reified type |
| Observable<*> | ofType() | Observable | Filters all emissions to only the reified type |
| Iterable<Observable> | merge() | Observable | Merges an Iterable of Observables into a single Observable |
| Iterable<Observable> | mergeDelayError() | Observable | Merges an Iterable of Observables into a single Observable, but delays any error |
| BooleanArray | toFlowable() | Flowable | Turns a Boolean array into a Flowable |
| ByteArray | toFlowable() | Flowable | Turns a Byte array into a Flowable |
| ShortArray | toFlowable() | Flowable | Turns a Short array into a Flowable |
| IntArray | toFlowable() | Flowable | Turns an Int array into a Flowable |
| LongArray | toFlowable() | Flowable | Turns a Long array into a Flowable |
| FloatArray | toFlowable() | Flowable | Turns a Float array into a Flowable |
| DoubleArray | toFlowable() | Flowable | Turns a Double array into a Flowable |
| Array | toFlowable() | Flowable | Turns a T array into a Flowable |
| IntProgression | toFlowable() | Flowable | Turns an IntProgression into a Flowable |
| Iterable | toFlowable() | Flowable | Turns an Iterable<T> into a Flowable |
| Iterator | toFlowable() | Flowable | Turns an Iterator<T> into a Flowable |
| Flowable | flatMapSequence() | Flowable | Flat maps each T emission to a Sequence<R> |
| Flowable<Pair<A,B>> | toMap() | Single | Collects Pair<A,B> emissions into a Map<A,B> |
| Flowable<Pair<A,B>> | toMultimap() | Single | Collects Pair<A,B> emissions into a Map<A,List<B>> |
| Flowable<Flowable> | mergeAll() | Flowable | Merges all Flowables emitted from a Flowable |
| Flowable<Flowable> | concatAll() | Flowable | Concatenates all Flowables emitted from a Flowable |
| Flowable<Flowable> | switchLatest() | Flowable | Emits from the last emitted Flowable |
| Flowable | cast() | Flowable | Casts all emissions to the reified type |
| Flowable | ofType() | Flowable | Filters all emissions to only the reified type |
| Iterable<Flowable> | merge() | Flowable | Merges an Iterable of Flowables into a single Flowable |
| Iterable<Flowable> | mergeDelayError() | Flowable | Merges an Iterable of Flowables into a single Flowable, but delays any error |
| T | toSingle() | Single | Turns any T item into a Single<T> |
| Future | toSingle() | Single | Turns a Future<T> into a Single<T> |
| Callable | toSingle() | Single | Turns a Callable<T> into a Single<T> |
| () -> T | toSingle() | Single | Turns a () -> T into a Single<T> |
| Single | cast() | Single | Casts all emissions to the reified type |
| Observable<Single> | mergeAllSingles() | Observable | Merges all Singles emitted from an Observable |
| Flowable<Single> | mergeAllSingles() | Flowable | Merges all Singles emitted from a Flowable |
| T? | toMaybe() | Maybe | Turns a nullable T value into a Maybe<T> that will only emit if not null |
| Future | toMaybe() | Maybe | Turns a Future<T> into a Maybe<T> |
| Callable | toMaybe() | Maybe | Turns a Callable<T> into a Maybe<T> |
| () -> T | toMaybe() | Maybe | Turns a () -> T into a Maybe<T> |
| Maybe | cast() | Maybe | Casts any emissions to the reified type |
| Maybe | ofType() | Maybe | Filters any emission that is the reified type |
| Observable<Maybe> | mergeAllMaybes() | Observable | Merges all emitted Maybes |
| Flowable<Maybe> | mergeAllMaybes() | Flowable | Merges all emitted Maybes |
| Action | toCompletable() | Completable | Turns an Action into a Completable |
| Callable | toCompletable() | Completable | Turns a Callable into a Completable |
| Future | toCompletable() | Completable | Turns a Future into a Completable |
| (() -> Any) | toCompletable() | Completable | Turns a (() -> Any) into a Completable |
| Observable<Completable> | mergeAllCompletables() | Completable | Merges all emitted Completables |
| Flowable<Completable> | mergeAllCompletables() | Completable | Merges all emitted Completables |
| Observable | subscribeBy() | Disposable | Allows named arguments to construct an Observer |
| Flowable | subscribeBy() | Disposable | Allows named arguments to construct a Subscriber |
| Single | subscribeBy() | Disposable | Allows named arguments to construct a SingleObserver |
| Maybe | subscribeBy() | Disposable | Allows named arguments to construct a MaybeObserver |
| Completable | subscribeBy() | Disposable | Allows named arguments to construct a CompletableObserver |
| Observable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Observer |
| Flowable | blockingSubscribeBy() | Unit | Allows named arguments to construct a blocking Subscriber |
| Disposable | addTo() | Disposable | Adds a Disposable to the specified CompositeDisposable |
| CompositeDisposable | plusAssign() | Unit | Operator function to add a Disposable to this CompositeDisposable |
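To make a few of these extensions concrete, here is a minimal sketch assuming RxKotlin 2.x on RxJava 2.x (the `io.reactivex` packages). The function names `lengthsByName`, `onlyInts`, and `disposeTogether` are hypothetical and exist only for illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.rxkotlin.addTo
import io.reactivex.rxkotlin.ofType
import io.reactivex.rxkotlin.plusAssign
import io.reactivex.rxkotlin.subscribeBy
import io.reactivex.rxkotlin.toMap
import io.reactivex.rxkotlin.toObservable

// toObservable() + toMap(): collect Pair emissions into a Map
fun lengthsByName(): Map<String, Int> =
    listOf("Alpha", "Beta", "Gamma")
        .toObservable()
        .map { it to it.length }   // emits Pair<String, Int>
        .toMap()                   // Single holding the collected Map
        .blockingGet()

// ofType(): keep only emissions of the reified type
fun onlyInts(): List<Int> =
    Observable.just<Any>(1, "two", 3)
        .ofType<Int>()
        .toList()
        .blockingGet()

// addTo() and += (plusAssign): track Disposables in a CompositeDisposable
fun disposeTogether(): Boolean {
    val disposables = CompositeDisposable()

    val first = Observable.never<Int>()
        .subscribeBy(onNext = { println(it) })
        .addTo(disposables)        // returns the same Disposable

    disposables += Observable.never<Int>()
        .subscribeBy(onNext = { println(it) })

    disposables.clear()            // disposes everything added so far
    return first.isDisposed
}
```

The blocking calls are there only to keep the sketch easy to run from a plain `main` function or a test; reactive code would normally subscribe instead.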

SAM Helpers

To cope with the SAM ambiguity issue that arises when using RxJava 2.x with Kotlin, RxKotlin provides a number of helper factories and extension functions that work around the affected operators:

Observables.zip()
Observables.combineLatest()
Observable#zipWith()
Observable#withLatestFrom()
Flowables.zip()
Flowables.combineLatest()
Flowable#zipWith()
Flowable#withLatestFrom()
Singles.zip()
Single#zipWith()
Maybes.zip()
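As an illustration of two of the helpers listed above, the sketch below passes plain Kotlin lambdas to `Observables.zip()` and `Singles.zip()`. It assumes RxKotlin 2.x on RxJava 2.x; the function names `zipLettersWithNumbers` and `multiplySingles` are hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.rxkotlin.Observables
import io.reactivex.rxkotlin.Singles

// Observables.zip() takes an ordinary Kotlin lambda as the combiner
fun zipLettersWithNumbers(): List<String> {
    val letters = Observable.just("A", "B", "C")
    val numbers = Observable.just(1, 2, 3)

    return Observables.zip(letters, numbers) { letter, number -> "$letter$number" }
        .toList()
        .blockingGet()
}

// Singles.zip() works the same way for Single sources
fun multiplySingles(): Int =
    Singles.zip(Single.just(2), Single.just(3)) { a, b -> a * b }
        .blockingGet()
```

The other helpers in the list are called in the same style, with the combining lambda as the last argument.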

Usage with Other Rx Libraries

RxKotlin can be used in conjunction with other Rx and Kotlin libraries, such as RxAndroid, RxBinding, and TornadoFX/RxKotlinFX. These libraries and RxKotlin are modular, and RxKotlin is merely a set of extension functions to RxJava that can be used with these other libraries. There should be no overlap or dependency issues.

Contributing

We welcome contributions and discussion of new features. We recommend filing an issue first to avoid unnecessary effort, but feel free to submit pull requests. The vision is to keep this library lightweight, with a tight and focused scope applicable to all platforms (including Android, server, and desktop). Anything specific to a particular domain (for example, JavaFX or JDBC) might be better suited to a separate project. Feel free to open a discussion and we will help figure out where your functionality may belong.