Feature request: pipeline with dynamic children
yoav-orca opened this issue · 0 comments
yoav-orca commented
I'm trying to migrate away from Celery, but I have the following scenario Workflow:
- Build a list of tasks
- process tasks (fanout)
- Join the results
In celery, I can do the following:
@shared_task
def build_task_list(num: int) -> list[str]:
return [f"Task-{i}" for i in range(num)]
@shared_task
def process_and_gather(task_list: list[str], gather_task: Singature) -> None
return (group(process.s(t) for t in task_list) | gather_task).delay()
@shared_task
def process(task: str) -> str:
return f"{task}: done"
@shared_task
def combine_results(results: list[str]) -> str:
return '\n'.join(results)
# full workflow
workflow = build_task_list.s(100) | process_and_gather.s(gather_task=combine_results.s())
result = workflow.delay()
print(result.get())
It's not possible to do right now in dramatic, I believe it would be a great addition to the library.