Sidekiq extension providing a workflow API on top of Sidekiq Pro's batches. To use this gem, you need a Sidekiq Pro license, and you must provide the credentials for the gems.contribsys.com repository via bundler:

```shell
bundle config gems.contribsys.com username:password
```

or alternatively:

```shell
export BUNDLE_GEMS__CONTRIBSYS__COM=username:password
```
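With the credentials in place, the gems can be added to the `Gemfile` as usual. A minimal sketch, assuming the gem is published under the same name as its require path (`sidekiq_workflows`) and that Sidekiq Pro comes from the contribsys repository:

```ruby
# Gemfile (sketch)
source 'https://rubygems.org'

gem 'sidekiq_workflows'

# Sidekiq Pro is fetched from the private contribsys repository
# configured above.
source 'https://gems.contribsys.com/' do
  gem 'sidekiq-pro'
end
```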
While Sidekiq Pro's batches are powerful, only a rather low-level API is provided to work with them. Take this example:

https://github.com/mperham/sidekiq/wiki/Really-Complex-Workflows-with-Batches

This requires a lot of complex code scattered across various callbacks to enable a straightforward workflow. It is easy to make mistakes when writing such code, and it's also hard to debug. This gem provides an API to define a workflow in a single place, abstracting the Batch API away.
Since version 1.0, retries are supported. Instead of the `on_complete` callback, `success` and `death` events are now used. The documentation has been updated to reflect the new interface.
```ruby
require 'sidekiq_workflows'
```
A workflow consists of Sidekiq workers which can execute in parallel. On successful completion (all workers within a group have completed successfully), a follow-up group of workers can be launched. If a worker within a group dies, the follow-up group will not be started. Retries are supported; please make sure that the Sidekiq workers being used have retries enabled and `dead` set to `true` (use `sidekiq_options retry: 0` if you don't want any retries).
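As a sketch, a worker participating in a workflow might declare its retry behaviour like this (the class name and retry count are made up for illustration):

```ruby
class ResilientWorker
  include Sidekiq::Worker
  # Retry up to 3 times; dead: true (Sidekiq's default) ensures the
  # death event fires once retries are exhausted, so the workflow can
  # refrain from starting follow-up groups.
  sidekiq_options retry: 3, dead: true

  def perform(payload)
    # ...
  end
end
```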
```ruby
class A; include Sidekiq::Worker; def perform(x); end; end
class B; include Sidekiq::Worker; def perform(x, y); end; end
class C; include Sidekiq::Worker; def perform(x, y, z); end; end
class D; include Sidekiq::Worker; def perform(x); end; end
class E; include Sidekiq::Worker; def perform(x); end; end
class F; include Sidekiq::Worker; def perform; end; end
```
```ruby
workflow = SidekiqWorkflows.build do
  perform(A, 'first param to perform')
  perform(B, 'first', 'second').then do
    perform(C, 'first', 'second', 'third')
    perform([
      {worker: D, payload: ['first']},
      {worker: E, payload: ['first']}
    ]).then do
      perform(F)
    end
  end
end
```
`A` and `B` run in parallel. As soon as `B` completes successfully, `C`, `D` and `E` will be launched, running in parallel as well. Finally, when `D` and `E` both complete successfully, `F` will be launched.
`SidekiqWorkflows.build` can take some additional parameters:

- `workflow_uuid`: To identify this workflow instance, you may want to provide an ID.
- `except`: An array of worker classes to be entirely skipped in this workflow instance.
- `on_partial_complete`: A callback that is called whenever a group of workers within the workflow has completed (either by success or death).

Modifying the example above:
```ruby
class WorkflowCallbacks; def on_partial_complete(status, options); end; end

workflow = SidekiqWorkflows.build(on_partial_complete: 'WorkflowCallbacks#on_partial_complete') do
  ...
end
```
This is especially useful if you want to report progress of the workflow to a client (for example, send a notification). When using the example above, the callback will be called 5 times in total (for `A`, `B`, `C`, `[D, E]`, `F`). The `status` hash contains the `workflow_uuid` if present. For more details on `status` and `options`, see: https://github.com/mperham/sidekiq/wiki/Batches#callbacks
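A sketch of such a progress-reporting callback, in plain Ruby to show the shape. Which keys actually appear in `status` and `options` depends on the batch and on what you pass in yourself, so the keys used here are assumptions for the example:

```ruby
# Hypothetical progress callback; the signature matches the example
# above, but the hash keys read here are illustrative assumptions.
class WorkflowProgressCallbacks
  def on_partial_complete(status, options)
    uuid = status['workflow_uuid'] || 'unknown'
    # In a real app you might push this to a websocket or notification
    # service; here we just build a progress message.
    "workflow #{uuid}: a group of workers has completed"
  end
end
```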
There's an additional parameter to `perform` as well:

- `delay`: Wait an amount of time (`to_i` must yield seconds as an integer) before launching this worker. This uses Sidekiq's `perform_in` under the hood.
Modifying the example above:

```ruby
perform(C, 'first', 'second', 'third', delay: 5.minutes)
```
After the successful completion of `B`, `C` will be scheduled for execution 5 minutes in the future.
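The delay value doesn't have to come from ActiveSupport's `5.minutes`; any object whose `to_i` yields a number of seconds works, a plain `Integer` being the simplest. A minimal sketch (the `Minutes` class is made up for illustration):

```ruby
# Any object responding to #to_i with a number of seconds can serve
# as a delay value; ActiveSupport durations are just one such object.
class Minutes
  def initialize(count)
    @count = count
  end

  def to_i
    @count * 60
  end
end

Minutes.new(5).to_i # => 300, i.e. equivalent to delay: 300
```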
Generally, `perform` can be used in two ways:

```ruby
perform(C, 'first', 'second', 'third', delay: 5.minutes)
perform([{worker: C, payload: ['first', 'second', 'third'], delay: 5.minutes}])
```

The second form can be used to perform multiple workers within a single batch.
Once defined, you can launch a workflow like this:

```ruby
batch_id = SidekiqWorkflows::Worker.perform_workflow(workflow)
```
This method returns a Sidekiq Pro batch ID. This batch represents the workflow.
`SidekiqWorkflows::Worker.perform_workflow` can take some additional parameters:

- `on_success`: A callback that is called once, when the workflow has successfully completed.
- `on_success_options`: A hash of key/value options which will be part of the `options` hash of the callback.
- `on_death`: A callback that is called once, when the workflow has died (that is, retries have been exhausted for at least one worker in the hierarchy).
- `on_death_options`: A hash of key/value options which will be part of the `options` hash of the callback.
```ruby
class WorkflowCallbacks; def on_success(status, options); end; end

SidekiqWorkflows::Worker.perform_workflow(workflow, on_success: 'WorkflowCallbacks#on_success', on_success_options: {stuff: 1})
```
If `workflow_uuid` has been passed into `SidekiqWorkflows.build`, it will also be present inside the `options` hash.
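A sketch combining both terminal callbacks, again in plain Ruby to show the shape. The `stuff` key mirrors the `on_success_options` example above; the `workflow_uuid` key is only present if you passed one to `SidekiqWorkflows.build`:

```ruby
# Hypothetical terminal callbacks for a workflow.
class WorkflowTerminalCallbacks
  # Called once when every worker in the workflow succeeded.
  def on_success(status, options)
    "workflow #{options['workflow_uuid']} finished (stuff: #{options['stuff']})"
  end

  # Called once when retries are exhausted for at least one worker.
  def on_death(status, options)
    "workflow #{options['workflow_uuid']} died"
  end
end
```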
There are some additional configuration options.
```ruby
SidekiqWorkflows.worker_queue = 'some_queue'
SidekiqWorkflows.callback_queue = 'another_queue'
```
`worker_queue` is the name of the Sidekiq queue which will be used for the gem's own meta worker. This worker usually has an execution time of only a few milliseconds, so you may want to use an appropriate queue for it.

`callback_queue` is the name of the Sidekiq queue which will be used for the `on_partial_complete`, `on_success` and `on_death` callback workers.

If not specified, the `default` Sidekiq queue will be used.
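In a Rails app this configuration would typically live in an initializer; a sketch, where the file name and queue names are just conventions for the example:

```ruby
# config/initializers/sidekiq_workflows.rb (sketch)

# Dedicated queue for the gem's fast-running meta worker.
SidekiqWorkflows.worker_queue = 'workflow_meta'

# Queue for the on_partial_complete / on_success / on_death callbacks.
SidekiqWorkflows.callback_queue = 'workflow_callbacks'
```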
```shell
CONTRIBSYS_CREDENTIALS=your-secret-sidekiq-pro-creds docker-compose build
docker-compose run --rm sidekiq-workflows-test bundle exec rake test
```