Handling complex workflows through tasks
English | 简体中文
When dealing with complex workflows, it's often necessary to break them down into multiple tasks and then combine these tasks together. This library provides a simple way to define and execute these tasks.
Install tasksflow
pip install tasksflow
Create some simple tasks
import tasksflow.pool
import tasksflow.task
import tasksflow.cache
import tasksflow.executer
from pathlib import Path
class Task1(tasksflow.task.Task):
def run(self):
return {"a": 1, "b": 2}
class Task2(tasksflow.task.Task):
def run(self, a: int, b: int):
return {"c": a + b}
tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool
print(result) # {"a": 1, "b": 2, "c": 3}
Each task has multiple input parameters and multiple output parameters. Taking Task2 as an example:
class Task2(tasksflow.task.Task):
def run(self, a: int, b: int):
return {"c": a + b}
Task2 takes input parameters a
and b
, and outputs parameter c
.
Each task needs to inherit from tasksflow.task.Task
and override the run
method. The parameters of the run
method are the inputs required for this task, and the return value should be a dict of parameter->value, ensuring that each task returns non-repeating parameters.
There's no need to explicitly define the dependencies between tasks. Once the parameters returned by the preceding tasks are available, the subsequent tasks that depend on these parameters will be automatically invoked with these values.
tasksflow.pool.Pool
is a pool of tasks used for running a series of tasks. The common usage is:
tasks = [Task1(), Task2()]
p = tasksflow.pool.Pool(tasks)
result = p.run() # run tasks in pool
Initialize the task pool using tasksflow.pool.Pool(tasks)
and execute the task list with result = p.run()
, obtaining the results as a dict of all tasks' parameter->value.
For most tasks, as long as the input parameters and the task code remain the same, the final output result will also be the same. By default, tasksflow
caches the task code and inputs/outputs. When running the task again and hitting the cache, it skips the execution process and directly uses the output. This caching feature can effectively improve development efficiency, as developers don't need to rerun previous tasks when developing subsequent tasks.
Some tasks rely on external factors such as the network or time, and even with the same input parameters and task code, they may produce different outputs. In such cases, it may be necessary to disable caching and force task execution. This can be achieved by declaring cache disabling during task initialization.
tasks = [Task1(), Task2(enable_cache=False)]
Here, Task2 is passed enable_cache=False
, indicating that caching should be disabled for this task.
By default, pool
will create a SQLite database at cache.db
and cache task codes and inputs. If you want to customize the storage path:
from pathlib import Path
p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.SqliteCacheProvider(Path("mycache.db")))
You can also use MemoryCacheProvider
instead of SqliteCacheProvider
, which stores the cache in memory, commonly used for testing.
p = tasksflow.pool.Pool(tasks, cache_provider=tasksflow.cache.MemoryCacheProvider())
Or you can customize CacheProvider
by inheriting tasksflow.cache.CacheProvider
and implementing the get
and set
methods. Then pass your custom CacheProvider
to the Pool
.
By default, pool
uses tasksflow.executer.MultiprocessExecuter
, which creates a separate process for each task. Once a task is completed, it automatically invokes the dependent tasks based on the output of this task.
You can also use tasksflow.executer.SerialExecuter
, which executes tasks sequentially according to the order in tasks
.
p = tasksflow.pool.Pool(tasks, executer=tasksflow.executer.SerialExecuter())
Or you can create a custom executer.
from typing import Any
class MyExecuter(Executer):
def run(tasks: list[tasksflow.task.Task]) -> dict[str, Any]:
pass
p = tasksflow.pool.Pool(tasks, executer=MyExecuter())
tasksflow
uses the loguru
module for logging. You can control whether tasksflow
's logs are printed using the following code. By default, tasksflow
's logs are disabled.
# install loguru by `pip install loguru`
from loguru import logger
logger.enable("tasksflow")
Scrape the titles of the website https://webscraper.io/test-sites. Use tasksflow
to decompose the scraping task into 2 subtasks: web request and web parsing. This way, when developing the web parsing subtask, caching is utilized to avoid requesting the webpage again.
Install dependencies
pip install tasksflow requests lxml
Write the code
import tasksflow.pool
import tasksflow.task
from lxml import etree
import requests
class TaskRequest(tasksflow.task.Task):
def run(self):
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 Safari/537.36"
}
url = "https://webscraper.io/test-sites"
resp = requests.get(url, headers=headers).text
return {"resp": resp}
class TaskParse(tasksflow.task.Task):
def run(self, resp: str):
html = etree.HTML(resp, etree.HTMLParser())
title_elements = html.xpath("/html/body/div[1]/div[3]/div[*]/div[1]/h2/a")
titles = [title.text.strip() for title in title_elements]
return {"titles": titles}
def main():
tasks = [TaskRequest(), TaskParse()]
p = tasksflow.pool.Pool(tasks)
result = p.run()
print(f"titles: {result['titles']}")
# titles: ['E-commerce site', 'E-commerce site with pagination links', 'E-commerce site with AJAX pagination links', 'E-commerce site with "Load more" buttons', 'E-commerce site that loads items while scrolling', 'Table playground']
if __name__ == "__main__":
main()
See dev_zh_CN.md