tgen/jetstream

Reset predecessor/parent task functionality

Closed this issue · 12 comments

While writing phoenix, we have run into two use cases where tasks fail upon the restart of a project.

  • New data has been generated for a library, which requires alignment to be restarted. The alignment immediately fails because the fastqs from the previous run do not exist in the temp folder: the copy_fastq tasks from the previous run have already completed and are not reset.
  • Some tasks use files in the temp folder that were generated by a parent task. If a change is made to a task that requires a file in the temp folder, that file will not exist if the project was restarted and retrieved from archive.

I think we could take advantage of the "after", "output", and "input" directives to control which parent tasks to reset. The "after" directive could indicate a task that needs its direct parent tasks reset because their files are not in archive space, while the "input" and "output" directives would indicate that the files are in archive space.
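As a hypothetical sketch of that convention (these reset semantics are the proposal above, not an existing jetstream feature, and the commands are illustrative), the distinction might look like:

```
# Hypothetical semantics, not current jetstream behavior:

# "after" names a parent whose outputs live only in temp space, so
# restarting this task would also reset that parent.
- name: align_A
  after: copy_fastqs_A
  cmd: bwa mem ... temp/A.fq > temp/A.bam

# "input"/"output" point at archive space, so the parent's results
# survive a restart and the parent would not need to be reset.
- name: stats_ab
  input: archive/ab.bam
  output: archive/ab.stats
  cmd: samtools flagstat archive/ab.bam > archive/ab.stats
```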

Let me make sure I understand you correctly. The simplified workflow looks something like this to begin with:

Task(copy_fastqs_A) -> temp/A.fq -> Task(align_A) -> Task(cleanup) -> deletes temp/A.fq
Task(copy_fastqs_B) -> temp/B.fq -> Task(align_B) -> Task(cleanup) -> deletes temp/B.fq

Then new data is generated which requires more tasks to be added:

Task(copy_fastqs_A) -> temp/A.fq -> Task(align_A) -> Task(cleanup) -> deletes temp/A.fq
Task(copy_fastqs_B) -> temp/B.fq -> Task(align_B) -> Task(cleanup) -> deletes temp/B.fq
Task(copy_fastqs_C) -> temp/C.fq -> Task(align_C) -> Task(cleanup) -> deletes temp/C.fq

Or is it new fastqs for one of the existing samples:

Task(copy_fastqs_A) -> temp/A.fq -> Task(align_A) -> Task(cleanup) -> deletes temp/A.fq
Task(copy_fastqs_B) -> temp/B_new_.fq -> Task(align_B) -> Task(cleanup) -> deletes temp/B_new_.fq

Yes, your first and second blocks represent the first use case, with a few additional steps between copy and align.

More specifically, it is this scenario:

Task(copy_fastqs_A) -> temp/A.fq -> Task(align_A) -> temp/A.bam -> merge A & B -> archive/ab.bam
Task(copy_fastqs_B) -> temp/B.fq -> Task(align_B) -> temp/B.bam

When the project is restarted and the archive is recovered, all temp/ files are gone. A new C.fq is available that must be merged with A and B, since they are from one sample, but the intermediate bam and/or fastq files are missing. Jetstream thinks those tasks were already done and completed successfully, so it does not redo them. This actually becomes the same issue as the second case Austin described, since multiple temp/ files are missing: the lane-level fastq, the split fastq, and the split bam.

Task(copy_fastqs_A) -> temp/A.fq -> Task(align_A) -> temp/A.bam -> merge A,B,C -> archive/abc.bam
Task(copy_fastqs_B) -> temp/B.fq -> Task(align_B) -> temp/B.bam
Task(copy_fastqs_C) -> temp/C.fq -> Task(align_C) -> temp/C.bam

Great, I think I understand the problem. Here is a template that I think represents a test case for this scenario. We can use it for suggesting solutions:

example.jst:

{% for s in samples %}

- name: create_temp_{{ s }}
  output: temp/{{ s }}.txt
  cmd: |
    mkdir -p temp
    echo "hello sample {{ s }}" > "temp/{{ s }}.txt"

{% endfor %}

- name: process_samples
  input: 
  {% for s in samples %}
  - temp/{{ s }}.txt
  {% endfor %}
  cmd: | 
    set -ue

    {% for s in samples %}
    cat "temp/{{ s }}.txt" >> final.txt
    rm "temp/{{ s }}.txt"

    {% endfor %}

Running this once or multiple times should work fine:

jetstream run example.jst -c json:samples '["A", "B"]'

Running after adding to samples should fail:

jetstream run example.jst -c json:samples '["A", "B", "C"]'
Logs from my test runs:

First time works

$ jetstream init example_project
[🌵 jetstream] 2019-11-07 14:17:08 INFO: Version: jetstream 1.5
[🌵 jetstream.cli] 2019-11-07 14:17:08 INFO: Initializing project
[🌵 jetstream.cli] 2019-11-07 14:17:08 INFO: <Project path=/home/rrichholt/example_project>
$ cd example_project/
$ jetstream run ~/example.jst -c json:samples '["A", "B"]'
[🌵 jetstream] 2019-11-07 14:17:22 INFO: Version: jetstream 1.5
[🌵 jetstream.templates] 2019-11-07 14:17:22 INFO: Rendering template...
[🌵 jetstream.templates] 2019-11-07 14:17:22 INFO: Template rendering data sources include:
    0: Project index: {'__project__': {'args': '/home/rrichholt/venv/bin/jetstream init [...]
    1: Command args: {'samples': ['A', 'B']}
[🌵 jetstream.templates] 2019-11-07 14:17:22 INFO: Loading tasks...
[🌵 jetstream.workflows] 2019-11-07 14:17:22 INFO: Mashing G:/home/rrichholt/example_project/jetstream/workflow.pickle:0 tasks with H:None:3 tasks
[🌵 jetstream.workflows] 2019-11-07 14:17:22 INFO: Building workflow graph...
[🌵 jetstream.workflows] 2019-11-07 14:17:22 INFO: Mash report:
New tasks: 3
Modified tasks: 0
Total reset: 1
[🌵 jetstream.workflows] 2019-11-07 14:17:22 INFO: Retry: Resetting state for any pending or failed tasks...
[🌵 jetstream.workflows] 2019-11-07 14:17:22 INFO: Building workflow graph...
slurm 19.05.3-2
[🌵 jetstream.backends] 2019-11-07 14:17:22 INFO: SlurmBackend initialized
[🌵 jetstream.runner] 2019-11-07 14:17:22 INFO: Saving workflow: /home/rrichholt/example_project/jetstream/workflow.pickle
[🌵 jetstream.backends] 2019-11-07 14:17:22 INFO: Slurm job monitor started!
[🌵 jetstream.backends] 2019-11-07 14:17:22 INFO: SlurmBackend submitted(2347): create_temp_A
[🌵 jetstream.backends] 2019-11-07 14:17:22 INFO: SlurmBackend submitted(2348): create_temp_B
[🌵 jetstream.backends] 2019-11-07 14:17:53 INFO: Complete: create_temp_A
[🌵 jetstream.backends] 2019-11-07 14:17:53 INFO: Complete: create_temp_B
[🌵 jetstream.backends] 2019-11-07 14:17:53 INFO: SlurmBackend submitted(2349): process_samples
[🌵 jetstream.backends] 2019-11-07 14:18:24 INFO: Complete: process_samples
[🌵 jetstream.runner] 2019-11-07 14:18:24 INFO: Run complete!
[🌵 jetstream.backends] 2019-11-07 14:18:24 INFO: Slurm job monitor stopped!
[🌵 jetstream.runner] 2019-11-07 14:18:24 INFO: Saving workflow: /home/rrichholt/example_project/jetstream/workflow.pickle
[🌵 jetstream.runner] 2019-11-07 14:18:24 INFO: Total run time: 0:01:01.977533

Second time fails:

$ jetstream run ~/example.jst -c json:samples '["A", "B", "C"]'
[🌵 jetstream] 2019-11-07 14:18:54 INFO: Version: jetstream 1.5
[🌵 jetstream.templates] 2019-11-07 14:18:54 INFO: Rendering template...
[🌵 jetstream.templates] 2019-11-07 14:18:54 INFO: Template rendering data sources include:
    0: Project index: {'__project__': {'args': '/home/rrichholt/venv/bin/jetstream init [...]
    1: Command args: {'samples': ['A', 'B', 'C']}
[🌵 jetstream.templates] 2019-11-07 14:18:54 INFO: Loading tasks...
[🌵 jetstream.workflows] 2019-11-07 14:18:54 INFO: Mashing G:/home/rrichholt/example_project/jetstream/workflow.pickle:3 tasks with H:None:4 tasks
[🌵 jetstream.workflows] 2019-11-07 14:18:54 INFO: Building workflow graph...
[🌵 jetstream.workflows] 2019-11-07 14:18:54 INFO: Mash report:
New tasks: 1
Modified tasks: 1
Total reset: 1
[🌵 jetstream.workflows] 2019-11-07 14:18:54 INFO: Retry: Resetting state for any pending or failed tasks...
[🌵 jetstream.workflows] 2019-11-07 14:18:54 INFO: Building workflow graph...
slurm 19.05.3-2
[🌵 jetstream.backends] 2019-11-07 14:18:54 INFO: SlurmBackend initialized
[🌵 jetstream.runner] 2019-11-07 14:18:54 INFO: Saving workflow: /home/rrichholt/example_project/jetstream/workflow.pickle
[🌵 jetstream.backends] 2019-11-07 14:18:54 INFO: Slurm job monitor started!
[🌵 jetstream.backends] 2019-11-07 14:18:55 INFO: SlurmBackend submitted(2353): create_temp_C
[🌵 jetstream.backends] 2019-11-07 14:19:25 INFO: Complete: create_temp_C
[🌵 jetstream.backends] 2019-11-07 14:19:25 INFO: SlurmBackend submitted(2354): process_samples
[🌵 jetstream.backends] 2019-11-07 14:19:56 INFO: Failed: process_samples
[🌵 jetstream.runner] 2019-11-07 14:19:56 INFO: Run complete!
[🌵 jetstream.backends] 2019-11-07 14:19:56 INFO: Slurm job monitor stopped!
[🌵 jetstream.runner] 2019-11-07 14:19:56 INFO: Saving workflow: /home/rrichholt/example_project/jetstream/workflow.pickle
[🌵 jetstream.runner] 2019-11-07 14:19:56 INFO: Total run time: 0:01:01.838180
[🌵 jetstream.cli] 2019-11-07 14:19:56 INFO: 1 total tasks failed:
[🌵 jetstream.cli] 2019-11-07 14:19:56 INFO: <Task(failed): process_samples> Slurm(2354)
[🌵 jetstream.cli] 2019-11-07 14:19:56 ERROR: There were errors during the run.
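The mechanics of that failure can be reproduced outside jetstream in a few lines (stand-alone illustration; paths and names here are mine, not jetstream's): the first run consumed temp/A.txt and temp/B.txt, only C.txt is recreated on the second run, and process_samples then hits missing inputs.

```python
# Stand-alone illustration (no jetstream involved) of the second run's state:
# the first run rm'd temp/A.txt and temp/B.txt, and only create_temp_C re-ran.
import pathlib
import tempfile

root = pathlib.Path(tempfile.mkdtemp())
(root / 'temp').mkdir()
(root / 'temp' / 'C.txt').write_text('hello sample C\n')  # create_temp_C re-ran

missing = []
with open(root / 'final.txt', 'a') as final:
    for s in 'ABC':
        f = root / 'temp' / f'{s}.txt'
        if f.exists():
            final.write(f.read_text())
        else:
            missing.append(f'temp/{s}.txt')   # consumed by the first run

print(missing)   # ['temp/A.txt', 'temp/B.txt']
```

With `set -ue` in the real task's cmd, the first missing `cat` aborts the job, which is exactly the `Failed: process_samples` seen in the log above.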

For now you will have to work around this: either manually reset the upstream tasks after copying the project back to scratch, or just re-run everything.

For long term solutions, my first idea is to add a directive that would link tasks together during the reset process. Here's how it might look:

{% for s in samples %}

- name: create_temp_{{ s }}
  output: temp/{{ s }}.txt
  cmd: |
    mkdir -p temp
    echo "hello sample {{ s }}" > "temp/{{ s }}.txt"

{% endfor %}

- name: process_samples
  reset_affects:
  {% for s in samples %}
    - create_temp_{{ s }}
  {% endfor %}
  input: 
  {% for s in samples %}
  - temp/{{ s }}.txt
  {% endfor %}
  cmd: | 
    set -ue

    {% for s in samples %}
    cat "temp/{{ s }}.txt" >> final.txt
    rm "temp/{{ s }}.txt"

    {% endfor %}

When workflows are reset, any tasks that have the reset_affects directive will also force the listed tasks (and their descendants) to be reset.
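A sketch of how that reset propagation could work (illustrative Python, not jetstream's implementation; the dictionary shapes are assumptions):

```python
# Sketch of reset propagation for a proposed "reset_affects" directive:
# resetting a task also resets every task it lists, plus the descendants
# of anything that gets reset.
from collections import defaultdict

def propagate_reset(tasks, reset_seed):
    """tasks maps name -> {'after': [parents], 'reset_affects': [names]}.
    Returns the full set of task names that must be reset."""
    children = defaultdict(list)   # parent -> downstream tasks
    for name, t in tasks.items():
        for parent in t.get('after', []):
            children[parent].append(name)

    to_reset, stack = set(), list(reset_seed)
    while stack:
        name = stack.pop()
        if name in to_reset:
            continue
        to_reset.add(name)
        # reset_affects pulls the named tasks into the reset...
        stack.extend(tasks[name].get('reset_affects', []))
        # ...and anything reset drags its descendants along.
        stack.extend(children[name])
    return to_reset

tasks = {
    'create_temp_A': {},
    'create_temp_B': {},
    'process_samples': {
        'after': ['create_temp_A', 'create_temp_B'],
        'reset_affects': ['create_temp_A', 'create_temp_B'],
    },
}
print(sorted(propagate_reset(tasks, ['process_samples'])))
# ['create_temp_A', 'create_temp_B', 'process_samples']
```

Resetting process_samples now rebuilds the temp files it consumes, which is the behavior missing in the test case above.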

I've been wondering if a "reset_affects"-type solution might be needed. We have partly created this problem by wanting to copy only the results, not the intermediates, to final archive space. In doing so we have separated the temp files from the final files, which causes this issue, and it is hard for jetstream to realize the issue exists. So my thought was that we need a directive that tells jetstream whether a task is archive- or temp-dependent. The tricky part is how many steps up the graph you would need to go. My idea was to tag each task with a directive like "requires_temp: TRUE/FALSE". If the reset task is "requires_temp: TRUE", you move up the graph and reset its upstream task(s), then test whether those tasks are also "requires_temp: TRUE"; if so, you keep moving up and resetting completed tasks until you encounter "requires_temp: FALSE", which indicates that the task works off /archive space data.
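That walk-up could be sketched like this (hypothetical directive and illustrative code, not a jetstream feature):

```python
# Illustrative sketch of the proposed "requires_temp" walk: starting from a
# task that must rerun, reset completed parents until a task whose inputs
# live in /archive space (requires_temp: FALSE) is reached.
def reset_upstream(tasks, start):
    """tasks maps name -> {'after': [parents], 'requires_temp': bool}."""
    to_reset, stack = set(), [start]
    while stack:
        name = stack.pop()
        if name in to_reset:
            continue
        to_reset.add(name)
        if tasks[name].get('requires_temp'):
            # Inputs live in temp space: parents must rerun to recreate them.
            stack.extend(tasks[name].get('after', []))
        # requires_temp FALSE: the task reads /archive data, so stop here.
    return to_reset

tasks = {
    'copy_fastqs_B': {'requires_temp': False},   # reads archive space
    'align_B':       {'after': ['copy_fastqs_B'], 'requires_temp': True},
    'merge_ABC':     {'after': ['align_B'], 'requires_temp': True},
}
print(sorted(reset_upstream(tasks, 'merge_ABC')))
# ['align_B', 'copy_fastqs_B', 'merge_ABC']
```

The copy task itself is still reset (its outputs must be recreated in temp), but the walk stops there because its own inputs survive in archive space.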

Again, this is a self-created problem, but likely not an issue isolated to our situation: it follows from the goal of no wasted compute, the need to separate compute and storage locations, and the need to ensure duplicate/temp files are only around while a compute task needs them.

If we added this new directive at a later date, would the tasks be reset because they now have a new directive? If so, any project would have to start from scratch after updating the phoenix tasks with the new directive. If the logic could use the existing directives (after, input, output) as mentioned previously, we could adjust phoenix today without having to update the templates later, which would cause any restarted project to rerun due to the new directives.

@PedalheadPHX That's a good idea. Maybe the directive could be reset, and the options could be parents or specific task names.

@awchrist The task identity is only computed from the exec and cmd directives, so adding a new directive won't affect older projects
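The identity rule can be sketched like this (assumed illustration of the behavior described above; jetstream's actual hashing code may differ in detail):

```python
# Assumed illustration: only the 'exec' and 'cmd' directives participate in
# the identity hash, so directives added later (e.g. reset_affects) leave
# the identity of existing tasks unchanged.
import hashlib
import json

def task_identity(task):
    payload = {k: task.get(k) for k in ('exec', 'cmd')}
    return hashlib.sha1(json.dumps(payload, sort_keys=True).encode()).hexdigest()

old = {'name': 'process_samples', 'cmd': 'cat temp/*.txt >> final.txt'}
new = dict(old, reset_affects=['create_temp_A'])  # new directive added later
print(task_identity(old) == task_identity(new))   # True: identity unchanged
```

This is why older projects can pick up templates that use the new directive without their completed tasks being treated as modified.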

Implemented in v1.6 #93

@ryanrichholt Quick question: we have a task that exports the "SBATCH_ACCOUNT" variable so that all subsequent sbatch jobs use the correct account. If a project is restarted, I'm guessing that this task will not reset the way we have it now. Will the before-re directive reset everything downstream? Do you have a suggestion for restarting this task no matter what, without resetting anything else?

@ryanrichholt You can ignore my last comment. I moved the export of the "SBATCH_ACCOUNT" variable to jetstream_centro, which I think is the more appropriate place for it.

@awchrist Thanks for letting me know, it sparked an idea:

I added three new global functions to the template renderer. These can be used to interact with environment variables during the render process. The result is that you can export variables on render, and there's no concern with needing to reset tasks later on.

{% do setenv('FOO', foo_value) %}

- name: taskA
  cmd: env | grep FOO

Example run

$ jetstream run --backend local example.jst -c foo_value bar
[🌵 jetstream] 2019-11-22 17:13:55 INFO: Version: 1.6b3
[🌵 jetstream.templates] 2019-11-22 17:13:55 INFO: Rendering template...
[🌵 jetstream.templates] 2019-11-22 17:13:55 INFO: Template rendering data sources include:
    0: Command: {'foo_value': 'bar'}
[🌵 jetstream.templates] 2019-11-22 17:13:55 INFO: Loading tasks...
[🌵 jetstream.workflows] 2019-11-22 17:13:55 INFO: Building workflow graph for <jetstream.workflows.Workflow object at 0x2aaac3d05c88>...
[🌵 jetstream.workflows] 2019-11-22 17:13:55 INFO: Retry: Resetting state for any pending or failed tasks...
[🌵 jetstream.backends] 2019-11-22 17:13:55 INFO: LocalBackend initialized with 2 cpus
[🌵 jetstream.runner] 2019-11-22 17:13:55 WARNING: Autosave is enabled, but no path has been set for this workflow. Progress will not be saved.
[🌵 jetstream.backends] 2019-11-22 17:13:55 INFO: LocalBackend spawned(5887): taskA
FOO=bar
[🌵 jetstream.backends] 2019-11-22 17:13:55 INFO: Complete: taskA
[🌵 jetstream.runner] 2019-11-22 17:13:55 INFO: Run complete!
[🌵 jetstream.runner] 2019-11-22 17:13:55 INFO: Total run time: 0:00:00.016628