sidekiq_workflows

Sidekiq extension providing a workflow API on top of Sidekiq Pro's batches. To use this gem, you need a Sidekiq Pro license and must provide your credentials for the gems.contribsys.com repository to Bundler:

bundle config gems.contribsys.com username:password

Alternatively, export the credentials as an environment variable:

export BUNDLE_GEMS__CONTRIBSYS__COM=username:password

Rationale

Sidekiq Pro's batches are powerful, but they only come with a rather low-level API. Take this example:

https://github.com/mperham/sidekiq/wiki/Really-Complex-Workflows-with-Batches

That is a lot of complex code, scattered across various callbacks, for what is a fairly straightforward workflow. It is easy to make mistakes when writing such code, and it is hard to debug. This gem provides an API to define a workflow in a single place, abstracting the Batch API away.

Version 1 (breaking changes)

Since version 1.0, retries are supported. Instead of the on_complete callback, the success and death batch events are now used. The documentation below reflects the new interface.

Usage

require 'sidekiq_workflows'

Defining a workflow

A workflow consists of groups of Sidekiq workers that can execute in parallel; a group is the set of workers started by a single perform call. Once a group has completed successfully (all of its workers have succeeded), its follow-up groups are launched. If a worker within a group dies, the follow-up groups are not started. Retries are supported: make sure that the Sidekiq workers you use have retries enabled and dead: true set. If you don't want any retries, use sidekiq_options retry: 0 (retry: false would discard a failed job instead of moving it to the Dead set).

class A; include Sidekiq::Worker; def perform(x); end; end
class B; include Sidekiq::Worker; def perform(x, y); end; end
class C; include Sidekiq::Worker; def perform(x, y, z); end; end
class D; include Sidekiq::Worker; def perform(x); end; end
class E; include Sidekiq::Worker; def perform(x); end; end
class F; include Sidekiq::Worker; def perform; end; end

workflow = SidekiqWorkflows.build do
  perform(A, 'first param to perform')
  perform(B, 'first', 'second').then do
    perform(C, 'first', 'second', 'third')
    perform([
      {worker: D, payload: ['first']},
      {worker: E, payload: ['first']}
    ]).then do
      perform(F)
    end
  end
end

A and B run in parallel. As soon as B completes successfully, C, D and E are launched, running in parallel as well. Finally, when D and E have both completed successfully, F is launched.

Workflow example diagram
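To make the launch order concrete, here is a dependency-free sketch that mimics the shape of the build DSL with a hypothetical TreeBuilder class and prints which group starts when. It is only a model for illustration, assuming that every perform call forms one group whose children start after it succeeds; it neither uses nor reproduces the gem's code, and workers are plain symbols instead of Sidekiq worker classes.

```ruby
# Hypothetical model of the workflow tree; NOT the gem's implementation.
# One Group corresponds to one perform call; its children start after it succeeds.
Group = Struct.new(:workers, :children)

class TreeBuilder
  attr_reader :groups

  def initialize
    @groups = []
  end

  # Records one group and returns a handle whose #then collects follow-ups.
  def perform(workers)
    group = Group.new(Array(workers), [])
    @groups << group
    FollowUp.new(group)
  end

  # Evaluates a block against a fresh builder and returns the groups it defined.
  def self.collect(&block)
    builder = new
    builder.instance_eval(&block)
    builder.groups
  end

  FollowUp = Struct.new(:group) do
    # Groups defined inside the block become children of this group.
    def then(&block)
      group.children.concat(TreeBuilder.collect(&block))
    end
  end
end

# Produces one line per group, stating what has to finish before it starts.
def launch_plan(groups, parent = nil, plan = [])
  groups.each do |group|
    trigger = parent ? "after #{parent.workers.join(', ')}" : 'at start'
    plan << "#{group.workers.join(', ')} #{trigger}"
    launch_plan(group.children, group, plan)
  end
  plan
end

roots = TreeBuilder.collect do
  perform(:A)
  perform(:B).then do
    perform(:C)
    perform([:D, :E]).then do
      perform(:F)
    end
  end
end

plan = launch_plan(roots)
puts plan
```

The plan has one line per group (A, B, C, "D, E" and F), which matches the five on_partial_complete calls described for this example.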

Additional parameters

SidekiqWorkflows.build can take some additional parameters:

  • workflow_uuid: An optional ID that identifies this workflow instance.
  • except: An array of worker classes to be skipped entirely in this workflow instance.
  • on_partial_complete: A callback that is called whenever a group of workers within the workflow has completed (either by success or by death). Modifying the example above:

class WorkflowCallbacks; def on_partial_complete(status, options); end; end

workflow = SidekiqWorkflows.build(on_partial_complete: 'WorkflowCallbacks#on_partial_complete') do
  # ... the same perform calls as in the example above ...
end

This is especially useful if you want to report the workflow's progress to a client (for example, by sending a notification). For the example above, the callback is called 5 times in total, once per group: A, B, C, [D, E] and F. If a workflow_uuid was given, it is included in the status hash. For more details on status and options, see: https://github.com/mperham/sidekiq/wiki/Batches#callbacks
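Callbacks are given as 'ClassName#method_name' strings. The plain-Ruby sketch below shows roughly how such a string can be resolved: instantiate the class and call the named method with status and options. The invoke_callback helper and the 'workflow' option key are hypothetical; Sidekiq Pro performs the real resolution when the batch event fires, and its details may differ.

```ruby
# Example callback class with the (status, options) signature from the README.
class WorkflowCallbacks
  def on_partial_complete(_status, options)
    # A real callback might notify a client about progress here.
    "progress update for #{options.fetch('workflow', 'unknown')}"
  end
end

# Hypothetical resolver for "ClassName#method_name" callback strings.
def invoke_callback(spec, status, options)
  class_name, method_name = spec.split('#', 2)
  if method_name.nil? || method_name.empty?
    raise ArgumentError, "expected 'Class#method', got #{spec.inspect}"
  end

  Object.const_get(class_name).new.public_send(method_name, status, options)
end

# Stand-in arguments; in a real workflow, Sidekiq Pro supplies status and options.
result = invoke_callback('WorkflowCallbacks#on_partial_complete', nil, { 'workflow' => 'demo' })
puts result # prints "progress update for demo"
```

Because a fresh instance is created for every call, a callback should not rely on instance state surviving between invocations.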

There is an additional parameter to perform as well:

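  • delay: Wait for the given amount of time before launching this worker. The value's to_i must return the number of seconds as an integer (an ActiveSupport duration such as 5.minutes or a plain Integer both work). Under the hood, this uses Sidekiq's perform_in.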

Modifying the example above:

perform(C, 'first', 'second', 'third', delay: 5.minutes)

After B completes successfully, C will be scheduled to run 5 minutes later.
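Because only to_i is required, the delay can be any object that converts to seconds. A small sketch, assuming the scheduled time is simply "now plus delay.to_i seconds"; the Minutes struct and the due_at helper are hypothetical stand-ins (in a Rails app you would normally pass an ActiveSupport duration such as 5.minutes directly):

```ruby
# Minimal stand-in for ActiveSupport's 5.minutes: any object whose #to_i
# returns seconds satisfies the delay contract described above.
Minutes = Struct.new(:amount) do
  def to_i
    amount * 60
  end
end

# Hypothetical helper: the point in time a delayed worker becomes due,
# mirroring the idea behind perform_in (now + delay in seconds).
def due_at(now, delay)
  now + delay.to_i
end

start = Time.at(0).utc
puts due_at(start, Minutes.new(5)) # prints "1970-01-01 00:05:00 UTC"
puts due_at(start, 300)            # same instant, given as plain seconds
```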

Generally, perform can be used in two ways:

  • perform(C, 'first', 'second', 'third', delay: 5.minutes)
  • perform([worker: C, payload: ['first', 'second', 'third'], delay: 5.minutes])

The second form can be used to perform multiple workers within a single batch.
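Both forms describe one group of workers. The sketch below normalizes each call style into the same list of entries; normalize_perform is a hypothetical helper for illustration only (workers are symbols and the delay is given in plain seconds), not part of the gem's API.

```ruby
# Hypothetical normalizer showing that both call styles describe the same
# group: a list of {worker:, payload:, delay:} entries. Not the gem's code.
def normalize_perform(worker_or_list, *payload, delay: nil)
  if worker_or_list.is_a?(Array)
    # Second form: an array of hashes, one entry per worker in the batch.
    worker_or_list.map do |entry|
      { worker: entry.fetch(:worker), payload: entry.fetch(:payload, []), delay: entry[:delay] }
    end
  else
    # First form: a single worker followed by its positional arguments.
    [{ worker: worker_or_list, payload: payload, delay: delay }]
  end
end

first  = normalize_perform(:C, 'first', 'second', 'third', delay: 300)
second = normalize_perform([worker: :C, payload: ['first', 'second', 'third'], delay: 300])
puts first == second # prints "true"
```

Only the second form can express several workers at once, e.g. the [D, E] group from the example above.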

Launching a workflow

Once defined, you can launch a workflow like this:

batch_id = SidekiqWorkflows::Worker.perform_workflow(workflow)

This method returns the ID of the Sidekiq Pro batch that represents the workflow.

Additional parameters

SidekiqWorkflows::Worker.perform_workflow can take some additional parameters:

  • on_success: A callback that is called once, when the workflow has completed successfully.
  • on_success_options: A hash of key/value options which will be part of the options hash passed to the callback.
  • on_death: A callback that is called once, when the workflow has 'died' (that is, retries have been exhausted for at least one worker in the hierarchy).
  • on_death_options: A hash of key/value options which will be part of the options hash passed to the callback.

class WorkflowCallbacks; def on_success(status, options); end; end

SidekiqWorkflows::Worker.perform_workflow(workflow, on_success: 'WorkflowCallbacks#on_success', on_success_options: {stuff: 1})

If workflow_uuid has been passed into SidekiqWorkflows.build, it will also be present inside the options hash.
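A sketch of a callback class that handles both terminal events and reads the options hash. The class body and the 'abc-123' workflow_uuid are made up for illustration, and the string keys are an assumption (callback options are stored as JSON, so symbol keys such as stuff: are expected to come back as strings). The perform_workflow call is shown as a comment because it needs Sidekiq Pro and this gem.

```ruby
# Hypothetical callback class; only the (status, options) signature comes from the README.
class WorkflowCallbacks
  def on_success(_status, options)
    "workflow #{options['workflow_uuid']} succeeded (stuff=#{options['stuff']})"
  end

  def on_death(_status, options)
    "workflow #{options['workflow_uuid']} died"
  end
end

# Wiring (requires Sidekiq Pro and sidekiq_workflows):
#   SidekiqWorkflows::Worker.perform_workflow(
#     workflow,
#     on_success: 'WorkflowCallbacks#on_success', on_success_options: {stuff: 1},
#     on_death: 'WorkflowCallbacks#on_death', on_death_options: {stuff: 1}
#   )

# Local check with a hand-built options hash (assumed string keys).
options = { 'stuff' => 1, 'workflow_uuid' => 'abc-123' }
callbacks = WorkflowCallbacks.new
puts callbacks.on_success(nil, options) # prints "workflow abc-123 succeeded (stuff=1)"
puts callbacks.on_death(nil, options)   # prints "workflow abc-123 died"
```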

Configuration

There are a few additional configuration options:

SidekiqWorkflows.worker_queue = 'some_queue'
SidekiqWorkflows.callback_queue = 'another_queue'

worker_queue is the name of the Sidekiq queue used for the gem's own meta worker. This worker usually runs for only a few milliseconds, so you may want to route it to a queue suited to such short jobs.

callback_queue is the name of the Sidekiq queue which will be used for the on_partial_complete, on_success and on_death callback workers.

If not specified, the default Sidekiq queue will be used.
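The fallback rule can be sketched with a stand-in module; QueueSettings is hypothetical and only mirrors the two documented settings, assuming "the default Sidekiq queue" means the queue named 'default'.

```ruby
# Stand-in for the two documented settings (hypothetical module, not the gem).
module QueueSettings
  class << self
    attr_accessor :worker_queue, :callback_queue

    # Unset values fall back to Sidekiq's 'default' queue, as documented above.
    def effective_queues
      { worker: worker_queue || 'default', callback: callback_queue || 'default' }
    end
  end
end

p QueueSettings.effective_queues # both fall back to 'default'
QueueSettings.worker_queue = 'some_queue'
QueueSettings.callback_queue = 'another_queue'
p QueueSettings.effective_queues
```

Whichever queues you choose, your Sidekiq processes must also be told to process them, for example with bundle exec sidekiq -q default -q some_queue -q another_queue; jobs in a queue that no process listens to are never picked up.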

Development

Run tests locally

  1. CONTRIBSYS_CREDENTIALS=your-secret-sidekiq-pro-creds docker-compose build
  2. docker-compose run --rm sidekiq-workflows-test bundle exec rake test