chaps-io/gush

Running Workflow Inside of Workflow

neovintage opened this issue · 17 comments

Is it possible to run a workflow inside of a workflow?

I have a couple of Jobs that run in sequential order but then I have a step where I can fan out to multiple workers. The problem is I don't know the number of workers I need ahead of time in that parallel step and I need to bring them all back together in one step before continuing in the workflow. I've gone through the code and I feel like I'm missing something like a callback to say that a workflow is finished.

Hi, @neovintage!

Right now Gush doesn't have that functionality of nesting workflows. But it's our number one priority on the list of features, as that would be useful also for us internally.

Hackish way at the moment would be making a job that starts a different workflow and polls its status until it finishes (or fails). And then ends execution to allow dependents to continue.

class NestedWorkflow < Gush::Job
  def work
    nested_workflow = SomeWorkflow.create
    nested_workflow.start!

    loop do
      case nested_workflow.reload.status
      when :failed
        raise SomeErrorToMarkTheJobAsFailed
      when :finished
        # end the loop to mark the job as finished
        break
      end

      # sleep for a while before checking again
      sleep(5)
    end
  end
end

I had a feeling that was the case. I wish I had more time to build that functionality in myself. I can't wait to see this enhancement. :-)

Thanks for getting back to me so quickly.

I have a somewhat related question...

Is it possible for a job to enqueue n number of jobs in a workflow?

Hypothetical example:

class Wf < Gush::Workflow
  def configure
    run FindNumbersJob
    run ReduceNumbersJob, after: FindNumbersJob
  end
end

class FindNumbersJob < Gush::Job
  def work
    Numbers.all.each do |num|
      # I'd like to enqueue ProcessNumberJob in the workflow
    end
  end
end

class ProcessNumberJob < Gush::Job
  def work
    # do some really hard work
  end
end

@hunzinker you can do that, but not from inside of the Job, but in configure method of the workflow, see: #24 (comment)

Thanks for the quick reply! I think I'm missing something else. After trying the below, ReduceNumbersJob is run after the first FindNumberJob finishes which is not desired. Is it possible for ReduceNumbersJob to run after all FindNumberJobs finish?

class Wf < Gush::Workflow
  def configure
    [numbers].each do |n|
      run ProcessNumberJob, params: { position: n }
    end
    run ReduceNumbersJob, after: ProcessNumberJob
  end

Each run call returns a job id you can use for later scheduling, you need a map to get all the ids and the use that array in after: attribute:

class Wf < Gush::Workflow
  def configure
    process_jobs = [numbers].map do |n|
      run ProcessNumberJob, params: { position: n }
    end
    run ReduceNumbersJob, after: process_jobs
  end

Ha! I just tried that and it worked. Thanks!

Awesome :)

Closing since I am currently working on that functionality 🕐

In the example above, is it possible to use a value from payloads for the numbers of jobs ?

The number of jobs which will be scheduled is defined at creation time of the workflow. while payloads exist at run time. So you can't schedule jobs when they are already running.

Thanks for the quick answer!

No problem :)

I am building a workflow for data import. I have DownloadFileJob, ProcessEeachRecordJob and GenerateReportJob. Since I do not know how many records there will be in each import I am using this pattern:

class ImportWorkflow < Gush::Workflow
  def configure
    row_jobs = CSV.foreach("path/to/data.csv").map do |row|
      run ProcessEeachRecordJob, params: row
    end
    run GenerateReportJob, after: row_jobs
  end
end

To schedule the workflow I can create another job run it via something like sidekiq-cron every X hours.

class WorkflowManagerJob < ApplicationJob
  def perform
    flow = ImportWorkflow.create
    flow.start!
  end
end

However, that will leave gush Redis keys after execution. I could do this to cleanup and run it via same WorkflowManagerJob.

client = Gush::Client.new
client.all_workflows.each do |flow|
  client.destroy_workflow flow unless flow.running?
end

But it feels a little hacky. Ideally I would remove records of flow after it finishes. Or set Redis TTL to delete keys in the near future. Has anyone done something like that? Thank you very much.

@dmitrypol this is a good idea. I haven't considered TTL before but overall clean up is something that I think should not be automated - some users might want to leave the workflows, while others might want to remove it right after finishing.

This could potentially be an option in Gush configuration.

@pokonski Nesting workflow enhancement done?

Hi @pokonski . Are there any changes about supporting nested workflows? That would be super helpful for our project also.