Reactive Programming library based on the Java 9 Flow API and a 4th generation ReactiveX-style architecture.
compile "com.github.akarnokd:reactive4javaflow:0.1.5"
The base package of Reactive4JavaFlow is hu.akarnokd.reactive4javaflow
where the main reactive classes are located.
Note that the library requires a JDK 9 compatible runtime and is therefore not compatible with most of the
runtimes currently available on Android.
The library doesn't declare modules (Project Jigsaw) and should be usable from both classpath and modularized environments.
The Folyam
base reactive type is the 0 .. N element, backpressure aware, Java 9 Flow.Publisher implementation.
The word means flow or stream in Hungarian and was chosen to avoid name clashes with existing libraries and types (such
as Flowable
, Flux
, Stream
and unfortunately Flow
too). The name is also one of the few words describing the
target concept that doesn't use accented letters.
Folyam
features most of the typical ReactiveX operators commonly
found in RxJava and Reactor and a couple of the less common extension operators from
RxJava 2 Extensions.
import hu.akarnokd.reactive4javaflow.*;
Folyam.just("Hello world!")
.subscribe(System.out::println);
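Because Folyam is a Flow.Publisher, consumers ultimately drive it through the request/cancel protocol of Flow.Subscription. The following is a minimal sketch, using only JDK types, of what "backpressure aware" means at that level. RangePublisher and run are hypothetical names made up for this illustration and are not part of the library; the sketch is single-threaded and skips several rules a production publisher would have to honor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    /** Hypothetical stand-in (not a library class): emits count integers, but only on demand. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int start;
        final int count;

        RangePublisher(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                int index;         // offset of the next item to emit
                long requested;    // outstanding demand from the subscriber
                boolean emitting;  // guards against reentrant request() calls
                boolean done;      // set on completion, error or cancel

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        done = true;
                        subscriber.onError(new IllegalArgumentException("n > 0 required"));
                        return;
                    }
                    requested += n;
                    if (requested < 0) requested = Long.MAX_VALUE; // cap on overflow
                    if (emitting) return; // the outer loop will see the new demand
                    emitting = true;
                    while (!done && requested > 0 && index < count) {
                        requested--;
                        subscriber.onNext(start + index++);
                    }
                    emitting = false;
                    if (!done && index == count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a consumer that requests items in batches and records every signal. */
    static List<String> run(int start, int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangePublisher(start, count).subscribe(new Flow.Subscriber<Integer>() {
            Flow.Subscription upstream;
            int received;

            @Override public void onSubscribe(Flow.Subscription s) {
                upstream = s;
                s.request(batch);
            }
            @Override public void onNext(Integer item) {
                log.add("onNext " + item);
                if (++received % batch == 0) upstream.request(batch); // replenish demand
            }
            @Override public void onError(Throwable t) { log.add("onError " + t); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        run(1, 5, 2).forEach(System.out::println);
    }
}
```

The publisher never emits more than the subscriber has asked for, which is the contract the operators of a Flow.Publisher-based library have to preserve end to end.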
Operators that work with multiple sources and may need to delay errors now have separate methods with the DelayError
suffix: concatDelayError
, flatMapDelayError
.
The Esetleg
base reactive type is the 0 .. 1 element, backpressure aware, Java 9 Flow.Publisher implementation.
It resembles the RxJava 2 Maybe
type and is similar to Reactor's Mono
type. The word itself means maybe or
perhaps.
AutoDisposable d = Esetleg.fromCallable(() -> "Hello world")
.subscribeWith(new AbstractFolyamSubscriber<>() {
@Override public void onNext(String s) {
System.out.print(s);
}
@Override public void onError(Throwable ex) {
ex.printStackTrace();
}
@Override public void onComplete() {
System.out.println("!");
}
});
d.close();
The resource manager type is the AutoDisposable
interface, which extends java.lang.AutoCloseable
and thus can
be used in try-with-resources. Unlike RxJava 2 Disposable
, the interface doesn't offer any means to check if
the resource is disposed.
The AbstractFolyamSubscriber
shown in the previous section's example also implements AutoDisposable
and together
with hu.akarnokd.reactive4javaflow.disposables.CompositeAutoDisposable
can be used to track and mass-close resources.
The disposables.BooleanAutoDisposable
can help in testing and SequentialAutoDisposable
may help with
custom operators or individual resource tracking.
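The try-with-resources and mass-close ideas described above can be sketched with JDK types only. SketchDisposable and SketchComposite below are hypothetical stand-ins, not the library's AutoDisposable or CompositeAutoDisposable; in particular, immediately closing a resource that is added after the container has been closed is an assumption borrowed from how such containers commonly behave.

```java
import java.util.ArrayList;
import java.util.List;

class AutoDisposableSketch {

    /** Hypothetical stand-in: an AutoCloseable whose close() declares no checked exception. */
    interface SketchDisposable extends AutoCloseable {
        @Override
        void close();
    }

    /** Hypothetical stand-in for a container that tracks resources and closes them together. */
    static final class SketchComposite implements SketchDisposable {
        private List<SketchDisposable> resources = new ArrayList<>();

        /** Returns false (and closes the resource) if the container was already closed. */
        boolean add(SketchDisposable d) {
            synchronized (this) {
                if (resources != null) {
                    resources.add(d);
                    return true;
                }
            }
            d.close(); // assumption: late additions are closed right away
            return false;
        }

        @Override
        public void close() {
            List<SketchDisposable> toClose;
            synchronized (this) {
                toClose = resources;
                resources = null; // marks the container as closed
            }
            if (toClose != null) {
                for (SketchDisposable d : toClose) {
                    d.close();
                }
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        // No catch block is needed because close() throws no checked exception.
        try (SketchComposite all = new SketchComposite()) {
            all.add(() -> log.add("closed A"));
            all.add(() -> log.add("closed B"));
            log.add("working");
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Narrowing close() so that it throws no checked exception is what makes such a type convenient in try-with-resources: the caller does not have to catch Exception.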
Just like RxJava 2 and Reactor 3, the library offers parallel operations through the Folyam.parallel()
method. The
ParallelFolyam
features the same operators as RxJava 2 does plus several of the sequential and less common operators
such as the asynchronous mapWhen()
.
Folyam.range(1, 100)
.parallel()
.runOn(SchedulerServices.computation())
.sumInt()
.test()
.assertResult(5050);
The library offers the standard set of global schedulers in SchedulerServices
(named to avoid too much conflict with
RxJava and Reactor): computation
, io
, newThread
, single
, trampoline
and newExecutor
. Testing can be done with the
TestSchedulerService
class.
In addition, the SchedulerServices
allows creating custom schedulers, parameterized by name, priority and daemon-ness
via newParallel
, newIO
, newThread
and newSingle
. The utility class also supports creating blocking schedulers via
newBlocking
and sharing an existing worker via newShared
.
The SchedulerService
interface implemented by the schedulers offers the ability to schedule
Runnable
tasks directly or on a Worker
; immediately, delayed or periodically. The Worker
interface
implements AutoDisposable
and all of the pending tasks can be cancelled via Worker.close()
.
The current API design in Folyam
and Esetleg
doesn't offer operators with default schedulers; the
SchedulerService
to be used must be specified as a parameter to these operators.
Folyam.intervalRange(1, 5, 100, 100, TimeUnit.MILLISECONDS, SchedulerServices.single())
.blockingSubscribe(System.out::println);
Unfortunately, Java's default functional interfaces don't support throwing checked exceptions, therefore a set of new interfaces was added and is used throughout the API:
import hu.akarnokd.reactive4javaflow.functionals.*;
import java.io.IOException;
CheckedFunction<Integer, Integer> f = v -> { throw new IOException(); };
Folyam.just(1).map(f).test().assertFailure(IOException.class);
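To illustrate why a separate family of interfaces is needed, here is a minimal JDK-only sketch. SketchCheckedFunction is a hypothetical stand-in whose apply method is assumed to declare `throws Throwable`; the exact signature of the library's CheckedFunction is not shown in this document, so treat that detail as an assumption.

```java
import java.io.IOException;
import java.util.List;

class CheckedFunctionSketch {

    /** Hypothetical stand-in: like java.util.function.Function, but apply may throw anything. */
    @FunctionalInterface
    interface SketchCheckedFunction<T, R> {
        R apply(T input) throws Throwable;
    }

    /** Calls the function and turns the outcome into a string, the way an operator would route it. */
    static <T, R> String applyAndDescribe(SketchCheckedFunction<T, R> f, T input) {
        try {
            return "value: " + f.apply(input);
        } catch (Throwable ex) {
            // A real operator would forward this to onError (and typically rethrow fatal errors).
            return "error: " + ex.getClass().getSimpleName();
        }
    }

    static List<String> demo() {
        SketchCheckedFunction<Integer, Integer> ok = v -> v + 1;
        // With java.util.function.Function this lambda would not compile,
        // because Function.apply declares no checked exceptions.
        SketchCheckedFunction<Integer, Integer> failing = v -> { throw new IOException(); };
        return List.of(applyAndDescribe(ok, 1), applyAndDescribe(failing, 1));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```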
The library features almost all standard "subject" types from RxJava in the processors
subpackage:
DirectProcessor: emits items directly to multiple subscribers and signals an error if a particular subscriber is not ready to receive items (similar to RxJava's PublishProcessor),
MulticastProcessor: coordinates the backpressure between subscribers (no standard RxJava equivalent),
FirstProcessor: takes and caches the first item or terminal signal and exposes it as an Esetleg reactive type (similar to MaybeSubject),
LastProcessor: waits for and caches the last item or terminal signal and exposes it as a Folyam (similar to AsyncProcessor),
CachingProcessor: caches items (unbounded, size and/or time bound) and replays them to current and future subscribers (similar to ReplayProcessor),
SolocastProcessor: buffers items until a single subscriber is able to consume them (similar to UnicastProcessor).
The BehaviorProcessor
is currently not replicated in this library and can be emulated via new CachingProcessor<>(1)
.
The cold-to-hot conversion is available via the Folyam.publish()
and Folyam.replay()
operators which return a
ConnectableFolyam
instance.
Unlike RxJava and Reactor, the ConnectableFolyam
has 3 states: fresh, running, terminated. In order to get back
to the fresh state, one has to call ConnectableFolyam.reset()
in the terminated state.
The reason for this design difference is that the RxJava behavior often causes trouble in preparing subscribers
after the first round has completed, which can lead to data loss in case of publish()
.
ConnectableFolyam<Integer> cf = Folyam.range(1, 5).publish();
cf.subscribe(System.out::println);
cf.subscribe(System.out::println);
cf.connect();
// the source has run to completion and
// publish will complete any latecomers
cf.test().assertResult();
// go back to the fresh state
cf.reset();
// prepare the next set of subscribers without
// the source rushing ahead
cf.subscribe(System.out::println);
cf.subscribe(System.out::println);
cf.connect();
The effect is more prominent when replay
or replayLast()
is used: once the source has completed, all items or
the last item remain available to late subscribers until reset
is called.
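The three-state lifecycle described above can be pictured as a small state machine. The sketch below is JDK-only and purely illustrative: Lifecycle and its method names are hypothetical, and what the library actually does when connect() is called outside the fresh state, or reset() outside the terminated state, is not modelled here; the sketch simply ignores such calls.

```java
import java.util.ArrayList;
import java.util.List;

class ConnectableLifecycleSketch {

    enum State { FRESH, RUNNING, TERMINATED }

    /** Hypothetical model of the fresh -> running -> terminated -> fresh cycle. */
    static final class Lifecycle {
        State state = State.FRESH;

        /** fresh -> running; ignored in any other state (assumption of this sketch). */
        boolean connect() {
            if (state == State.FRESH) {
                state = State.RUNNING;
                return true;
            }
            return false;
        }

        /** running -> terminated, called when the source completes or fails. */
        void sourceTerminated() {
            if (state == State.RUNNING) {
                state = State.TERMINATED;
            }
        }

        /** terminated -> fresh; ignored in any other state (assumption of this sketch). */
        boolean reset() {
            if (state == State.TERMINATED) {
                state = State.FRESH;
                return true;
            }
            return false;
        }
    }

    static List<State> demo() {
        List<State> seen = new ArrayList<>();
        Lifecycle lc = new Lifecycle();
        seen.add(lc.state);      // subscribers can be prepared here
        lc.connect();
        seen.add(lc.state);      // the source is running
        lc.sourceTerminated();
        seen.add(lc.state);      // late subscribers see the terminal (or replayed) signals
        lc.reset();
        seen.add(lc.state);      // ready for the next round of subscribers
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```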
The usual autoConnect()
and refCount
operators are supported along with refCount
overloads that
allow specifying the minimum subscriber count and/or a grace period before closing the connection after
the last subscriber has cancelled.
Testing the Folyam
and Esetleg
types can be done via the convenient .test()
method which uses
a TestConsumer
class behind the scenes (similar to TestSubscriber
).
Scheduler-dependent operators can be tested with the help of the TestSchedulerService
and its advanceTimeBy
method
as the means to move time forward.
TestSchedulerService sch = new TestSchedulerService();
TestConsumer<Integer> tc = Folyam.just(1).delay(5, TimeUnit.MILLISECONDS, sch).test();
tc.assertEmpty();
sch.advanceTimeBy(5, TimeUnit.MILLISECONDS);
tc.assertResult(1);
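For readers unfamiliar with virtual-time testing, the idea behind a test scheduler can be sketched with JDK types only. SketchTestScheduler below is a hypothetical, single-threaded illustration and not the library's TestSchedulerService: tasks are queued with a due time and only run when the test moves the virtual clock forward.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.concurrent.TimeUnit;

class VirtualTimeSketch {

    /** A task together with its virtual due time and an insertion counter for stable ordering. */
    static final class TimedTask {
        final long dueNanos;
        final long seq;
        final Runnable run;

        TimedTask(long dueNanos, long seq, Runnable run) {
            this.dueNanos = dueNanos;
            this.seq = seq;
            this.run = run;
        }
    }

    /** Hypothetical virtual-time scheduler: nothing runs until advanceTimeBy is called. */
    static final class SketchTestScheduler {
        private final PriorityQueue<TimedTask> queue = new PriorityQueue<>(
                Comparator.comparingLong((TimedTask t) -> t.dueNanos)
                        .thenComparingLong(t -> t.seq));
        private long nowNanos;
        private long seq;

        void schedule(Runnable task, long delay, TimeUnit unit) {
            queue.add(new TimedTask(nowNanos + unit.toNanos(delay), seq++, task));
        }

        void advanceTimeBy(long time, TimeUnit unit) {
            long target = nowNanos + unit.toNanos(time);
            // Run every task that becomes due within the advanced window, in due-time order.
            while (!queue.isEmpty() && queue.peek().dueNanos <= target) {
                TimedTask t = queue.poll();
                nowNanos = t.dueNanos;
                t.run.run();
            }
            nowNanos = target;
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        SketchTestScheduler sch = new SketchTestScheduler();
        sch.schedule(() -> log.add("b"), 5, TimeUnit.MILLISECONDS);
        sch.schedule(() -> log.add("a"), 2, TimeUnit.MILLISECONDS);

        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        log.add("after 1ms");
        sch.advanceTimeBy(1, TimeUnit.MILLISECONDS);
        log.add("after 2ms");
        sch.advanceTimeBy(3, TimeUnit.MILLISECONDS);
        log.add("after 5ms");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because no real clock is involved, a test that covers seconds or hours of delays finishes immediately and deterministically.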
The FolyamPlugins
class offers hooks for the assembly of operators (onAssembly
), for the moment they get
subscribed to (onSubscribe
) and for undeliverable exceptions (onError
).
The FolyamPlugins
also allows overriding the initial and current schedulers returned by SchedulerServices
.
Note that accessing SchedulerServices
(for example, to create a custom scheduler via one of the newXXX
methods)
may prematurely initialize the other schedulers. Therefore, it is recommended to hook the
current schedulers only (setOnComputationSchedulerService
).
The Reactive4JavaFlow library is a 4th generation reactive solution that supports the operator fusion concept
more extensively than RxJava 2 does at the moment. Since there is no standard, multi-library API for operator fusion anyway, this
library defines the core interfaces in the hu.akarnokd.reactive4javaflow.fused
package:
ConditionalSubscriber: with its tryOnNext method, such a subscriber can avoid request(1) calls and keep draining longer without costly atomic decrements
FusedQueue: base interface for queues with a minimal number of operations and a poll() method that can throw a Throwable
FusedSubscription: the combination of the FusedQueue and the Java 9 Flow.Subscription interfaces to establish and run fused queues
FusedDynamicSource: represents a source that can produce an item, return null or throw an exception at the runtime of the flow
FusedStaticSource: special variant of the dynamic source that can host a constant (or is empty) and allows assembly time optimizations.
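The benefit of a tryOnNext-style method can be shown with a small JDK-only sketch. SketchConditional and drain are hypothetical names, and the real ConditionalSubscriber works on top of Flow.Subscriber rather than on plain ints; the point is only that the boolean result lets the source keep its emission loop going when an item is dropped, instead of waiting for the consumer to call request(1).

```java
import java.util.ArrayList;
import java.util.List;

class ConditionalSketch {

    /** Hypothetical stand-in: returns true if the item was consumed, false if it was dropped. */
    interface SketchConditional {
        boolean tryOnNext(int item);
    }

    /**
     * Offers items from [start, start + count) until `demand` of them were accepted.
     * Dropped items do not count against the demand, so no replenishing request is needed.
     * Returns how many items were offered in total.
     */
    static int drain(int start, int count, long demand, SketchConditional downstream) {
        int offered = 0;
        long accepted = 0;
        for (int i = start; i < start + count && accepted < demand; i++) {
            offered++;
            if (downstream.tryOnNext(i)) {
                accepted++;
            }
        }
        return offered;
    }

    /** Filters for even numbers with a demand of 3 over the range 1..10. */
    static List<Integer> demo() {
        List<Integer> received = new ArrayList<>();
        drain(1, 10, 3, item -> {
            if (item % 2 == 0) {
                received.add(item);
                return true;
            }
            return false; // dropped: the source simply tries the next item
        });
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```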