Groovy DSL for declarative Reactor or RxJava streaming graphs

Primary language: Java. License: Apache License 2.0.

Reactive Plumber

Join the chat at https://gitter.im/reactive-plumber/Lobby

A swisspush project

You want to use Reactor (or RxJava) within a modular, readable and safe abstraction.

Reactive-Plumber lets you write your reactive stream plumbing in a Groovy DSL and also visualize it graphically.

It is intended to be used in Java or Groovy applications.

def data = pipe {
    from input map wrap
}

def printer = {
    from it doOnNext print
}

def renderer = pipe {
    parallel from(data) \
    map renderThread
}

def count = pipe {
    from data count()
}

def size = pipe {
    from data \
    zipWith value(count), attach \
    map renderSize \
    compose printer
}

def thread = pipe {
    from renderer compose printer
}

drain size, thread

The DSL

Have a look at the example scripts.

Pipes

The idea is to write the pipelines in Groovy, using parenthesis-less method chaining.

So a simple definition of a pipe could be:

def items = pipe {
    just 1, 2, 3 \
    map Integer.&bitCount \
    map String.&valueOf
}

This defines a pipe, which is actually a Reactor Flux (or an RxJava Flowable). Inside the closure block, we simply chain existing stream methods. Here we count the one-bits in each number and convert the counts into strings.
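To make the data flow concrete, here is a minimal Java sketch of the same two mapping steps. It uses java.util.stream as a synchronous stand-in for the Flux, so it only illustrates the values produced, not the reactive behavior. The names BitCountSketch and render are illustrative and not part of Reactive-Plumber.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative stand-in: a plain Java Stream replaces the Reactor Flux.
class BitCountSketch {

    // Mirrors `map Integer.&bitCount` followed by `map String.&valueOf`.
    static List<String> render(List<Integer> input) {
        return input.stream()
                .map(Integer::bitCount)   // number of one-bits in each value
                .map(String::valueOf)     // convert each count to a string
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // 1 = 0b01, 2 = 0b10, 3 = 0b11
        System.out.println(render(Arrays.asList(1, 2, 3))); // prints [1, 1, 2]
    }
}
```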

From

To connect pipes together, use from:

def printer = pipe {
    from items \
    to { x -> print x }
}

Drain

Connecting pipes together is not sufficient to define your plumbing. You also need to declare which pipes are terminal, so that all events are processed.

drain printer
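Reactor and RxJava streams are lazy: nothing flows until something subscribes. The Java sketch below shows the same effect with java.util.stream, where intermediate steps run only once a terminal operation is invoked. It is an analogy for why a terminal declaration is needed, not a description of how drain is implemented. The class name LazySketch is illustrative.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Analogy only: java.util.stream laziness stands in for an unsubscribed Flux.
class LazySketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // "Connecting" steps: nothing is processed yet.
        Stream<Integer> connected = Arrays.asList(1, 2, 3).stream()
                .peek(x -> log.add("saw " + x));
        log.add("connected");

        // The terminal operation is what actually pulls items through.
        connected.forEach(x -> log.add("drained " + x));
        return log;
    }

    public static void main(String[] args) {
        // "connected" is logged before any "saw ..." entry.
        run().forEach(System.out::println);
    }
}
```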

Tools

You will expose your own functions to be used in the plumbing. Define them in a subclass of Plumbing, so that a static import of your functions also brings in the Reactive-Plumber built-in functions.

Tools.groovy:

import Plumbing

abstract class Tools extends Plumbing {
    static input = Domain.&input
    static print = Domain.&print
    static renderThread = Domain.&renderThread
    static renderSize = Domain.&renderSize
}

script.groovy:

import static Tools.*;

Pipe Functions

The built-in pipe functions are:

Function     Description
pipe         Declares a pipe.
from         Connects this pipe's input to a previously declared pipe.
drain        Declares a pipe as terminal.
split        Returns an array of pipes filtered with a predicate array.
concurrent   Processes this pipe concurrently with other pipes.
parallel     Processes the events in this pipe in parallel.
value        Provides the result of a pipe in a form suitable to be combined with other pipe items, usually via zipWith.

You can also dynamically create pipes. This uses GroupedFlowable under the hood:

Function     Description
groups       Returns a pipe of pipes grouped by a grouping function.
key          Extracts the key of a grouped pipe.
each         Declares a pipe to apply to each grouped pipe output by groups.
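As a rough illustration of grouping by a key function, the following Java sketch collects items into one list per key. It is an eager analogue built on java.util.stream, whereas the library emits grouped pipes reactively. The class GroupingSketch, the keyOf function and the sample input are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Eager analogue of grouped pipes; not the library's implementation.
class GroupingSketch {

    // Hypothetical grouping function: the key is derived from the item's type.
    static String keyOf(Object item) {
        return item instanceof Number ? "numbers" : "strings";
    }

    // Analogue of `groups`: one list per key, in order of first appearance.
    static Map<String, List<Object>> groups(List<Object> items) {
        return items.stream().collect(
                Collectors.groupingBy(GroupingSketch::keyOf, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Object> items = Arrays.<Object>asList("hello", 5, "world", 3);
        // Analogue of `each`: handle every group together with its key.
        groups(items).forEach((key, group) -> System.out.println(key + " " + group));
    }
}
```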

Box Functions

This library provides a monadic wrapper type, Box, that lets you transport context along with values throughout pipes. If you use it in your pipes, you may want to import the following functions into your Tools class.

Function     Description
wrap         Creates a box around a value.
unwrap       Removes the box and returns the value.
attach       Adds a context to a box.
mapper       Applies a function to a value inside a box.
bind         Flat-maps the box, applying a function on the value that returns a box.
context      Adds a context to the box using the grouping key of grouped pipes.
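The idea of a value that carries context can be sketched in a few lines of Java. The class below is a hypothetical simplification written for this explanation. The library's real Box class may store context differently and expose other signatures, and the choice to concatenate contexts in bind is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical simplification; the library's real Box class may differ.
final class BoxSketch<T> {
    private final T value;
    private final List<Object> context;

    private BoxSketch(T value, List<Object> context) {
        this.value = value;
        this.context = Collections.unmodifiableList(context);
    }

    // wrap: creates a box around a value, with an empty context.
    static <T> BoxSketch<T> wrap(T value) {
        return new BoxSketch<>(value, new ArrayList<>());
    }

    // unwrap: removes the box and returns the value.
    T unwrap() { return value; }

    List<Object> context() { return context; }

    // attach: returns a new box with one more context entry.
    BoxSketch<T> attach(Object entry) {
        List<Object> extended = new ArrayList<>(context);
        extended.add(entry);
        return new BoxSketch<>(value, extended);
    }

    // mapper: applies a function to the value and keeps the context.
    <R> BoxSketch<R> map(Function<? super T, ? extends R> f) {
        return new BoxSketch<>(f.apply(value), new ArrayList<>(context));
    }

    // bind: applies a box-returning function; contexts are concatenated (assumption).
    <R> BoxSketch<R> bind(Function<? super T, BoxSketch<R>> f) {
        BoxSketch<R> next = f.apply(value);
        List<Object> merged = new ArrayList<>(context);
        merged.addAll(next.context);
        return new BoxSketch<>(next.value, merged);
    }
}
```

With this sketch, `BoxSketch.wrap(3).attach("ctx").map(x -> x * 2)` yields a box whose value is 6 and whose context still holds "ctx", which is the property that makes the wrapper useful across several pipe stages.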

Examples

Each example has a pipeline Groovy script and an adapter Java class that exposes the business logic methods in a simple static functional form.
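As a sketch of what "simple static functional form" can look like, here is a hypothetical adapter class. It is not one of the project's example classes; the name ParityAdapter and its methods are invented for illustration. Each method is static and side-effect free, so a Groovy script can reference it as a method pointer, for example `ParityAdapter.&label`.

```java
// Hypothetical adapter; not one of the project's example classes.
final class ParityAdapter {

    private ParityAdapter() {
        // Static helpers only, no instances.
    }

    // Predicate-style helper, usable for filtering or splitting.
    static boolean isEven(int n) {
        return n % 2 == 0;
    }

    // Mapping-style helper: renders a number with its parity.
    static String label(int n) {
        return n + (isEven(n) ? " EVEN" : " ODD");
    }

    public static void main(String[] args) {
        System.out.println(label(2)); // prints 2 EVEN
        System.out.println(label(3)); // prints 3 ODD
    }
}
```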

One

one.groovy

One.java

output:

[1 [RxComputationThreadPool-1]|]
[1 / 4|]
[2 [RxComputationThreadPool-2]|]
[2 / 4|]
[3 [RxComputationThreadPool-3]|]
[4 [RxComputationThreadPool-4]|]
[3 / 4|]
[4 / 4|]

Two

two.groovy

Two.java

output:

2 EVEN
3 ODD

Three

three.groovy

Three.java

output:

1
2
FIZZ
4
BUZZ
FIZZ
7
8
FIZZ
BUZZ
11
FIZZ
13
14
FIZZBUZZ
FIZZ: 4
BUZZ: 2
FIZZBUZZ: 1

Five

five.groovy

Five.java

output:

strings, [hello|], [world|]
numbers, [5|], [3|]

Graph Visualization

Reactive-Plumber can create graph images of your plumbing like the ones above by analyzing the Groovy AST of the script.

Runtime runtime = new Runtime().withGraphTheme(Runtime.GraphTheme.LIGHT); // White background
runtime.generateGraph("src/main/resources/examples/one/one.groovy", new File("target/graph.png"));
runtime.generateGraph("src/main/resources/examples/one/one.groovy", "svg", System.out);