This library is no longer in active development!
A lightweight library for event and workflow-based programming in Python. Uses native python datatypes for expressing the workflow concisely. Modules inspired by DOM Events. Yet another workflow library.
Workflow tasks can be implemented as regular python functions, or class objects that implement the __call__()
method, or as sub-classes of the liteflow.Module
class.
A task function will be sent the output of the previous task. A task function has to accept atleast one argument. This is the only requirement.
from liteflow import compile_workflow
example_workflow = [
task_1, { # run `task_1()`, then branch conditionally based on the return value of task_1
"event_x": [task_x1, task_x2], # run `task_x1()` and `task_x2()` one-after-another (i.e sequentially)
"event_y": [task_y1, { # or, run `task_y1()`, then run `task_y2a()` and `task_y2b()` in parallel
task_y2a,
task_y2b
}],
},
]
example_workflow = compile_workflow(example_workflow)
# run the workflow
example_workflow.dispatch_event("event_foo", "hello")
A visual representation of this workflow is:
Please see the example below for a sample implementation of the tasks in this example.
The library focuses on concise expression of workflows, using native Python datatypes. The API is designed to be lightweight and familiar, so that it's easier to remember and be productive.
The workflow logic is represented by combining just three native datatypes:
- list -
[a, b, c]
- Tasksa
,b
,c
are run sequentially i.e. one-after-another. The output ofa
is fed intob
, and the output ofb
is fed intoc
. - set -
{a, b, c}
- Tasksa
,b
,c
are run in parallel. All three receive the same input (i.e. the data returned from the previous task). - dict -
{"x": a, "y": b}
- Taska
is run if the parent emits"x"
, taskb
is run if the parent emits"y"
. The dictionary key can also be a function (which receives the previous task's output, and needs to returnTrue
orFalse
).
Workflows can be nested. The outputs from a set
or dict
workflow will be batched and emitted one by one to the attached task. The output from the last task in a list
workflow will be emitted to the next task.
Workflow tasks can be implemented as regular python functions, or class objects that implement the __call__()
method, or as sub-classes of the liteflow.Module
class.
Please see the example and API reference below.
- It's easy to visualize the logic of a complex system, at a single glance.
- Each task can be unit-tested, and can focus on doing just one thing.
- Non-technical users can write/modify the workflow logic, using visual programming. Maybe someone will write a visual programming plugin for liteflow, that produces liteflow-compatible workflow code?
You can also express neural network architectures with liteflow, since PyTorch's nn.Module
objects can be used directly as tasks.
The workflow has been defined in the example above.
Now, let's write an example implementation for each of the workflow tasks. In this example, task_1
emits "event_x"
or "event_y"
at random. This will result in one of the two branches getting executed each time the workflow is run.
A task function has to accept a minimum of one argument. This is mandatory.
There is no upper limit on the number of arguments that can be sent to a task function. Please ensure that the number of arguments in a task function matches the number of values sent by the previous task.
In the example below,
task_1
accepts only one extra argument (other than the mandatory first argument), whiletask_x1
does not accept any extra arguments. So if the"event_foo"
event is being sent totask_1
, exactly one extra argument needs to be sent, and if"event_x"
is being sent totask_x1
, no extra arguments should be sent.If a task function does not return anything, the next function will receive
None
as the first argument.
import random
def task_1(a, b):
print("task 1. Got:", a, b)
return random.choice(("event_x", "event_y"))
def task_x1(a):
print("task x1. Got:", a)
return "event_x1", 42, "question"
def task_x2(a, b, c):
print("task x2. Got:", a, b, c) # prints `task x2. Got: "event_x1" 42 "question"`
def task_y1(a):
print("task y1. Got:", a)
return "event_y1"
def task_y2a(a):
print("task y2a. Got:", a)
def task_y2b(a, *args):
print("task y2b. Got:", a, args)
compile_workflow(workflow)
returns a liteflow.Module
instance, to which events can be sent.
This module serves as the starting point of the workflow.
Extending from liteflow.Module
adds 6 methods. Three DOM-like event-handling methods, and three workflow-related methods (to send/receive data between modules in the workflow).
-
add_event_listener(event_name: str, listener: function)
-
dispatch_event(event_name: str, *args)
-
remove_event_listener(event_name: str, listener: function)
-
attach_output_listener(other_module: Module)
-
emit_event(event_name: str, *args)
-
detach_output_listener(other_module: Module)