Nesting tasks with different executors: confusing results
Closed this issue · 3 comments
I want to run most of my tasks on a local executor, then pass inputs to a large batch function. I've tried expressing this in what I think is the obvious way (following https://github.com/insitro/redun/blob/main/examples/05_aws_batch/workflow.py#L41):
import os
from redun import task, Dir, script

redun_namespace = "test"

@task(executor="batch", memory=1, cpus=1)
def bioformats2raw():
    outfile = "1907.zarr"
    cmd = f"mkdir {outfile}; touch {outfile}/a {outfile}/b"
    return script(
        cmd,
        outputs=[Dir(f"s3://projectid-davidek-zarr-conversion/{outfile}").stage(outfile)],
    )

@task(executor="default")
def main():
    return bioformats2raw()
and my redun.ini defines two executor environments:
# redun configuration.
[backend]
db_uri = sqlite:///redun.db
[executors.default]
type = local
max_workers = 20
[executors.batch]
type = aws_batch
image = awsaccountid.dkr.ecr.us-west-2.amazonaws.com/bioformats2raw
queue = davidek-test-queue-2
s3_scratch = s3://projectid-davidek-redun-test/redun/
role = arn:aws:iam::awsaccountid:role/batch-ecs-job-role
job_name_prefix = redun-example
When I run this, it does say each of the tasks will run in the expected locations:
[redun] Cached Job c3871573: test.main() (eval_hash=3fd24539)
[redun] Run Job 8bb5452f: test.bioformats2raw() on batch
[redun] Executor[batch]: submit redun job 8bb5452f-3338-4b9a-85ca-5288f177f7a8 as AWS Batch job 77b7faf1-413a-4d54-8dc5-fa89f524d924:
[redun] job_id = 77b7faf1-413a-4d54-8dc5-fa89f524d924
[redun] job_name = redun-example-1ba283a7dc057903b17a2a1997df629a34c5839c
[redun] s3_scratch_path = s3://projectid-davidek-redun-test/redun/jobs/1ba283a7dc057903b17a2a1997df629a34c5839c
[redun] retry_attempts = 0
[redun] debug = False
however, while the batch job runs, it looks like the script steps actually ran locally:
[redun] Run Job 7514841a: redun.script(command='(\n# Save command to temp file.\nCOMMAND_FILE="$(mktemp)"\ncat > "$COMMAND_FILE" <<"EOF"\n#!/usr/bin/env bash\nset -exo pipefail\nmkdir 1907.zarr; touch 1907.zarr/a 1907.zarr/b\nEOF\n\n# Execute t..., inputs=[], outputs=[StagingDir(local=Dir(path=1907.zarr, hash=907a6eabb8205d543fb669976617797b5a78f289), remote=Dir(path=s3://projectid-davidek-zarr-conversion/1907.zarr, hash=3de668659261e26cd39c31e6b9452f701b2784c3))], task_options={}, temp_path=None) on default
[redun] Run Job 29f40cf1: redun.script_task(command='(\n# Save command to temp file.\nCOMMAND_FILE="$(mktemp)"\ncat > "$COMMAND_FILE" <<"EOF"\n#!/usr/bin/env bash\nset -exo pipefail\nmkdir 1907.zarr; touch 1907.zarr/a 1907.zarr/b\nEOF\n\n# Execute t...) on default
[redun] Run Job 5392c430: redun.postprocess_script(result=b'upload: 1907.zarr/b to s3://projectid-davidek-zarr-conversion/1907.zarr/b\nupload: 1907.zarr/a to s3://projectid-davidek-zarr-conversion/1907.zarr/a\n', outputs=[StagingDir(local=Dir(path=1907.zarr, hash=907a6eabb8205d543fb669976617797b5a78f289), remote=Dir(path=s3://projectid-davidek-zarr-conversion/1907.zarr, hash=3de668659261e26cd39c31e6b9452f701b2784c3))], temp_path=None) on default
I verified this as well by finding the artifacts of the batch task in my local redun directory. I made sure I'm not running with batch debug=True or --pdb.
I was able to force the batch task to run on batch by making the default executor be batch:
[executors.default]
type = aws_batch
so it looks to me like a "batch" task that calls script() actually runs script() in the default executor, possibly because script() invokes _script, which is an @task that doesn't specify an executor:
https://github.com/insitro/redun/blob/main/redun/scripting.py#L158
Either way, how do I get the functionality of script() to run in an AWS batch task?
Good question. As you can see in the job run log messages, script() is itself a task. Its extra kwargs are task options, so you can pass the executor and AWSBatchExecutor options (memory, cpus) directly to script(). bioformats2raw() can run locally since it's just preparing the script command.
import os
from redun import task, Dir, script

redun_namespace = "test"

@task()
def bioformats2raw():
    outfile = "1907.zarr"
    cmd = f"mkdir {outfile}; touch {outfile}/a {outfile}/b"
    return script(
        cmd,
        executor="batch",
        memory=1,
        cpus=1,
        outputs=[Dir(f"s3://projectid-davidek-zarr-conversion/{outfile}").stage(outfile)],
    )

@task()
def main():
    return bioformats2raw()
Everything else looks good from what I can tell.
Thanks.
I suggest explicitly documenting this in an example, possibly updating 05_aws_batch where the batch job invokes task_on_default to instead invoke task_on_local, and show how it's required to pass in the executor parameters.
The issue I see, btw, with having these be top-level kwargs is that eventually there will be kwarg name collisions. I suspect that executor params like memory and cpus should be part of a dictionary, instead of top-level kwargs.
> I suggest explicitly documenting this in an example, possibly updating 05_aws_batch where the batch job invokes task_on_default to instead invoke task_on_local, and show how it's required to pass in the executor parameters.
That's a great idea.
> The issue I see, btw, with having these be top-level kwargs is that eventually there will be kwarg name collisions. I suspect that executor params like memory and cpus should be part of a dictionary, instead of top-level kwargs.
Interesting idea. I could see how an executor_options dict would help explicitly route kwargs to the Executor. That's a good idea to keep in mind if we run into collisions.
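To make the collision concern concrete, here is a rough Python sketch of the idea (this is not redun's actual API; `run_script` and its signature are hypothetical): routing executor settings through a single `executor_options` dict keeps them from competing with same-named kwargs meant for the task itself.

```python
def run_script(cmd, outputs=None, executor_options=None, **task_kwargs):
    """Hypothetical script() signature: executor settings travel in one
    dict, so a user kwarg named 'memory' can never collide with the
    executor's 'memory' option."""
    executor_options = executor_options or {}
    # Executor settings and task kwargs land in separate buckets.
    return {
        "command": cmd,
        "outputs": outputs or [],
        "executor_options": executor_options,
        "task_kwargs": task_kwargs,
    }

# Both a user-level 'memory' kwarg and the executor's 'memory' option
# can coexist without ambiguity:
job = run_script(
    "echo hello",
    executor_options={"executor": "batch", "memory": 1, "cpus": 1},
    memory="a user-level kwarg, untouched by the executor",
)
```

With top-level kwargs, by contrast, the second `memory` above would silently shadow (or clash with) the executor option.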