Running Workflow Inside of Workflow
neovintage opened this issue · 17 comments
Is it possible to run a workflow inside of a workflow?
I have a couple of Jobs that run in sequential order, but then I have a step where I fan out to multiple workers. The problem is that I don't know the number of workers I need ahead of time in that parallel step, and I need to bring them all back together in one step before continuing the workflow. I've gone through the code and I feel like I'm missing something, like a callback to say that a workflow has finished.
Hi, @neovintage!
Right now Gush doesn't support nesting workflows, but it's our number one priority on the feature list, as it would also be useful for us internally.
A hackish way to do it at the moment would be a job that starts a different workflow and polls its status until it finishes (or fails), and then ends execution so its dependents can continue:
class NestedWorkflow < Gush::Job
  def work
    nested_workflow = SomeWorkflow.create
    nested_workflow.start!

    loop do
      case nested_workflow.reload.status
      when :failed
        raise SomeErrorToMarkTheJobAsFailed
      when :finished
        # end the loop to mark the job as finished
        break
      end

      # sleep for a while before checking again
      sleep(5)
    end
  end
end
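Such a job can then be wired into a parent workflow like any other job. A minimal sketch, assuming the hypothetical ParentWorkflow, SetupJob and CleanupJob classes:

class ParentWorkflow < Gush::Workflow
  def configure
    run SetupJob
    # the nested workflow is started and polled inside this single job,
    # so dependents only run once it has finished (or the job fails)
    run NestedWorkflow, after: SetupJob
    run CleanupJob, after: NestedWorkflow
  end
end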
I had a feeling that was the case. I wish I had more time to build that functionality in myself. I can't wait to see this enhancement. :-)
Thanks for getting back to me so quickly.
I have a somewhat related question...
Is it possible for a job to enqueue n jobs in a workflow?
Hypothetical example:
class Wf < Gush::Workflow
  def configure
    run FindNumbersJob
    run ReduceNumbersJob, after: FindNumbersJob
  end
end

class FindNumbersJob < Gush::Job
  def work
    Numbers.all.each do |num|
      # I'd like to enqueue ProcessNumberJob in the workflow
    end
  end
end

class ProcessNumberJob < Gush::Job
  def work
    # do some really hard work
  end
end
@hunzinker you can do that, but not from inside the Job; it has to happen in the configure method of the workflow. See: #24 (comment)
Thanks for the quick reply! I think I'm missing something else. After trying the below, ReduceNumbersJob is run after the first ProcessNumberJob finishes, which is not desired. Is it possible for ReduceNumbersJob to run after all ProcessNumberJob instances finish?
class Wf < Gush::Workflow
  def configure
    # numbers is assumed to be an array of values
    numbers.each do |n|
      run ProcessNumberJob, params: { position: n }
    end

    run ReduceNumbersJob, after: ProcessNumberJob
  end
end
Each run call returns a job id you can use for later scheduling. Use map to collect all the ids, then pass that array to the after: attribute:
class Wf < Gush::Workflow
  def configure
    # collect the id of every enqueued ProcessNumberJob
    process_jobs = numbers.map do |n|
      run ProcessNumberJob, params: { position: n }
    end

    # runs only after all of the jobs above have finished
    run ReduceNumbersJob, after: process_jobs
  end
end
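For completeness, the workflow is then created and started like any other (the same calls appear later in this thread), assuming numbers is available inside configure:

flow = Wf.create
flow.start!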
Ha! I just tried that and it worked. Thanks!
Awesome :)
Closing since I am currently working on that functionality 🕐
In the example above, is it possible to use a value from payloads for the number of jobs?
The number of jobs to be scheduled is defined at creation time of the workflow, while payloads only exist at run time. So you can't schedule new jobs once the workflow is already running.
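What you can do is pass that value when creating the workflow, since arguments given to create are forwarded to configure. A minimal sketch, with a hypothetical CountedWorkflow and count argument:

class CountedWorkflow < Gush::Workflow
  # count must be known before the workflow starts
  def configure(count)
    jobs = (1..count).map do |n|
      run ProcessNumberJob, params: { position: n }
    end

    run ReduceNumbersJob, after: jobs
  end
end

flow = CountedWorkflow.create(10)
flow.start!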
Thanks for the quick answer!
No problem :)
I am building a workflow for data import. I have DownloadFileJob, ProcessEachRecordJob and GenerateReportJob. Since I do not know how many records there will be in each import, I am using this pattern:
require "csv"

class ImportWorkflow < Gush::Workflow
  def configure
    row_jobs = CSV.foreach("path/to/data.csv").map do |row|
      run ProcessEachRecordJob, params: row
    end

    run GenerateReportJob, after: row_jobs
  end
end
To schedule the workflow I can create another job and run it via something like sidekiq-cron every X hours.
class WorkflowManagerJob < ApplicationJob
  def perform
    flow = ImportWorkflow.create
    flow.start!
  end
end
However, that will leave Gush keys in Redis after execution. I could do this to clean up, and run it via the same WorkflowManagerJob:
client = Gush::Client.new

# remove every workflow that is no longer running
client.all_workflows.each do |flow|
  client.destroy_workflow(flow) unless flow.running?
end
But it feels a little hacky. Ideally I would remove the workflow's records right after it finishes, or set a Redis TTL so the keys expire in the near future. Has anyone done something like that? Thank you very much.
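For reference, a sketch of how the sweep could live in the same manager job, built only from the client calls above (this cleans up before each new run rather than immediately after a flow finishes):

class WorkflowManagerJob < ApplicationJob
  def perform
    # sweep workflows left over from previous runs
    client = Gush::Client.new
    client.all_workflows.each do |flow|
      client.destroy_workflow(flow) unless flow.running?
    end

    # then kick off a fresh import
    flow = ImportWorkflow.create
    flow.start!
  end
end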
@dmitrypol this is a good idea. I hadn't considered TTL before, but overall I think cleanup should not be automated: some users might want to keep finished workflows around, while others might want to remove them right after finishing.
This could potentially be an option in Gush configuration.
@pokonski Is the nested workflow enhancement done?
Hi @pokonski. Is there any progress on supporting nested workflows? That would be super helpful for our project as well.