Gulp lets you construct pipelines of actions in a composable, dynamic, configurable, and functional way.
There are three types of action:
- a source is a stream of values. It can be bounded or unbounded.
- a transform is a function that takes a value and produces either another value or an error.
- a sink is the end of a pipeline: it receives data from a pipeline, and then disposes of it into the rest of the system.
In a logging system, the sources could be things such as an interface to the system error log, an API for programs to generate log messages, and maybe a statsd server for receiving statistics from other sources.
The transforms might be responsible for filtering messages based on content or level, extracting summary information, and so on.
The sinks could format messages for appropriate devices, writing them to databases, plain-text logfiles, and SaaS services.
You'd configure these together by creating a Gulp pipeline:
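A rough sketch of the shape this could take for the logging example; the connecting call (`Gulp.pipeline/3`) and the individual module names are assumptions, not a fixed API:

```elixir
# All names here are illustrative; Gulp.pipeline/3 in particular is an
# assumed way of joining a source bundle, a transform bundle, and a
# sink bundle into one pipeline.
sources    = Gulp.source_bundle([ SystemErrorLog, LoggingApi, StatsdListener ])
transforms = Gulp.sequential([ FilterByLevel, ExtractSummary ])
sinks      = Gulp.sink_bundle([ DeviceFormatter, DatabaseWriter, TextLogfile ])

logger = Gulp.pipeline(sources, transforms, sinks)
```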
In addition, all the actions in a system may have associated configuration information. This information is supplied to them when they first start, and they can transform it into a form that's most usable by them. This transformed configuration is automatically supplied to them on each request.
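For instance, a transform that filters on log level might pre-parse its configuration once. This sketch assumes each action exposes a `config/1` callback (the name follows the notes at the end) and that the configuration is a map:

```elixir
defmodule LevelFilter do
  # Called once at startup: pre-parse the minimum log level so later
  # calls don't repeat the work. Callback name and config shape are
  # assumptions for illustration.
  def config(config) do
    Map.put(config, :min_level, parse_level(config[:level] || "info"))
  end

  defp parse_level("debug"), do: 0
  defp parse_level("info"),  do: 1
  defp parse_level("warn"),  do: 2
  defp parse_level("error"), do: 3
end
```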
A source is an Elixir stream. You can use the built-in streams, or implement your own modules using `Stream.xxx` functions.
The values returned by a source stream must be packaged into an `{ :ok, value }` tuple.
Here's an example source that reads lines from the console and injects them into a pipeline.
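Here's one way it might look, given that a source is just an Elixir stream of `{ :ok, value }` tuples (the variable name is arbitrary):

```elixir
# Reads lines typed at the console and wraps each one in an { :ok, value }
# tuple. The stream ends when standard input is closed.
console_source =
  IO.stream(:stdio, :line)
  |> Stream.map(fn line -> { :ok, String.trim_trailing(line, "\n") } end)
```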
You can inject values from multiple sources into a single pipeline by composing those sources into a source bundle. Here's a bundle that includes the console-reading source and a timer.
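A sketch, assuming `Gulp.source_bundle/1` accepts a plain list of sources (the exact call shape is an assumption; the notes at the end also show `name:` and `next:` options):

```elixir
# A timer source: emits { :ok, {:tick, n} } once a second, forever.
timer_source =
  Stream.interval(1_000)
  |> Stream.map(fn n -> { :ok, { :tick, n } } end)

# Bundle the console reader and the timer; downstream, this behaves
# as a single source.
inputs = Gulp.source_bundle([ console_source, timer_source ])
```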
A bundled set of sources acts as a single source to the rest of the pipeline: as values are generated by each of the constituent sources, they are emitted to the pipeline.
A transform is a function that receives a value and a configuration, and returns an `{ :ok, value }` tuple or an `{ :error, reason }` tuple.
A transform can be written as a module that implements the transform
behaviour, or as an anonymous function that takes a value and a configuration.
Here are some transforms:
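For example, two anonymous-function transforms for the logging scenario (the message fields and config keys are made up for illustration):

```elixir
# Drop messages below the configured level: returning an error tuple
# abandons the rest of the sequence for this message.
filter_by_level = fn msg, config ->
  if msg.level >= config.min_level, do: { :ok, msg }, else: { :error, :filtered }
end

# Annotate each surviving message with a timestamp.
add_timestamp = fn msg, _config ->
  { :ok, Map.put(msg, :timestamp, System.system_time(:second)) }
end
```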
Transforms may be composed into sequential and parallel bundles.
You create a sequential transform bundle using:

```elixir
twice_plus_one = Gulp.sequential [
  ok(&1 * 2),
  ok(&1 + 1)
]
```
You'd invoke this with:

```elixir
twice_plus_one(3) # -> { :ok, 7 }
```
How does this work?
Each transform in the list passed to `Gulp.sequential` is converted into a function with two arguments. Inside the body of the action, these arguments are represented as `&value` and `&config`. The function is expected to return `{ :ok, value }` or `{ :error, reason }`. If any function returns the error tuple, the rest of the sequence is abandoned. Otherwise the value part of the `:ok` tuple is passed as an argument to the next function in the list.
In a sequential bundle, the value returned by a function becomes the value passed as `&value` to the next function.
The `ok/1` function does nothing magical: it simply converts its argument into the tuple `{ :ok, arg }`.
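In other words, it's conceptually just:

```elixir
# a one-line sketch of ok/1, matching the description above
def ok(arg), do: { :ok, arg }
```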
Transforms can be run in parallel:
```elixir
get_social_status = Gulp.parallel [
  twitter   <- get_twitter_status(&value, &config),
  facebook  <- get_fb_status(&value, &config),
  instagram <- get_instagram_status(&value, &config.level),
  do: ok(%MyStatus{twitter: twitter, facebook: facebook, instagram: instagram})
]

get_social_status("pragdave")
```
In this example, the three `get_xxx` calls are invoked in parallel. The `&value` in each call is replaced by the parameter passed when you invoke the action (so each transform will receive `"pragdave"`).
The three `get_xxx` functions return ok or error tuples. If any returns an error, any transforms still running are terminated early, and that error becomes the value returned by the overall bundle.
A sink is the termination of a pipeline. It receives a value and a configuration, and does what it wants with them. Any value it returns is ignored. You'll probably write sinks as modules that implement `Gulp.Sink`.
Here's a sink that writes values to a file. Note that it opens the file during initialization, and saves the file PID in its configuration. When it receives values, it then recovers that PID and writes the value to the file.
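A sketch of that sink, assuming the `Gulp.Sink` behaviour consists of the `config/1` initializer plus a `handle_value/2` callback for incoming values (both callback names are assumptions), and that the configuration is a map:

```elixir
defmodule FileSink do
  @behaviour Gulp.Sink   # callback names below are assumptions

  # Runs once at startup: open the file and stash the IO device's pid
  # in the configuration so later calls can reuse it.
  def config(config) do
    { :ok, device } = File.open(Map.get(config, :path, "gulp.log"), [:append, :utf8])
    Map.put(config, :device, device)
  end

  # Recover the pid from the configuration and write the value.
  # Whatever a sink returns is ignored.
  def handle_value(value, config) do
    IO.puts(config.device, inspect(value))
  end
end
```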
Multiple sinks can be placed into a bundle. To the rest of the pipeline, they appear to be a single sink. Internally, each individual sink receives any values passed into the bundle. Each sink is run in parallel, and the bundle as a whole accepts values at the rate determined by its slowest member. If you have a slow-running sink, you should probably decouple the slow part into a separate process and implement a policy which either discards messages or buffers them. (Alternatively, if you have a slow-running sink, call a plumber.)
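One way to do that decoupling in plain Elixir: hand each value to a helper process and return immediately, discarding values whenever the helper's mailbox gets too deep. This is a deliberately crude policy, just to show the shape:

```elixir
defmodule SlowSinkRelay do
  # Start a helper process that does the slow work.
  def start_link(slow_fun) do
    spawn_link(fn -> loop(slow_fun) end)
  end

  # Called from the sink: drop the value if the relay is already backed
  # up, otherwise hand it off and return straight away.
  def deliver(relay, value, max_backlog \\ 100) do
    { :message_queue_len, backlog } = Process.info(relay, :message_queue_len)
    if backlog < max_backlog, do: send(relay, { :value, value })
    :ok
  end

  defp loop(slow_fun) do
    receive do
      { :value, value } -> slow_fun.(value)
    end
    loop(slow_fun)
  end
end
```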
Here's a sink bundle that might be used in a logger:
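Something along these lines, assuming a `Gulp.sink_bundle/1` that mirrors `source_bundle` (the function name and the individual sink modules are assumptions):

```elixir
log_sinks = Gulp.sink_bundle([
  { ConsoleSink,  color: true },               # human-readable terminal output
  { FileSink,     path: "/var/log/app.log" },  # plain-text logfile
  { DatabaseSink, table: "log_entries" }       # structured storage
])
```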
Source, transform, and sink bundles can themselves be composed into larger source, transform, or sink bundles. For example:
...
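A rough sketch of the idea, with every name hypothetical:

```elixir
# A source bundle nested inside a larger source bundle...
local_inputs = Gulp.source_bundle([ SystemErrorLog, LoggingApi ])
all_inputs   = Gulp.source_bundle([ local_inputs, StatsdListener ])

# ...and a transform bundle nested inside a larger sequential bundle.
cleanup      = Gulp.sequential([ FilterByLevel, ExtractSummary ])
pipeline_ops = Gulp.sequential([ cleanup, AddHostname ])
```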
Some remaining notes on the API sketch:

- inside an action: `&value`, `&config`
- `Gulp.source(fn -> return_value end, options)`
- `Gulp.source_bundle(name: ..., next: ..., [ Module, { Module, options }, fn, { fn, options } ])` (a bare `Module` takes its name from the module; a bare `fn` has no name and no config)
- every action has `config(config) -> updated config`
- config includes: all: `name`; source: `target`; transform: `target`; sink: ?