ENH: Allow the creation of delayed tasks.
Started in discussion #450.
Is your feature request related to a problem?
Users want to create delayed tasks, meaning tasks whose dependencies are not known at the time pytask starts up.
For example, imagine one task that splits a text file by lines and stores every line as a separate text file, and a second task that manipulates every line and stores it as a markdown file. We do not know how many lines are in the file, so we cannot create all tasks upfront.
Describe the solution you'd like
First, we create a task that splits the file by lines and writes each line to a file in a subfolder, `lines`, next to the task module.
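from pathlib import Path
from typing import Annotated

from pytask import Product, task

_DIR = Path(__file__).parent

# DelayedPathNode is the node proposed in this issue.
@task
def split_file(
    # The parameter without a default must come first.
    path_to_delayed_paths: Annotated[Path, DelayedPathNode(root=_DIR / "lines", pattern="*.txt"), Product],
    path_to_text: Path = Path("text.txt"),
) -> None:
    text = path_to_text.read_text()
    for i, line in enumerate(text.splitlines()):
        path_to_delayed_paths.joinpath(f"{i}.txt").write_text(line)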
The output paths are specified via `DelayedNode`s, here a `DelayedPathNode`. It lets you specify a base path that is available in the task and a pattern (Unix-style glob, probably regex too) for matching.
`DelayedNode`s are resolved after the task is executed, and the nodes found are added to the workflow.
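To make the resolution step concrete, here is a minimal sketch of what "resolving" a delayed path node could look like. `SketchDelayedPathNode` and its `resolve()` method are hypothetical names used only for illustration; they are not part of pytask and only mirror the `root` and `pattern` arguments from the example above.

```python
import tempfile
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class SketchDelayedPathNode:
    """Hypothetical stand-in for the proposed DelayedPathNode."""

    root: Path
    pattern: str

    def resolve(self) -> list[Path]:
        # Glob only once the producing task has run; before that the
        # directory may be missing, so nothing can be found yet.
        if not self.root.is_dir():
            return []
        return sorted(self.root.glob(self.pattern))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp) / "lines"
    node = SketchDelayedPathNode(root=root, pattern="*.txt")
    before = node.resolve()  # the producing task has not run yet

    # Simulate what split_file would write.
    root.mkdir()
    (root / "0.txt").write_text("first line")
    (root / "1.txt").write_text("second line")
    (root / "notes.md").write_text("not matched by the pattern")
    after = [path.name for path in node.resolve()]
```

In this sketch the node is resolved twice to show the difference: before the producing task ran there is nothing to find, afterwards only the files matching the pattern are returned.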
To create delayed tasks that are resolved at a later point when the dependencies are available, we use a "metatask" that allows us to define more tasks.
import shutil

# is_ready is the proposed argument; is_ready_func is not defined here.
@task(is_ready=is_ready_func)
def copy_line(
    path_to_delayed_paths: Annotated[Path, DelayedPathNode(root=_DIR / "lines", pattern="*.txt")]
):
    for file_ in path_to_delayed_paths.glob("*.txt"):  # unnecessary, could be easier.

        @task
        def copy_file(from_: Path = file_, to: Path = file_.with_suffix(".md")):
            shutil.copyfile(from_, to)

        yield copy_file
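How a runner could consume such a generator is not specified here. The following is only a sketch under the assumption that a task returning a generator is treated as a metatask and every yielded callable is queued as a new task. `run_all` and `make_copy_tasks` are hypothetical names, and no pytask API is used.

```python
import inspect
from collections import deque
from typing import Callable


def run_all(tasks: list[Callable]) -> list[str]:
    """Run tasks in order; tasks yielded by a generator task are queued too."""
    queue = deque(tasks)
    executed = []
    while queue:
        task_func = queue.popleft()
        result = task_func()
        executed.append(task_func.__name__)
        if inspect.isgenerator(result):
            # A metatask: every yielded callable becomes a new task.
            queue.extend(result)
    return executed


def make_copy_tasks():
    for name in ["0.txt", "1.txt"]:

        # Bind the loop variable as a default, as in the example above.
        def copy_file(name=name):
            return name

        copy_file.__name__ = f"copy_{name}"
        yield copy_file


order = run_all([make_copy_tasks])
```

The yielded tasks run after the metatask itself, which matches the idea that they can only be defined once the delayed paths exist.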
What do we need?
Items marked (independent) are not strictly related to the feature and can be tackled beforehand.
- (independent) Delayed tasks blur the lines between the three-stage process of collection, dag building, and execution in pytask. What happens, for example, if a delayed task produces errors while being collected? I think pytask should aggressively try to execute everything and report all errors at the end of the build when nothing can be executed anymore.
- An interface for delayed nodes resolved after a task is finished.
- What enters a task from a delayed node? Just the base path? base path + pattern?
- Implement delayed tasks
- Implement `is_ready` for `@task`.
- (independent) An easy interface to let one task depend on another for easily defining the `is_ready` condition.
- Implement collection of delayed tasks. Delayed tasks need to go through the normal collection process, be added to the dag, and be executed. Errors need to be handled and added to reports.
- Implement
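
The combination of an `is_ready` condition with "execute everything possible and report all errors at the end" can be sketched as a simple loop. This is an illustration under assumptions, not pytask's scheduler: `SketchTask` and `execute_aggressively` are hypothetical names, and `is_ready` is modelled as a callable without arguments.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass
class SketchTask:
    name: str
    function: Callable[[], None]
    is_ready: Callable[[], bool] = lambda: True


def execute_aggressively(tasks: list[SketchTask]) -> dict[str, list[str]]:
    """Run every task that becomes ready; collect errors instead of stopping."""
    pending = list(tasks)
    report = {"passed": [], "failed": [], "never_ready": []}
    progress = True
    while pending and progress:
        progress = False
        for task in list(pending):
            if not task.is_ready():
                continue
            pending.remove(task)
            progress = True
            try:
                task.function()
            except Exception as exc:
                # Keep going; errors are reported at the end of the build.
                report["failed"].append(f"{task.name}: {exc}")
            else:
                report["passed"].append(task.name)
    report["never_ready"] = [task.name for task in pending]
    return report


state = {"split_done": False}


def split():
    state["split_done"] = True


def broken():
    raise ValueError("boom")


report = execute_aggressively(
    [
        SketchTask("copy", lambda: None, is_ready=lambda: state["split_done"]),
        SketchTask("broken", broken),
        SketchTask("split", split),
        SketchTask("orphan", lambda: None, is_ready=lambda: False),
    ]
)
```

The loop stops once a full pass makes no progress, so tasks whose condition never becomes true end up in the report instead of blocking the build.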
API breaking implications
Describe alternatives you've considered
If we were not to yield multiple tasks from a task, we would need a different mechanism to repeat a task over an unknown number of dependencies. This brings us right back to parametrizations à la
@task(is_ready=lambda *x: json_definition_path.exists())
@pytask.mark.parametrize("output", _yield_paths(BLD / "filelist.json"))
def create_file(output: Annotated[Path, Product]):
    output.write_text("This is a file.")
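The helper `_yield_paths` is not defined in this issue. As a rough sketch, assuming `filelist.json` holds a JSON list of path strings (an assumption, the format is not specified here), it could look like this:

```python
import json
import tempfile
from pathlib import Path


def _yield_paths(json_definition_path: Path) -> list[Path]:
    # Assumption: the file contains a JSON list of path strings.
    # If it does not exist yet, there is nothing to parametrize over.
    if not json_definition_path.exists():
        return []
    return [Path(p) for p in json.loads(json_definition_path.read_text())]


with tempfile.TemporaryDirectory() as tmp:
    filelist = Path(tmp) / "filelist.json"
    missing = _yield_paths(filelist)  # file not written yet
    filelist.write_text(json.dumps(["a.txt", "b.txt"]))
    found = _yield_paths(filelist)
```

The empty result for a missing file shows why this alternative still needs something like `is_ready`: the parametrization can only be evaluated once the JSON file has been produced.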
Quick comparison of alternatives
Meta Tasks
+ Dependencies and products can be defined as usual.
+ Looping can still be used to define multiple tasks.
- Meta tasks are new.
Parametrized Tasks
- Requires something similar to parametrizations.