Drain integrates Python with drake, resulting in a robust and efficient Python workflow tool. Drain additionally provides a library of pipeline steps for processing data, training models, making predictions, and analyzing results.
Drake is a simple-to-use, extensible, text-based data workflow tool that organizes command execution around data and its dependencies. Data processing steps are defined along with their inputs and outputs and Drake automatically resolves their dependencies and calculates:
- which commands to execute (based on file timestamps)
- in what order to execute the commands (based on dependencies)
Drain builds on drake and adds:

- Steps need not produce file outputs. This way, you can define a DAG of steps, and decide later which parts of it should be cached and which steps should always re-run. This allows the user to think of steps as natural units of work (instead of as file-producing units of work). For example, you might have a step that performs a costly imputation (and which you thus would like to cache), and another step that drops some columns from your dataframe (and which you do not want to cache, as that would be a waste of disk space). In `drain`, these two steps follow the same template, and caching is enabled by passing a flag to `drain`.
- Management of filepaths behind the scenes. The user does not define dependencies between files, but rather dependencies between steps. This is extremely useful when your pipeline has to handle a large and variable number of files, e.g. when you iterate over many different models or feature subsets.
- Finally, `drain` keeps track not only of file timestamps, but also of each step's parameters (its "signature"). If you change some setting in one of your data processing steps, then `drain` will re-run all the follow-up steps that are affected by this change.
This is a toy example, in which each `Step` produces a number.
- We define a simple `Step` that wraps a numeric value:

  ```python
  class Scalar(Step):
      def __init__(self, value, **kwargs):
          # note how we do not need to say self.value = value;
          # the parent constructor does that for us
          Step.__init__(self, value=value, **kwargs)

      def run(self):
          return self.value
  ```

- Instantiate it like any other class:

  ```python
  s = Scalar(value=5)
  ```

  Note that the result of a step's `run()` method is accessible via `get_result()`.
- Steps can use the results of other steps, called `inputs`. For example, we can define an `Add` step which adds the values of its inputs:

  ```python
  class Add(Step):
      def __init__(self, inputs):
          Step.__init__(self, inputs=inputs)

      def run(self):
          return sum(i.get_result() for i in self.inputs)
  ```

  In order to avoid calling `get_result()` manually, drain does so-called inputs mapping, which is explained in the corresponding section below. In its most basic form, inputs mapping allows us to rewrite `Add.run` as follows:

  ```python
  def run(self, *values):
      return sum(values)
  ```

- Now we can combine the steps:

  ```python
  a = Add(inputs=[Scalar(value=v) for v in range(1, 10)])
  ```
`drain` is a pretty lightweight wrapper around `drake`; its core functionality is only a few hundred lines of code.
A workflow consists of steps, each of which inherits from the `drain.step.Step` class. Each step must implement the `run()` method, whose return value is the *result* of the step. A step should be a deterministic function from its constructor arguments to its result.

Because a step is only a function of its arguments, serialization and hashing are easy. We use YAML for serialization, and hash the YAML for hashing. Thus all arguments to a step's constructor should be YAML-serializable.
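The idea can be sketched with the standard library; here `json` stands in for YAML (drain itself uses YAML), and `step_signature` is an illustrative helper, not drain's API:

```python
import hashlib
import json

def step_signature(step_class_name, kwargs):
    # Serialize the step's class name and constructor arguments
    # deterministically, then hash the serialization.
    serialized = json.dumps({"class": step_class_name, "kwargs": kwargs},
                            sort_keys=True)
    return hashlib.sha256(serialized.encode()).hexdigest()

# identical arguments yield identical signatures...
sig1 = step_signature("Scalar", {"value": 5})
sig2 = step_signature("Scalar", {"value": 5})
# ...while changing any argument changes the signature
sig3 = step_signature("Scalar", {"value": 6})
```

Because the hash depends only on the serialized arguments, two step objects constructed with the same arguments share a signature, which is what makes caching and change detection possible.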
TODO: Explanations
- `Step`'s constructor accepts any keyword arguments, but does not accept positional arguments.
- A `Step` can decide to only accept certain keyword arguments by defining a custom `__init__()`.
- Reserved keyword arguments are `name`, `target`, `inputs`, `inputs_mapping`, and `resources`. These are handled specially by `Step.__new__()`.
- When passing keyword arguments to a `Step` constructor, all the arguments (except `name` and `target`) become part of the signature (i.e., they will be part of this `Step`'s serialization). Any instance of a `Step` automatically has an attribute `_kwargs` holding these arguments.
- When a `Step` does not override `__init__()` (i.e., when it uses the default `Step.__init__()`), all the keyword arguments that are passed become attributes of the new instance. This is a mere convenience; it can be overridden simply by overriding `__init__()`, and it does not affect serialization.
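The default behavior described above can be sketched as follows (`SketchStep` is an illustrative stand-in, not drain's actual implementation):

```python
class SketchStep:
    """Illustrative stand-in for the default drain.step.Step behavior."""

    def __init__(self, **kwargs):
        # everything except name and target contributes to the signature
        self._kwargs = {k: v for k, v in kwargs.items()
                        if k not in ("name", "target")}
        # convenience: all keyword arguments become instance attributes
        for k, v in kwargs.items():
            setattr(self, k, v)

s = SketchStep(value=5, name="my_scalar")
# s.value == 5, while 'name' is kept out of the signature arguments
```

Keeping `name` and `target` out of `_kwargs` mirrors the rule that these two arguments are not serialized and so do not affect the step's signature.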
Each `Step` has several reserved keyword arguments, namely `target`, `name`, `inputs_mapping`, `resources`, and `inputs`.
`name` defaults to `None` and `target` to `False`. `name` is a string and allows you to name your current `Step`; this is useful later, when handling the step graph. `target` decides whether the `Step`'s output should be cached on disk or not. These two arguments are not serialized.
The step attribute `inputs` should be a list of input step objects. Steps appearing in other arguments will not be run correctly. Note that the `Step.__init__` superconstructor automatically assigns all keyword arguments to object attributes.
Inputs can also be declared within a step's constructor by setting the `inputs` attribute.
The `inputs_mapping` argument to a step allows for convenience and flexibility in passing that step's inputs' results to the step's `run()` method.

By default, results are passed as positional arguments. So a step with `inputs=[a, b]` will have `run` called as

```python
run(a.get_result(), b.get_result())
```
When a step produces multiple items as the result of `run()`, it is often useful to name them and return them as a dictionary. Dictionary results are merged (with later inputs overriding earlier ones) and passed to `run` as keyword arguments. So if inputs `a` and `b` had dictionary results with keys `a_0, a_1` and `b_0, b_1`, respectively, then `run` will be called as

```python
run(a_0=a.get_result()['a_0'], a_1=a.get_result()['a_1'],
    b_0=b.get_result()['b_0'], b_1=b.get_result()['b_1'])
```
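This default can be sketched as follows; `call_run` and `FakeStep` are illustrative names, not part of drain:

```python
def call_run(run, inputs):
    # Sketch of the default inputs mapping: dict results are merged into
    # keyword arguments (later inputs overriding earlier ones here),
    # while non-dict results are passed positionally.
    args, kwargs = [], {}
    for step in inputs:
        result = step.get_result()
        if isinstance(result, dict):
            kwargs.update(result)
        else:
            args.append(result)
    return run(*args, **kwargs)

class FakeStep:
    """Minimal stand-in exposing get_result(), for illustration only."""

    def __init__(self, result):
        self._result = result

    def get_result(self):
        return self._result

out = call_run(lambda a_0, a_1, b_0, b_1: (a_0, b_1),
               [FakeStep({"a_0": 1, "a_1": 2}),
                FakeStep({"b_0": 3, "b_1": 4})])
# out == (1, 4)
```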
This mapping of input results to `run` arguments can be customized when constructing steps. For example, if the results of `a` and `b` are objects, then specifying

```python
inputs_mapping = ['a', 'b']
```

will result in the call

```python
run(a=a.get_result(), b=b.get_result())
```
If `a` and `b` return dicts, then the mapping can be used to change their keywords or exclude the values:

```python
inputs_mapping = [{'a_0': 'alpha_0', 'a_1': None}, {'b_1': 'beta_1'}]
```

will result in the call

```python
run(alpha_0=a.get_result()['a_0'],
    b_0=b.get_result()['b_0'], beta_1=b.get_result()['beta_1'])
```
where:

- `a_0` and `b_1` have been renamed to `alpha_0` and `beta_1`, respectively,
- `a_1` has been excluded, and
- `b_0` has been preserved.
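The rename/exclude semantics can be sketched with a small helper (`apply_mapping` is illustrative, not drain's API):

```python
def apply_mapping(results, mapping):
    # results: one dict result per input step
    # mapping: one dict per input, {original_key: new_key_or_None};
    # keys mapped to None are excluded, unmapped keys pass through unchanged
    kwargs = {}
    for result, m in zip(results, mapping):
        for key, value in result.items():
            new_key = m.get(key, key)
            if new_key is not None:
                kwargs[new_key] = value
    return kwargs

mapped = apply_mapping(
    [{"a_0": 1, "a_1": 2}, {"b_0": 3, "b_1": 4}],
    [{"a_0": "alpha_0", "a_1": None}, {"b_1": "beta_1"}])
# mapped == {"alpha_0": 1, "b_0": 3, "beta_1": 4}
```

This reproduces the example above: `a_0` becomes `alpha_0`, `b_1` becomes `beta_1`, `a_1` is dropped, and `b_0` passes through unchanged.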
To ignore the inputs mapping, simply define:

```python
def run(self, *args, **kwargs):
    results = [i.get_result() for i in self.inputs]
```
TODO
Given a collection of steps, drain executes them by generating a temporary Drakefile for them and then calling `drake`.
option to store in db instead of files