Simple but flexible Python pipelines based on function composition.
Build your workflow step by step, then flexibly combine the steps to make a bigger step. Conditional execution and branching are also supported.
- Author: Bob Kerns
- License: MIT
- Bugs: https://github.com/BobKerns/fpipeline/issues
- Examples
Organizing a data application as a pipeline brings many benefits:
- By naming each top-level operation, a degree of natural self-documentation is provided.
- Each step can easily be independently tested.
- The pipeline focuses on the overall sequence of operations and dataflow.
- A clear boundary between levels of abstraction. Details live in steps and conditions.
The pipeline orchestrates these pieces, while remaining entirely agnostic about what the operations do.
A step is a function of one argument, which we will call the context. The context can be any object, including a dict, according to the needs of the application.
This single argument will be the same for all steps in an application, but different for each invocation.
Often, it will be an object representing the data the pipeline is to operate on. It can also be contained in an object or dict along with various metadata.
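For example, a context might be a plain dict bundling the payload with metadata (illustrative only; the keys here are hypothetical):

```python
ctx = {
    'data': b'...raw asset bytes...',  # the payload the steps operate on
    'source': '/data/incoming/a1',     # metadata traveling with it
    'run_id': 42,
}
```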
A `Step` type hint is defined to be:

```python
Step = Callable[[D], V]
```

where `D` and `V` are type variables.
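Spelled out, a sketch of the declarations behind this alias, assuming plain `typing` machinery:

```python
from typing import Callable, TypeVar

D = TypeVar('D')  # the context (data) type
V = TypeVar('V')  # the value a step returns

Step = Callable[[D], V]
```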
Steps taking only a single argument would seem very limiting. But we have a solution!

To define a step function, we use the decorator `@stepfn`. The function's first positional argument is interpreted as the context argument. The function is replaced with a new function that accepts all the other arguments, and returns a new function that accepts the context argument (thus, a `Step`), and invokes the original function with all its arguments.
We transform this:

```python
@stepfn
def my_step(ctx: CTX, a: A, b: B, c: C) -> V:
    ...
```

into:

```python
def my_step(a: A, b: B, c: C) -> Step[CTX, V]:
    ...
```

or, more informally:

```
my_step(A, B, C) -> (CTX) -> V
```
That is, we supply everything except the first argument, then apply the context parameter for each data value processed by the pipeline.
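For intuition, here is a minimal sketch of how such a decorator could work; this is an illustration, not necessarily the library's actual implementation:

```python
from functools import wraps

def stepfn(fn):
    """Sketch: curry all arguments except the leading context argument."""
    @wraps(fn)
    def bind(*args, **kwargs):
        def step(ctx):
            return fn(ctx, *args, **kwargs)
        return step
    return bind
```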
It might seem that this limits us to constant values. However, pipeline variables allow different values to be injected at each execution, because they are evaluated anew each time the pipeline runs.
Using a simple protocol based on single-argument functions allows us to use them as building blocks, to combine them into entire pipelines, and to combine pipelines into larger pipelines, all following the same protocol.
To allow passing values between pipeline `Step`s in a flexible way, we provide two forms of pipeline variables, which allow the return value of one `Step` to be captured and then supplied as an argument to a later step function, all handled behind the scenes.
Pipeline variables will hold any value. A pipeline variable is also callable as a `Step`, allowing it to be used in a pipeline to provide a return value for the pipeline.
`variables()` returns a `VariableContext`, which is a Python context manager; it is used in a `with ... as ...` statement. The resulting `VariableContext` gives out pipeline variables and manages their scope.
A context manager that manages pipeline variables.
Usage goes like this:
```python
# Dummy StepFns to illustrate
@stepfn
def readAssetStep(data: Data, path: str) -> Asset:
    return path

@stepfn
def mergeAssetsStep(data: Data, asset1: Asset, asset2: Asset) -> Asset:
    return f"{asset1}+{asset2}"

@stepfn
def writeAssetStep(data: Data, asset: Asset, path: str) -> Asset:
    print(f"{path}: {asset}")

# A `StepFn` (a pipeline is a `Step`) that takes two src paths to assets,
# merges them, stores the result in data.result, and writes it to a file.
# The asset paths are provided per-invocation in the context.
# The output directory is configured as an argument
# when creating the pipeline.
@stepfn
def merge(data: Data, outdir: str) -> Asset:
    with variables() as vars:
        # declare the pipeline variables that we need
        src1, src2 = vars.attribute('src1', 'src2')
        asset1, asset2 = vars.variable('asset1', 'asset2')
        result = vars.attribute('result')  # an attribute: stores in data.result
        return vars.pipeline(
            store(asset2, readAssetStep(src2)),
            store(asset1, readAssetStep(src1)),
            store(result, mergeAssetsStep(asset1, asset2)),
            writeAssetStep(result, outdir),
            result
        )(data)
```
`merge` can now be invoked by omitting the data argument, giving a function of one argument (data).
```python
pair1 = {
    'src1': '/data/assets/src1',
    'src2': '/data/assets/src2'
}

merge_and_store = merge(outdir='/data/assets/merged')

# Perform the operation
merged = merge_and_store(pair1)
```
Our new `Step` (`merge_and_store`) can then be called for each merge to be performed.
If we have two directories of files to be merged, this will take them pairwise and feed each pair through the pipeline.
```python
import glob

def get_assets(asset1, asset2):
    list1 = glob.glob(asset1)
    list2 = glob.glob(asset2)
    paired = zip(list1, list2)
    return ({'src1': a1, 'src2': a2}
            for (a1, a2) in paired)

left = '/data/assets1/*.asset'
right = '/data/assets2/*.asset'

results = list(map(merge_and_store, get_assets(left, right)))
```
Returns a `Variable`, or a tuple of `Variable`s if more than one name is given. This allows assignment of multiple variables in a single statement:

```python
a, b = vars.variable('a', 'b')
```
Returns a type of pipeline variable called `Attribute`, or a tuple of `Attribute`s if more than one name is given. This allows assignment of multiple attribute variables in a single statement:

```python
a, b = vars.attribute('a', 'b')
```
Creates and runs a pipeline in this context.
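Echoing the larger example above, a minimal sketch:

```python
with variables() as vars:
    result = vars.variable('result')
    run = vars.pipeline(
        store(result, readAssetStep('/data/assets/src1')),
        result,
    )
    value = run(data)  # apply the pipeline to a context
```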
Represents a place to store and retrieve values between steps.

The value of a `Variable` is not usually referenced directly; rather, the variable is passed to step functions, or assigned with the `store` step function.

The name of the variable must be unique within a `VariableContext`. Multiple uses of the same name will yield the same variable.
An `Attribute` is a pipeline variable that accesses the context. Its name names the field or key to access.

The value of an `Attribute` is not usually referenced directly; rather, the variable is passed to step functions, or assigned with the `store` step function.

The name of the variable must be unique within a `VariableContext`; multiple uses of the same name will yield the same variable. It is also the name of the field or key in the context.
Store the result of *step* into the variable.
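For example, from the merge pipeline above:

```python
# A Step that runs mergeAssetsStep and saves its return value in `result`
store(result, mergeAssetsStep(asset1, asset2))
```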
[Advanced]

If *var* is any type of pipeline variable, its value is returned. If *var* is a container type (`list`, `tuple`, `namedtuple`, `dict`, or `set`), a copy is returned with the variables replaced with their values. This is performed to *depth* levels.

In most cases, this will be called for you at appropriate points.
A pipeline that executes every step on every input would severely limit flexibility. `fpipeline` provides for branching, allowing steps to be skipped where they don't apply, or entirely different flows to be selected. The primary means is the `if_` step function.

These functions have a `_` suffix to avoid conflicts while maintaining readability. They are not in any sense private; they are a fully public part of the interface.
*cond* is a `Condition`, which is like a `Step` except that the return value is a `bool`. It should be defined using the `@conditionfn` decorator in the same way that `@stepfn` is used for step functions.

*then* and *else* are steps (or pipelines), executed according to the value of *cond*. They may be omitted or supplied as `None`. If *then* or *else* is a list, it will be treated implicitly as a pipeline, as in the sketch below.
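A minimal sketch (the `exists` condition is hypothetical, *then*/*else* are passed positionally since `else` is a reserved word in Python, and the pipeline variables are those from the merge example):

```python
@conditionfn
def exists(data: Data, key: str) -> bool:
    return key in data

# If src2 is present, merge both assets; otherwise just read src1.
# The then-branch is a list, so it is treated as a pipeline.
maybe_merge = if_(exists('src2'),
                  [store(asset2, readAssetStep(src2)),
                   store(result, mergeAssetsStep(asset1, asset2))],
                  store(result, readAssetStep(src1)))
```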
`not_` returns a new `Condition` with the opposite sense.
`and_` returns a new `Condition` that returns `False` if any of its arguments return `False`, and `True` otherwise.
`or_` returns a new `Condition` that returns `True` if any of its arguments return `True`, and `False` otherwise.
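These combinators compose; for example, reusing the hypothetical `exists` condition from the sketch above:

```python
both_present = and_(exists('src1'), exists('src2'))   # False if either source is missing
either_present = or_(exists('src1'), exists('src2'))  # True if at least one is present
neither_present = not_(either_present)                # the opposite sense
```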
Create a pipeline: a step that executes its steps in order, and returns the value returned by the last.
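A sketch, assuming a top-level `pipeline(*steps)` constructor matching this description, with the dummy steps and variables from the merge example:

```python
# A Step that runs its steps in order; the value of writeAssetStep is returned
read_and_write = pipeline(
    store(asset1, readAssetStep(src1)),
    writeAssetStep(asset1, '/data/assets/out'),
)
value = read_and_write(data)
```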
Calls an arbitrary function on the context, plus any additional arguments supplied. Variables and steps will be replaced by their values.
Return a list of values. Steps and variables will be evaluated.
Return a tuple of values. Steps and variables will be evaluated.
Return a dict from the supplied keyword arguments. Steps and variables will be evaluated.
Return a set from the supplied arguments. Steps and variables will be evaluated.
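Following the `_` suffix convention above, these constructors would be named `list_`, `tuple_`, `dict_`, and `set_` (names assumed here); a hypothetical use of `dict_` as a pipeline's final step:

```python
# Build the pipeline's return value from evaluated variables (sketch)
summary = vars.pipeline(
    store(asset1, readAssetStep(src1)),
    dict_(src=src1, asset=asset1),  # values are evaluated before the dict is built
)(data)
```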
Pull requests are welcome, as are [bug reports, feature requests, documentation requests or fixes](https://github.com/BobKerns/fpipeline/issues).

Pull requests do not need to be rebased or linear, but you should at least pull from the latest `main` branch and resolve conflicts.

You should pass all tests in the test notebook, and add any new ones to the test notebook.