Sewerpipe lets you define workflows based entirely on running Python modules as tasks:
from sewer.config import Task
from sewer.workflows import workflow
t1 = Task(
name="Example 1",
module="sewer.dummy",
parameters_and_flags=dict(
verbose=True,
name="My momma"
)
)
t2 = Task(
name="Example 2",
module="sewer.dummy",
parameters_and_flags=dict(
verbose=False,
name="My momma not"
)
)
@workflow
def workflow():
(t1 >> t2).run()
def main():
workflow()
if __name__ == "__main__":
main()
The syntax is similar to Airflow DAGs, quite intentionally. There are three ways to use it:
- Direct triggering of workflows via
sppe run
- Conversion of the defined workflows to VSCode
launch.json
, so that your debug configuration is up to date with what is defined as a single-source-of-truth workflow (sppe convert --to vscode
) - Library use to enable seamless creation of Airflow DAGs (via the
airflow.create_airflow_tasks
function or direct import under@task
or@task.external_python
decorators)
Note
Using >>
is purely syntactic sugar here. Also, you're free to run arbitrary functions under the workflow
definition, but it kind of defeats the purpose of the project.
Warning
The Task
defnition currently only supports running properly installed Python modules (I'm using python -m
underneath). I am of the strong opinion that proper packaging practices will alleviate most of your pains working with Python, so I am not planning support for running arbitrary scripts (i.e. python something.py
). Also the equivalent of a BashOperator
in Airflow is not implemented and I am not sure whether it would be a good idea in the first place. If you have any use-cases I'm open to discussion.
For one of my projects I needed to generate synthetic data and I wanted to have the option to run the script directly on the target node using Remote SSH extension in VSCode and the Python Debugger, as well as being able to seamlessly run the exact same workflow either from the command line (e.g. in a Tmux session) or in Airflow.
In a tmux
session or directly in the Bash terminal you can run the following:
sppe run -p workflows/example.py
Provided example.py
exists under the ./workflows
directory, you should be able to run any sequence of tasks.
You can run the following:
sppe convert -p workflows/example.py --to vscode
Note
By default the configuration will be written to .vscode/launch.json
. If you need a different output path, use --output
/-o
option and provide custom path.
I have for now not implemented Airflow DAG generation, just creation of individual tasks. I might consider that in the future, I also welcome pull requests, provided the changes are sufficiently tested.
For my own usage, calling airflow.create_airflow_task
is sufficient, since the focus here is on debuggability of individual tasks, not of the entire DAG. And there might be slight differences in local env vs. Airflow's env.
First, tasks can be imported directly and this can be done under an external Python interpreter:
@task.external_python(python="/home/chris/anaconda3/envs/someenv/bin/python")
def data_generation(gpu_index: int):
import sys
from pathlib import Path
from somefolder.workflows.data_generation import t1
t1.run(path_to_python=Path(sys.executable))
You can also of course wrap an entire Sewerpipe workflow under one Airflow task:
@task.external_python(python="/home/chris/anaconda3/envs/someenv/bin/python")
def data_generation(gpu_index: int):
import sys
from pathlib import Path
from somefolder.workflows.data_generation import workflow
workflow()
Note
This is not recommended because you lose separation between different tasks in the Airflow graph. But if you want to run the exact same definition of a workflow, you can do it this way.
You can use some syntactic sugar that I've added here to make working with Python-based tasks more seamless:
from pathlib import Path
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
from sewerpipe.utils import load_module
from sewerpipe.task import get_task_from_module
from sewerpipe.airflow import create_airflow_task
def get_tasks(gpu_index: int):
module = load_module(Path("/home/chris/somefolder/workflows/data_generation.py"))
t1 = get_task_from_module(module, "data_generation")
t2 = get_task_from_module(module, "data_noising")
python_interpreter = Path("/home/chris/anaconda3/envs/someenv/bin/python")
workdir = Path("/home/chris/somefolder")
env = {"CUDA_VISIBLE_DEVICES": str(gpu_index)}
callargs = dict(
path_to_python=python_interpreter, env=env, workdir=workdir
)
_t1 = create_airflow_task(t1, **callargs)
_t2 = create_airflow_task(t2, **callargs)
return _t1, _t2
@task
def clean_up():
# Your clean-up implementation
print("Cleaning up...")
with DAG(
'data_prep',
description='Data preparation pipeline',
schedule_interval=None,
) as dag:
data_generation, data_noising = get_tasks(0)
data_generation() >> data_noising() >> clean_up()
Equivalently:
from pathlib import Path
from airflow import DAG
from airflow.decorators import task
from airflow.utils.dates import days_ago
python_interpreter = Path("/home/chris/anaconda3/envs/someenv/bin/python")
workdir = Path("/home/chris/somefolder")
@task.external_python(python=python_interpreter)
def data_generation(gpu_index, workdir): # cannot use type-hints here!
import sys
from pathlib import Path
from uarr.workflows.data_generation import t1
env = {"CUDA_VISIBLE_DEVICES": str(gpu_index)}
t1.run(path_to_python=sys.executable, workdir=workdir, env=env)
@task.external_python(python=python_interpreter)
def data_noising(gpu_index, workdir):
import sys
from pathlib import Path
from uarr.workflows.data_generation import t2
env = {"CUDA_VISIBLE_DEVICES": str(gpu_index)}
t2.run(path_to_python=sys.executable, workdir=workdir, env=env)
@task
def clean_up():
# Your clean-up implementation
print("Cleaning up...")
with DAG(
'data_prep_secondary',
description='Data preparation pipeline',
schedule_interval=None,
) as dag:
gpu_available = 0
data_generated = data_generation(gpu_available, workdir)
data_noised = data_noising(gpu_available, workdir)
gpu_available >> data_generated >> data_noised >> clean_up()
Which style of specifying Airflow Workflows you prefer is up to you. I find the prior less verbose but the latter is arguably more readable.
Note
The former solution does not use @task.external_python
but still provides a secondary path to a different interpreter. If you despise this decorator like I do (because you literally need to do imports within the decorated tasks, as it's being sent to a different instance of the interpreter), you will probably end up using the former pattern more.