
A functional and configurable way of creating composable pipelines of functions. Kinda like Plug, but backwards.

Primary language: Elixir

DO NOT USE!

GULP—Kinda Like Plug, But Backwards…

Gulp lets you construct pipelines of actions in a composable, dynamic, configurable, and functional way.

There are three types of action:

  • a source is a stream of values. It can be bounded or unbounded.
  • a transform is a function that takes a value and produces either another value or an error.
  • a sink is the end of a pipeline: it receives data from a pipeline, and then disposes of it into the rest of the system.

In a logging system, the sources could be things such as an interface to the system error log, an API for programs to generate log messages, and maybe a statsd server for receiving statistics from other programs.

The transforms might be responsible for filtering messages based on content or level, extracting summary information, and so on.

The sinks could format messages for the appropriate destinations and write them to databases, plain-text logfiles, and SaaS services.

You'd configure these together by creating a Gulp pipeline:

TODO: logger gulp pipeline

In addition, all the actions in a system may have associated configuration information. This information is supplied to them when they first start, and they can transform it into a form that's most usable by them. This transformed configuration is automatically supplied to them on each request.
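As a rough illustration of that startup step, here is a sketch in plain Elixir. The module name, the config/1 and transform/2 function names, and the :pattern option are assumptions made for this example, not Gulp's actual API. The idea is that the pattern string is compiled once, and every later call receives the compiled form.

```elixir
defmodule ConfigSketch do
  # Hypothetical startup hook: runs once and converts the raw options
  # into the form this action finds most convenient.
  def config(%{pattern: pattern} = config) do
    Map.put(config, :regex, Regex.compile!(pattern))
  end

  # Hypothetical per-value call: receives the already-transformed config.
  def transform(line, %{regex: regex}) do
    case Regex.run(regex, line, capture: :all_but_first) do
      [level] -> {:ok, level}
      _ -> {:error, :no_match}
    end
  end
end
```

Here ConfigSketch.config(%{pattern: "level=(\\w+)"}) would be called once, and the map it returns would be passed to every ConfigSketch.transform/2 call.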

Sources

A source is an Elixir stream. You can use the built-in streams, or implement your own modules using Stream.xxx functions.

The values returned by a source stream must be packaged into an { :ok, value } tuple.

Here's an example source that reads lines from the console and injects them into a pipeline.

TODO: source example
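One possible shape for such a source, using only the standard library, is sketched below. The module and function names are made up for illustration, and nothing here shows how Gulp itself would register the source. It reads lines from an IO device (standard input by default) and wraps each one in an { :ok, value } tuple.

```elixir
defmodule ConsoleSourceSketch do
  # Returns a lazy stream of {:ok, line} tuples read from the given device.
  # The device defaults to :stdio, i.e. the console.
  def lines(device \\ :stdio) do
    device
    |> IO.stream(:line)
    |> Stream.map(fn line -> {:ok, String.trim_trailing(line, "\n")} end)
  end
end
```

Because the result is an ordinary stream, nothing is read until something downstream asks for a value.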

Composing Sources

You can inject values from multiple sources into a single pipeline by composing those sources into a source bundle. Here's a bundle that includes the console-reading source and a timer.

TODO: source bundle example

A bundled set of sources acts as a single source to the rest of the pipeline: as values are generated by each of the constituent sources, they are emitted to the pipeline.
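The merging behaviour could be sketched with plain processes along these lines. This is not Gulp's implementation, and the module and function names are hypothetical. Each source is drained in its own process, which forwards values to the consumer as they arrive. The sketch assumes the sources are given as a list and that the consumer reads the merged stream to the end; it makes no attempt to stop the feeder processes if the consumer halts early.

```elixir
defmodule SourceBundleSketch do
  # Merges several enumerable sources into one stream. Values appear in
  # arrival order, so the interleaving between sources is not deterministic.
  def bundle(sources) when is_list(sources) do
    Stream.resource(
      fn ->
        # Runs in the consuming process when enumeration starts.
        parent = self()
        ref = make_ref()

        Enum.each(sources, fn source ->
          spawn_link(fn ->
            Enum.each(source, fn item -> send(parent, {ref, :item, item}) end)
            send(parent, {ref, :done})
          end)
        end)

        {ref, length(sources)}
      end,
      fn
        # Every source has reported :done, so the bundle is exhausted.
        {_ref, 0} = state ->
          {:halt, state}

        {ref, remaining} ->
          receive do
            {^ref, :item, item} -> {[item], {ref, remaining}}
            {^ref, :done} -> {[], {ref, remaining - 1}}
          end
      end,
      fn _state -> :ok end
    )
  end
end
```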

Transforms

A transform is a function that receives a value and a configuration, and returns an { :ok, value } tuple or an { :error, reason } tuple.

A transform can be written as a module that implements the transform behaviour, or as an anonymous function that takes a value and a configuration.

Here are some transforms:

TODO: transform examples
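To make the two forms concrete, here is a sketch of each. The behaviour below is a stand-in written for this example; the name and callback of Gulp's real transform behaviour are not given here, so TransformSketch, transform/2, and the :max option are all assumptions.

```elixir
defmodule TransformSketch do
  # Hypothetical stand-in for the transform behaviour.
  @callback transform(value :: term, config :: term) :: {:ok, term} | {:error, term}
end

defmodule UpcaseSketch do
  @behaviour TransformSketch

  # Module form: upcases strings and rejects everything else.
  @impl true
  def transform(value, _config) when is_binary(value), do: {:ok, String.upcase(value)}
  def transform(_value, _config), do: {:error, :not_a_string}
end

# Anonymous-function form: takes a value and a configuration and keeps
# at most config.max characters of the value.
truncate = fn value, %{max: max} -> {:ok, String.slice(value, 0, max)} end
```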

Transforms may be composed into sequential and parallel bundles.

Sequential Transform Bundle

You create a sequential transform bundle using

twice_plus_one = Gulp.sequential [
  ok(&value * 2),
  ok(&value + 1)
]

You'd invoke this with

twice_plus_one.(3)   # -> { :ok, 7 }

How does this work?

Each transform in the list passed to Gulp.sequential is converted into a function with two arguments. Inside the body of the action, these arguments are represented as &value and &config.

The function is expected to return { :ok, value } or { :error, reason }. If any function returns the error tuple, the rest of the sequence is abandoned. Otherwise, the value part of the :ok tuple is passed as &value to the next function in the list.

The ok/1 function does nothing magical: it simply converts its argument into the tuple { :ok, arg }.
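The short-circuiting described above can be sketched in plain Elixir with Enum.reduce_while/3. This is an illustration of the idea, not Gulp's implementation: SequentialSketch is a hypothetical module, and the transforms are written as ordinary two-argument functions rather than with the &value and &config shorthand.

```elixir
defmodule SequentialSketch do
  # Wraps a value in an :ok tuple.
  def ok(value), do: {:ok, value}

  # Builds a function that runs the transforms in order, feeding each
  # :ok value into the next one and stopping at the first error.
  def sequential(transforms) do
    fn value, config ->
      Enum.reduce_while(transforms, {:ok, value}, fn transform, {:ok, current} ->
        case transform.(current, config) do
          {:ok, _next} = result -> {:cont, result}
          {:error, _reason} = error -> {:halt, error}
        end
      end)
    end
  end
end

twice_plus_one =
  SequentialSketch.sequential([
    fn value, _config -> SequentialSketch.ok(value * 2) end,
    fn value, _config -> SequentialSketch.ok(value + 1) end
  ])

twice_plus_one.(3, %{})   # -> {:ok, 7}
```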

Parallel Transform Bundle

Transforms can be run in parallel:

get_social_status = Gulp.parallel [
  twitter   <- get_twitter_status(&value, &config),
  facebook  <- get_fb_status(&value, &config),
  instagram <- get_instagram_status(&value, &config.level),
  do: ok(%MyStatus{twitter: twitter, facebook: facebook, instagram: instagram})
]

get_social_status.("pragdave")

In this example, the three get_xxx calls are invoked in parallel. The &value in each call is replaced by the parameter passed when you invoke the actions (so each transform will receive "pragdave").

The three get_xxx functions return ok or error tuples. If any returns an error, any transforms still running are terminated early, and that error becomes the value returned by the overall bundle.
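A sketch of these semantics using Task.async_stream/3 from the standard library follows. It is not how Gulp does it: ParallelSketch is a hypothetical module, the transforms are passed as a keyword list of two-argument functions, and the collected results come back as a map rather than through a do: clause. Halting the reduction on the first error shuts the task stream down, which is what stops transforms that are still running.

```elixir
defmodule ParallelSketch do
  # Runs every named transform concurrently on the same value and config.
  # Returns {:ok, %{name => result}} or the first {:error, reason} seen.
  def run(named_transforms, value, config) when is_list(named_transforms) do
    named_transforms
    |> Task.async_stream(
      fn {name, transform} -> {name, transform.(value, config)} end,
      ordered: false,
      max_concurrency: max(length(named_transforms), 1)
    )
    |> Enum.reduce_while({:ok, %{}}, fn
      {:ok, {name, {:ok, result}}}, {:ok, acc} ->
        {:cont, {:ok, Map.put(acc, name, result)}}

      {:ok, {_name, {:error, reason}}}, _acc ->
        {:halt, {:error, reason}}
    end)
  end
end
```

Note that Task.async_stream/3 applies its default per-task timeout here; a real implementation would probably make that configurable.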

Sinks

A sink is the termination of a pipeline. It receives a value and a configuration, and does what it wants with it. Any value it returns is ignored. You'll probably write sinks as modules that implement Gulp.Sink.

Here's a sink that writes values to a file. Note that it opens the file during initialization, and saves the file PID in its configuration. When it receives values, it then recovers that PID and writes the value to the file.

TODO: sink example
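The open-once, write-many pattern described above might look something like the following sketch. The callbacks of Gulp.Sink are not spelled out here, so config/1, handle/2, and the :path option are assumptions, and the module deliberately does not claim to implement Gulp.Sink.

```elixir
defmodule FileSinkSketch do
  # Hypothetical initialization hook: opens the file once and stores
  # the resulting PID in the configuration.
  def config(%{path: path} = config) do
    {:ok, device} = File.open(path, [:append, :utf8])
    Map.put(config, :device, device)
  end

  # Hypothetical per-value hook: recovers the PID from the configuration
  # and writes the value as one line. The return value is not meaningful.
  def handle(value, %{device: device}) do
    IO.puts(device, to_string(value))
  end
end
```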

Sink Bundles

Multiple sinks can be placed into a bundle. To the rest of the pipeline, they appear to be a single sink. Internally, each individual sink receives every value passed in to the bundle. The sinks run in parallel, and the bundle as a whole accepts values at the rate determined by its slowest member. If you have a slow-running sink, you should probably decouple the slow part into a separate process and implement a policy which either discards messages or buffers them. (Alternatively, if you have a slow-running sink, call a plumber.)

Here's a sink bundle that might be used in a logger:

TODO: sink bundle
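The fan-out behaviour of a bundle can be sketched with Task.async/1 and Task.await_many/1 (available since Elixir 1.11). SinkBundleSketch is a hypothetical name, and sinks are modelled here as plain two-argument functions. Because the call waits for every sink before returning, the bundle can only accept values as fast as its slowest member.

```elixir
defmodule SinkBundleSketch do
  # Hands the same value to every sink concurrently, then waits for all
  # of them to finish. Whatever the sinks return is ignored.
  def handle(sinks, value, config) do
    sinks
    |> Enum.map(fn sink -> Task.async(fn -> sink.(value, config) end) end)
    |> Task.await_many()

    :ok
  end
end
```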

Higher Level Composition

Source, transform, and sink bundles can themselves be composed into larger source, transform, or sink bundles. For example:

...

inside action, &value, &config

Gulp.source(fn -> return_value end, options)

Gulp.source_bundle(name: ..., next: ..., [
  Module,                # name=module
  { Module, options },
  fn,                    # no name, no config
  { fn, options }
])

every action has config(config) -> updated config

config includes

all: name

source: target

transform: target

sink: ?