Tukio is an event-driven workflow engine library built around asyncio. It provides classes to describe, run and follow-up workflows.
The library is built around a few core concepts: an engine that runs workflows made of tasks that receive events dispatched by a broker in topics.
A task in tukio is basically an asyncio task. But we subclassed asyncio.Task
to add attributes and wrap methods to make it easier to manager tasks within a
workflow.
As a consequence you need to use tukio's own task factory. Note that the engine
does the job for you.
A workflow is basically a collection of tasks executed sequentially starting from the root task.
It triggers workflows upon receiving new data and can control the execution of hundreds of concurrent workflows.
Tukio implements a simple broker that can disptach events in topics. A topic is similar to a communication channel. You need to register a handler to process events received in a given topic. All data received by the engine is wrapped into an event that gathers the data received and, optionnally, the topic it was dispatched to and the source of the event (if the event comes from another workflow/task).
Tukio provides classes to define tasks and workflows. Such objects are called task/workflow templates whereas the execution of workflows/tasks is done in instances. An instance of workflow uses the description of the workflow - the workflow template - to run the right tasks. A task may use a configuration - from the task template - to run properly.
The description of a workflow is pretty straighforward:
{
"title": "workflow #1",
"policy": "abort-running",
"topics": ["abort"],
"tasks": [
{"id": "f1", "name": "task1"},
{"id": "f2", "name": "task2"},
{"id": "f3", "name": "task3"},
{"id": "f4", "name": "task1"},
{"id": "f5", "name": "task2"},
{"id": "f6", "name": "task1", "config": {"value": 1}}
],
"graph": {
"f1": ["f2"],
"f2": ["f3", "f4"],
"f3": ["f5"],
"f4": ["f6"],
"f5": [],
"f6": []
}
}
The fields title
, policy
and topics
are optional. The graph
field is a
simple adjacency list that ties tasks together into a DAG (Directed Acyclic
Graph). You must ensure there's only a single root task, otherwise the engine
will raise an exception.
First you need to code and register your own tasks
from tukio.task import register
@register('my-task1')
async def task1(event):
print('hello from task1: {}'.format(event.data))
return 'data from task1'
@register('my-task2')
async def task2(event):
print('hello from task2: {}'.format(event.data))
return 'data from task2'
Let's assume your 1st workflow is the following:
import tukio
wf1 = {
"title": "workflow #1",
"tasks": [
{"id": "f1", "name": "my-task1"},
{"id": "f2", "name": "my-task2"},
{"id": "f3", "name": "my-task2"},
{"id": "f4", "name": "my-task1"},
{"id": "f5", "name": "my-task2"},
{"id": "f6", "name": "my-task1"}
],
"graph": {
"f1": ["f2"],
"f2": ["f3", "f4"],
"f3": ["f5"],
"f4": ["f6"],
"f5": [],
"f6": []
}
}
wf_tmpl = tukio.WorkflowTemplate.from_dict(wf1)
Now, load it into the workflow engine:
import asyncio
loop = asyncio.get_event_loop()
engine = tukio.Engine(loop=loop)
loop.run_until_complete(engine.load(wf_tmpl))
It's now time to run your 1st workflow:
wflows = loop.run_until_complete(engine.data_received('apple'))
# Wait for the end of the workflows triggered
loop.run_until_complete(asyncio.wait(wflows))
You've just run your 1st workflow with tukio and should get an output like this:
hello from task1: apple
hello from task2: data from task1
hello from task1: data from task2
hello from task2: data from task2
hello from task1: data from task1
hello from task2: data from task2
We always welcome great ideas. If you want to hack on the library, a guide is dedicated to it and describes the various steps involved.