insitro/redun

Nesting tasks with different executors: confusing results

Closed this issue · 3 comments

I want to run most of my tasks on a local executor, then pass inputs to a large batch function. I've tried expressing this in what I think is the obvious way (following https://github.com/insitro/redun/blob/main/examples/05_aws_batch/workflow.py#L41):

import os
from redun import task, Dir, script

redun_namespace = "test"

@task(executor='batch', memory=1, cpus=1)
def bioformats2raw():
    outfile = f"1907.zarr"
    cmd = f"mkdir {outfile}; touch {outfile}/a {outfile}/b"
    return script(cmd,
                  outputs=[Dir(f"s3://projectid-davidek-zarr-conversion/{outfile}").stage(outfile)])

@task(executor="default")
def main():
    return bioformats2raw()

and my redun.ini defines two executor environments:

# redun configuration.

[backend]
db_uri = sqlite:///redun.db

[executors.default]
type = local
max_workers = 20

[executors.batch]
type = aws_batch
image = awsaccountid.dkr.ecr.us-west-2.amazonaws.com/bioformats2raw
queue = davidek-test-queue-2
s3_scratch = s3://projectid-davidek-redun-test/redun/
role = arn:aws:iam::awsaccountid:role/batch-ecs-job-role
job_name_prefix = redun-example

When I run this, it does say each of the tasks will run in the expected locations:

[redun] Cached Job c3871573:  test.main() (eval_hash=3fd24539)
[redun] Run    Job 8bb5452f:  test.bioformats2raw() on batch
[redun] Executor[batch]: submit redun job 8bb5452f-3338-4b9a-85ca-5288f177f7a8 as AWS Batch job 77b7faf1-413a-4d54-8dc5-fa89f524d924:
[redun]   job_id          = 77b7faf1-413a-4d54-8dc5-fa89f524d924
[redun]   job_name        = redun-example-1ba283a7dc057903b17a2a1997df629a34c5839c
[redun]   s3_scratch_path = s3://projectid-davidek-redun-test/redun/jobs/1ba283a7dc057903b17a2a1997df629a34c5839c
[redun]   retry_attempts  = 0
[redun]   debug           = False

however, while the batch job runs, it looks like the script steps actually ran locally:

[redun] Run    Job 7514841a:  redun.script(command='(\n# Save command to temp file.\nCOMMAND_FILE="$(mktemp)"\ncat > "$COMMAND_FILE" <<"EOF"\n#!/usr/bin/env bash\nset -exo pipefail\nmkdir 1907.zarr; touch 1907.zarr/a 1907.zarr/b\nEOF\n\n# Execute t..., inputs=[], outputs=[StagingDir(local=Dir(path=1907.zarr, hash=907a6eabb8205d543fb669976617797b5a78f289), remote=Dir(path=s3://projectid-davidek-zarr-conversion/1907.zarr, hash=3de668659261e26cd39c31e6b9452f701b2784c3))], task_options={}, temp_path=None) on default
[redun] Run    Job 29f40cf1:  redun.script_task(command='(\n# Save command to temp file.\nCOMMAND_FILE="$(mktemp)"\ncat > "$COMMAND_FILE" <<"EOF"\n#!/usr/bin/env bash\nset -exo pipefail\nmkdir 1907.zarr; touch 1907.zarr/a 1907.zarr/b\nEOF\n\n# Execute t...) on default
[redun] Run    Job 5392c430:  redun.postprocess_script(result=b'upload: 1907.zarr/b to s3://projectid-davidek-zarr-conversion/1907.zarr/b\nupload: 1907.zarr/a to s3://projectid-davidek-zarr-conversion/1907.zarr/a\n', outputs=[StagingDir(local=Dir(path=1907.zarr, hash=907a6eabb8205d543fb669976617797b5a78f289), remote=Dir(path=s3://projectid-davidek-zarr-conversion/1907.zarr, hash=3de668659261e26cd39c31e6b9452f701b2784c3))], temp_path=None) on default

I also verified this by finding the artifacts of the batch task in my local redun directory. I made sure I'm not running with batch debug=True or --pdb.

I was able to force the batch task to run on batch by making the default executor be batch:

[executors.default]
type = aws_batch

so it looks to me like a "batch" task that calls script actually runs script() on the default executor, possibly because script invokes _script, which is an @task that doesn't specify an executor:
https://github.com/insitro/redun/blob/main/redun/scripting.py#L158
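The behavior described above can be illustrated with a small sketch (plain Python, not redun internals, and `resolve_executor` is a hypothetical helper): task options such as `executor` are resolved per task rather than inherited from the calling task, so a task defined without an executor falls back to "default" even when invoked from inside a batch task.

```python
def resolve_executor(task_options, call_options=None):
    """Resolve the executor for a single task: call-time options override
    definition-time options; with neither, fall back to "default"."""
    merged = {**task_options, **(call_options or {})}
    return merged.get("executor", "default")

# _script defines no executor, so it lands on "default" even inside a batch task.
assert resolve_executor({}) == "default"
# A task decorated with @task(executor="batch") resolves to "batch".
assert resolve_executor({"executor": "batch"}) == "batch"
# Passing executor at call time also routes the task to "batch".
assert resolve_executor({}, {"executor": "batch"}) == "batch"
```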

Either way, how do I get the functionality of script() to run in an AWS batch task?

Good question. As you can see in the job run log messages, script() is itself a task. Its extra kwargs are task options, so you can pass the executor and AWSBatchExecutor options (memory, cpus) directly to script(). bioformats2raw() can run locally, since it's just preparing the script command.

import os
from redun import task, Dir, script

redun_namespace = "test"

@task()
def bioformats2raw():
    outfile = f"1907.zarr"
    cmd = f"mkdir {outfile}; touch {outfile}/a {outfile}/b"
    return script(
        cmd,
        executor='batch',
        memory=1,
        cpus=1,
        outputs=[Dir(f"s3://projectid-davidek-zarr-conversion/{outfile}").stage(outfile)]
    )

@task()
def main():
    return bioformats2raw()

Everything else looks good from what I can tell.

Thanks.

I suggest explicitly documenting this in an example, possibly updating 05_aws_batch where the batch job invokes task_on_default to instead invoke task_on_local, and show how it's required to pass in the executor parameters.

The issue I see, btw, with having these be top-level kwargs is that eventually there will be kwarg name collisions. I suspect that executor params like memory and cpus should be part of a dictionary, instead of top-level kwargs.

> I suggest explicitly documenting this in an example, possibly updating 05_aws_batch where the batch job invokes task_on_default to instead invoke task_on_local, and show how it's required to pass in the executor parameters.

That's a great idea.

> The issue I see, btw, with having these be top-level kwargs is that eventually there will be kwarg name collisions. I suspect that executor params like memory and cpus should be part of a dictionary, instead of top-level kwargs.

Interesting idea. I could see how an executor_options dict would help explicitly route kwargs to the Executor. That's a good idea to keep in mind if we run into collisions.
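For the record, a minimal sketch of what such routing might look like (hypothetical, not redun's current API; `split_options` and the key set are assumptions for illustration): executor-bound parameters are gathered into one dict so they can never collide with a script's own keyword arguments like `outputs` or `inputs`.

```python
# Executor parameter names are assumed for illustration.
EXECUTOR_KEYS = {"executor", "memory", "cpus"}

def split_options(kwargs):
    """Separate executor-bound options from the remaining task kwargs."""
    executor_options = {k: v for k, v in kwargs.items() if k in EXECUTOR_KEYS}
    task_kwargs = {k: v for k, v in kwargs.items() if k not in EXECUTOR_KEYS}
    return executor_options, task_kwargs

exec_opts, rest = split_options(
    {"executor": "batch", "memory": 1, "cpus": 1, "outputs": []}
)
# exec_opts == {"executor": "batch", "memory": 1, "cpus": 1}
# rest == {"outputs": []}
```

Under this scheme, a caller would write `script(cmd, executor_options={"executor": "batch", "memory": 1, "cpus": 1}, outputs=...)` and the dict would be forwarded to the Executor wholesale.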