Bogdanp/dramatiq

Feature request: pipeline with dynamic children

yoav-orca opened this issue · 0 comments

I'm trying to migrate away from Celery, but I rely on the following workflow:

  • Build a list of tasks
  • Process each task (fan-out)
  • Join the results
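
To make the shape of the workflow concrete, here is a minimal, library-agnostic sketch that uses only the Python standard library. It is not Celery or dramatiq code: `run_workflow` is a hypothetical helper, and the thread pool only stands in for a real task queue. The point is that the number of children is not known until the first step has run.

```python
from concurrent.futures import ThreadPoolExecutor


def build_task_list(num: int) -> list[str]:
    # Step 1: the number of children is only known once this has run.
    return [f"Task-{i}" for i in range(num)]


def process(task: str) -> str:
    # Step 2: one call per item in the list (the fan-out).
    return f"{task}: done"


def combine_results(results: list[str]) -> str:
    # Step 3: join the children's results into a single value.
    return "\n".join(results)


def run_workflow(num: int) -> str:
    # Hypothetical helper: runs the three steps in-process with threads
    # instead of dispatching them to workers through a broker.
    task_list = build_task_list(num)
    with ThreadPoolExecutor(max_workers=4) as pool:
        # map() preserves input order, so results line up with task_list.
        results = list(pool.map(process, task_list))
    return combine_results(results)


if __name__ == "__main__":
    print(run_workflow(3))
```

What I'm asking for is a way to express this same shape as queued tasks, where the second step is created from the output of the first.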

In Celery, I can do the following:

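from celery import group, shared_task
from celery.canvas import Signature

@shared_task
def build_task_list(num: int) -> list[str]:
    return [f"Task-{i}" for i in range(num)]

@shared_task
def process_and_gather(task_list: list[str], gather_task: Signature) -> None:
    # Fan out one `process` call per item, then chain the gather task.
    return (group(process.s(t) for t in task_list) | gather_task).delay()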

@shared_task
def process(task: str) -> str:
    return f"{task}: done"

@shared_task
def combine_results(results: list[str]) -> str:
    return '\n'.join(results)

# full workflow

workflow = build_task_list.s(100) | process_and_gather.s(gather_task=combine_results.s())
result = workflow.delay()
print(result.get())

As far as I can tell, this is not currently possible in dramatiq. I believe it would be a great addition to the library.