Robust job processing in Elixir, backed by modern PostgreSQL. Reliable, observable and loaded with enterprise-grade features.
- Features
- Requirements
- Oban Web+Pro
- Installation
- Usage
- Testing
- Error Handling
- Instrumentation & Logging
- Isolation
- Troubleshooting
- Community
- Contributing
Note: This README is for the unreleased master branch. Please refer to the official documentation on hexdocs for the latest stable release.
Oban's primary goals are reliability, consistency and observability.
It is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection. You can leave your application running indefinitely without worrying about jobs being lost or orphaned due to crashes.
- Fewer Dependencies: If you are running a web app there is a very good chance that you're running on top of an RDBMS. Running your job queue within PostgreSQL minimizes system dependencies and simplifies data backups.
- Transactional Control: Enqueue a job along with other database changes, ensuring that everything is committed or rolled back atomically.
- Database Backups: Jobs are stored inside of your primary database, which means they are backed up together with the data that they relate to.
- Isolated Queues: Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that a job in a single slow queue can't back up other faster queues.
- Queue Control: Queues can be started, stopped, paused, resumed and scaled independently at runtime locally or across all running nodes (even in environments like Heroku, without distributed Erlang).
- Resilient Queues: Failing queries won't crash the entire supervision tree, instead they trip a circuit breaker and will be retried again in the future.
- Job Canceling: Jobs can be canceled in the middle of execution regardless of which node they are running on. This stops the job at once and flags it as `discarded`.
- Triggered Execution: Database triggers ensure that jobs are dispatched as soon as they are inserted into the database.
- Unique Jobs: Duplicate work can be avoided through unique job controls. Uniqueness can be enforced at the argument, queue and worker level for any period of time.
- Scheduled Jobs: Jobs can be scheduled at any time in the future, down to the second.
- Periodic (CRON) Jobs: Automatically enqueue jobs on a cron-like schedule. Duplicate jobs are never enqueued, no matter how many nodes you're running.
- Job Priority: Prioritize jobs within a queue to run ahead of others.
- Historic Metrics: After a job is processed the row isn't deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.
- Node Metrics: Every queue records metrics to the database during runtime. These are used to monitor queue health across nodes and may be used for analytics.
- Queue Draining: Queue shutdown is delayed so that slow jobs can finish executing before shutdown. When shutdown starts queues are paused and stop executing new jobs. Any jobs left running after the shutdown grace period may be rescued later.
- Telemetry Integration: Job life-cycle events are emitted via Telemetry integration. This enables simple logging, error reporting and health checkups without plug-ins.
Oban has been developed and actively tested with Elixir 1.8+, Erlang/OTP 21.1+ and PostgreSQL 11.0+. Running Oban currently requires Elixir 1.8+, Erlang 21+, and PostgreSQL 9.6+.
A web-based user interface for managing Oban and an official set of plugins and workers are available as private packages. Learn more about Oban Web+Pro at [getoban.pro](https://getoban.pro).
Oban is published on Hex. Add it to your list of dependencies in `mix.exs`:
def deps do
  [
    {:oban, "~> 2.0.0-rc.2"}
  ]
end
Then run `mix deps.get` to install Oban and its dependencies, including Ecto, Jason and Postgrex.
After the packages are installed you must create a database migration to add the `oban_jobs` table to your database:
mix ecto.gen.migration add_oban_jobs_table
Open the generated migration in your editor and call the `up` and `down` functions on `Oban.Migrations`:
defmodule MyApp.Repo.Migrations.AddObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up()
  end

  # We specify `version: 1` in `down`, ensuring that we'll roll all the way back down if
  # necessary, regardless of which version we've migrated `up` to.
  def down do
    Oban.Migrations.down(version: 1)
  end
end
This will run all of Oban's versioned migrations for your database. Migrations between versions are idempotent and will never change after a release. As new versions are released you may need to run additional migrations.
Now, run the migration to create the table:
mix ecto.migrate
Next see Usage for how to integrate Oban into your application and start defining jobs!
If you are using releases you may see Postgrex errors logged during your initial deploy (or any deploy requiring an Oban migration). The errors are only temporary! After the migration has completed each queue will start producing jobs normally.
Oban is a robust job processing library which uses PostgreSQL for storage and coordination.
Each Oban instance is a supervision tree and not an application. That means it won't be started automatically and must be included in your application's supervision tree. All of your configuration is passed into the supervisor, allowing you to configure Oban like the rest of your application:
# config/config.exs
config :my_app, Oban,
  repo: MyApp.Repo,
  queues: [default: 10, events: 50, media: 20]

# lib/my_app/application.ex
defmodule MyApp.Application do
  @moduledoc false

  use Application

  alias MyApp.Repo
  alias MyAppWeb.Endpoint

  def start(_type, _args) do
    children = [
      Repo,
      Endpoint,
      {Oban, oban_config()}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end

  defp oban_config do
    opts = Application.get_env(:my_app, Oban)

    # Prevent running queues or scheduling jobs from an iex console, i.e. when starting app with `iex -S mix`
    if Code.ensure_loaded?(IEx) and IEx.started?() do
      opts
      |> Keyword.put(:crontab, false)
      |> Keyword.put(:queues, false)
    else
      opts
    end
  end
end
If you are running tests (which you should be) you'll want to disable pruning, enqueuing scheduled jobs and job dispatching altogether when testing:
# config/test.exs
config :my_app, Oban, crontab: false, queues: false, plugins: false
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
queues: [default: 10, mailers: 20, events: 50, media: 5]
There isn't a limit to the number of queues or how many jobs may execute concurrently in each queue. Here are a few caveats and guidelines:
- Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (e.g. database connections) to handle the concurrent load.
- Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.
- Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
- Be careful how many concurrent jobs make expensive system calls (e.g. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don't apply when using ports or shelling out commands.
Worker modules do the work of processing a job. At a minimum they must define a `perform/1` function, which is called with an `Oban.Job` struct.
Note that the `args` map on the job passed to `perform/1` will always have string keys, regardless of the key type when the job was enqueued. The `args` are stored as `jsonb` in PostgreSQL and the serialization process automatically stringifies all keys.
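The effect of that serialization can be sketched in plain Elixir. This is only an illustration: `ArgsSketch.stringify_keys/1` is a hypothetical helper, not part of Oban, and it only handles the top-level keys of a flat map.

```elixir
defmodule ArgsSketch do
  # Hypothetical helper: converts top-level keys to strings, which is the
  # visible effect of a JSON round trip on a flat args map.
  def stringify_keys(args) when is_map(args) do
    Map.new(args, fn {key, value} -> {to_string(key), value} end)
  end
end

enqueued = %{id: 1, in_the: "business"}
stored = ArgsSketch.stringify_keys(enqueued)

# A worker has to match on string keys, not atoms.
%{"id" => id} = stored
IO.inspect(id)
```

Matching on `%{id: id}` inside a worker would fail for the same reason: by the time the job is executed the atom key no longer exists.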
Define a worker to process jobs in the `events` queue:
defmodule MyApp.Business do
  use Oban.Worker, queue: :events

  @impl Oban.Worker
  # Bind the whole args map as well as the id, since both are used below.
  def perform(%Oban.Job{args: %{"id" => id} = args}) do
    model = MyApp.Repo.get(MyApp.Business.Man, id)

    case args do
      %{"in_the" => "business"} ->
        IO.inspect(model)

      %{"vote_for" => vote} ->
        IO.inspect([vote, model])

      _ ->
        IO.inspect(model)
    end

    :ok
  end
end
The `use` macro also accepts options to customize max attempts, priority, tags, and uniqueness:
defmodule MyApp.LazyBusiness do
  use Oban.Worker,
    queue: :events,
    priority: 3,
    max_attempts: 3,
    tags: ["business"],
    unique: [period: 30]

  @impl Oban.Worker
  def perform(_job) do
    # do business slowly

    :ok
  end
end
Successful jobs should return `:ok` or an `{:ok, value}` tuple. The value returned from `perform/1` is used to control whether the job is treated as a success, a failure, discarded completely or deferred until later.
See the `Oban.Worker` docs for more details on failure conditions and `Oban.Telemetry` for details on job reporting.
Jobs are simply Ecto structs and are enqueued by inserting them into the database. For convenience and consistency all workers provide a `new/2` function that converts an args map into a job changeset suitable for insertion:
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
The worker's defaults may be overridden by passing options:
%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()
Jobs may be scheduled at a specific datetime in the future:
%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()
Jobs may also be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()
Unique jobs can be configured in the worker, or when the job is built:
%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: [period: 300, fields: [:queue, :worker]])
|> Oban.insert()
Job priority can be specified using an integer from 0 to 3, with 0 being the default and highest priority:
%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()
Any number of tags can be added to a job dynamically, at the time it is inserted:
id = 1
%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()
Multiple jobs can be inserted in a single transaction:
Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewster@example.com"}))
|> Repo.transaction()
Occasionally you may need to insert a job for a worker that exists in another application. In that case you can use `Oban.Job.new/2` to build the changeset manually:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()
`Oban.insert/2,4` is the preferred way of inserting jobs as it provides some of Oban's advanced features (i.e., unique jobs). However, you can use your application's `Repo.insert/2` function if necessary.
See `Oban.Job.new/2` for a full list of job options.
Job stats and queue introspection are built on keeping job rows in the database after they have completed. This allows administrators to review completed jobs and build informative aggregates, at the expense of storage and an unbounded table size. To prevent the `oban_jobs` table from growing indefinitely, Oban provides active pruning of `completed` and `discarded` jobs.
By default, pruning retains jobs for 60 seconds.
- Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
- Pruning is only applied to jobs that are `completed` or `discarded` (jobs that have reached the maximum number of retries or have been manually killed). It'll never delete a new job, a scheduled job or a job that failed and will be retried.
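The selection rule described above can be modeled in memory. This is a rough sketch under stated assumptions: `PruneSketch`, the `finished_at` field and the integer timestamps are all hypothetical, and Oban itself prunes with database queries rather than list operations.

```elixir
defmodule PruneSketch do
  # Hypothetical in-memory model; Oban performs pruning with database queries.
  @prunable_states ["completed", "discarded"]

  # Keeps every job except prunable ones that finished more than `max_age`
  # seconds before `now`. Times are plain integer seconds for simplicity.
  def prune(jobs, now, max_age \\ 60) do
    Enum.reject(jobs, fn job ->
      job.state in @prunable_states and now - job.finished_at > max_age
    end)
  end
end

jobs = [
  %{id: 1, state: "completed", finished_at: 0},
  %{id: 2, state: "completed", finished_at: 90},
  %{id: 3, state: "retryable", finished_at: 0},
  %{id: 4, state: "scheduled", finished_at: nil}
]

remaining = PruneSketch.prune(jobs, 100)
IO.inspect(Enum.map(remaining, & &1.id))
```

Only the first job is removed: it is in a prunable state and older than the 60 second default. The retryable and scheduled jobs are never considered, whatever their age.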
The unique jobs feature lets you specify constraints to prevent enqueuing duplicate jobs. Uniqueness is based on a combination of `args`, `queue`, `worker`, `state` and insertion time. It is configured at the worker or job level using the following options:
- `:period`: The number of seconds until a job is no longer considered duplicate. You should always specify a period. `:infinity` can be used to indicate the job be considered a duplicate as long as jobs are retained.
- `:fields`: The fields to compare when evaluating uniqueness. The available fields are `:args`, `:queue` and `:worker`; by default all three are used.
- `:states`: The job states that are checked for duplicates. The available states are `:available`, `:scheduled`, `:executing`, `:retryable` and `:completed`. By default all states are checked, which prevents any duplicates, even if the previous job has been completed.
For example, configure a worker to be unique across all fields and states for 60 seconds:
use Oban.Worker, unique: [period: 60]
Configure the worker to be unique only by `:worker` and `:queue`:
use Oban.Worker, unique: [fields: [:queue, :worker], period: 60]
Or, configure a worker to be unique until it has executed:
use Oban.Worker, unique: [period: 300, states: [:available, :scheduled, :executing]]
Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.
If your application makes heavy use of unique jobs you may want to add an index on the `args` column of the `oban_jobs` table. The other columns considered for uniqueness are already covered by indexes.
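To make the interaction of `:period`, `:fields` and `:states` concrete, here is a minimal in-memory model of the duplicate check. It is a sketch only: `UniqueSketch`, the map shape of a job, the integer timestamps and the inclusive period boundary are assumptions for illustration, and Oban evaluates uniqueness with locks and queries instead.

```elixir
defmodule UniqueSketch do
  # Hypothetical in-memory model of the :period, :fields and :states options.
  @default_fields [:args, :queue, :worker]
  @default_states [:available, :scheduled, :executing, :retryable, :completed]

  # Returns true when `candidate` matches an existing job on every compared
  # field, that job is in a checked state, and it was inserted within the
  # period. Times are integer seconds; the period may be :infinity.
  def duplicate?(candidate, existing_jobs, now, opts) do
    fields = Keyword.get(opts, :fields, @default_fields)
    states = Keyword.get(opts, :states, @default_states)
    period = Keyword.fetch!(opts, :period)

    Enum.any?(existing_jobs, fn job ->
      Map.take(job, fields) == Map.take(candidate, fields) and
        job.state in states and
        within_period?(job.inserted_at, now, period)
    end)
  end

  defp within_period?(_inserted_at, _now, :infinity), do: true
  defp within_period?(inserted_at, now, period), do: now - inserted_at <= period
end

existing = [
  %{
    worker: "MyApp.Mailer",
    queue: "mailers",
    args: %{"email" => "a@example.com"},
    state: :completed,
    inserted_at: 0
  }
]

candidate = %{worker: "MyApp.Mailer", queue: "mailers", args: %{"email" => "b@example.com"}}

# Compared on all fields the args differ, so the candidate is not a duplicate.
IO.inspect(UniqueSketch.duplicate?(candidate, existing, 30, period: 60))

# Compared on queue and worker only, the completed job counts as a duplicate.
IO.inspect(UniqueSketch.duplicate?(candidate, existing, 30, period: 60, fields: [:queue, :worker]))
```

Narrowing `:states` to `[:available, :scheduled, :executing]` would make the second check return `false` as well, because the existing job has already completed.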
Oban allows jobs to be registered with a cron-like schedule and enqueued automatically. Periodic jobs are registered as a list of `{cron, worker}` or `{cron, worker, options}` tuples:
config :my_app, Oban,
  repo: MyApp.Repo,
  crontab: [
    {"* * * * *", MyApp.MinuteWorker},
    {"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
    {"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
    {"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
    {"@daily", MyApp.AnotherDailyWorker}
  ]
These jobs would be executed as follows:
- `MyApp.MinuteWorker`: Executed once every minute
- `MyApp.HourlyWorker`: Executed at the first minute of every hour with custom args
- `MyApp.DailyWorker`: Executed at midnight every day with no retries
- `MyApp.MondayWorker`: Executed at noon every Monday in the "scheduled" queue
The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Standard Cron expressions are composed of rules specifying the minutes, hours, days, months and weekdays. Rules for each field are comprised of literal values, wildcards, step values or ranges:
- `*`: Wildcard, matches any value (0, 1, 2, ...)
- `0`: Literal, matches only itself (only 0)
- `*/15`: Step, matches any value that is a multiple (0, 15, 30, 45)
- `0-5`: Range, matches any value within the range (0, 1, 2, 3, 4, 5)
- `0-23/2`: Step values can be used in conjunction with ranges (0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22)
Each part may have multiple rules, where rules are separated by a comma. The allowed values for each field are as follows:
- `minute`: 0-59
- `hour`: 0-23
- `days`: 1-31
- `month`: 1-12 (or aliases, `JAN`, `FEB`, `MAR`, etc.)
- `weekdays`: 0-6 (or aliases, `SUN`, `MON`, `TUE`, etc.)
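The rule types above can be illustrated with a small expander for a single numeric field. This is a sketch, not Oban's parser: `CronFieldSketch` is a hypothetical module, it ignores month and weekday aliases, it does no range validation, and it counts steps from the start of the range as traditional cron does.

```elixir
defmodule CronFieldSketch do
  # Hypothetical expander for one cron field; not Oban's implementation.
  # Turns a field such as "*/15" or "0-23/2" into the sorted list of values
  # it matches within `range` (for example 0..59 for minutes).
  def expand(field, range) do
    field
    |> String.split(",")
    |> Enum.flat_map(&expand_rule(&1, range))
    |> Enum.uniq()
    |> Enum.sort()
  end

  # A rule is a base (wildcard, literal or range) with an optional "/step".
  defp expand_rule(rule, range) do
    case String.split(rule, "/") do
      [base] -> Enum.to_list(base_range(base, range))
      [base, step] -> Enum.take_every(base_range(base, range), String.to_integer(step))
    end
  end

  defp base_range("*", range), do: range

  defp base_range(base, _range) do
    case String.split(base, "-") do
      [value] ->
        int = String.to_integer(value)
        int..int

      [first, last] ->
        String.to_integer(first)..String.to_integer(last)
    end
  end
end

IO.inspect(CronFieldSketch.expand("*/15", 0..59))
IO.inspect(CronFieldSketch.expand("0-23/2", 0..23))
IO.inspect(CronFieldSketch.expand("7-9,16-18", 0..23))
```

Splitting on commas first and unioning the results is what lets one field carry several rules at once.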
The following Cron extensions are supported:
- `@hourly`: `0 * * * *`
- `@daily` (as well as `@midnight`): `0 0 * * *`
- `@weekly`: `0 0 * * 0`
- `@monthly`: `0 0 1 * *`
- `@yearly` (as well as `@annually`): `0 0 1 1 *`
- `@reboot`: Run once at boot across the entire cluster
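Apart from `@reboot`, each extension is shorthand for a five-field expression, so the list above can be read as a lookup table. The module below is a hypothetical sketch of that table, not code taken from Oban.

```elixir
defmodule CronNicknameSketch do
  # Hypothetical lookup table built from the list above. "@reboot" is left
  # out because it has no five-field equivalent.
  @nicknames %{
    "@hourly" => "0 * * * *",
    "@daily" => "0 0 * * *",
    "@midnight" => "0 0 * * *",
    "@weekly" => "0 0 * * 0",
    "@monthly" => "0 0 1 * *",
    "@yearly" => "0 0 1 1 *",
    "@annually" => "0 0 1 1 *"
  }

  # Returns the five-field expression for a nickname, or the input unchanged.
  def expand(expression), do: Map.get(@nicknames, expression, expression)
end

IO.inspect(CronNicknameSketch.expand("@daily"))
IO.inspect(CronNicknameSketch.expand("*/15 9-17 * * *"))
```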
Some specific examples that demonstrate the full range of expressions:
- `0 * * * *`: The first minute of every hour
- `*/15 9-17 * * *`: Every fifteen minutes during standard business hours
- `0 0 * DEC *`: Once a day at midnight during December
- `0 7-9,16-18 13 * FRI`: Once an hour during both rush hours on Friday the 13th
For more in-depth information see the man pages for `cron` and `crontab` on your system. Alternatively, you can experiment with various expressions online at Crontab Guru.
- All schedules are evaluated as UTC unless a different timezone is configured. See `Oban.start_link/1` for information about configuring a timezone.
- Workers can be used for regular and scheduled jobs so long as they accept different arguments.
- Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify `unique` options less than `60s`.
- Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom `unique` opts in the crontab config:

  custom_args = %{scheduled: true}

  unique_opts = [
    period: 60 * 60 * 24,
    states: [:available, :scheduled, :executing]
  ]

  config :my_app, Oban,
    repo: MyApp.Repo,
    crontab: [
      {"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts}
    ]
Normally, all available jobs within a queue are executed in the order they were scheduled. You can override the normal behavior and prioritize or de-prioritize a job by assigning a numerical `priority`.
- Priorities from 0-3 are allowed, where 0 is the highest priority and 3 is the lowest.
- The default priority is 0. Unless a priority is specified, all jobs have an equally high priority.
- All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority jobs are executed in their scheduled order.
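The ordering rules above amount to a two-key sort: priority number first, scheduled time second. The sketch below models that in memory; `PrioritySketch`, the job maps and the integer timestamps are hypothetical, and Oban applies the ordering when it fetches jobs from the database.

```elixir
defmodule PrioritySketch do
  # Hypothetical in-memory ordering: lower priority number first, then
  # earlier scheduled time. Times are integer seconds for simplicity.
  def execution_order(jobs) do
    Enum.sort_by(jobs, fn job -> {job.priority, job.scheduled_at} end)
  end
end

jobs = [
  %{id: 1, priority: 3, scheduled_at: 10},
  %{id: 2, priority: 0, scheduled_at: 30},
  %{id: 3, priority: 0, scheduled_at: 20},
  %{id: 4, priority: 1, scheduled_at: 5}
]

jobs
|> PrioritySketch.execution_order()
|> Enum.map(& &1.id)
|> IO.inspect()
```

Job 4 was scheduled earliest but still runs after both priority 0 jobs, and job 1 runs last despite being scheduled before jobs 2 and 3.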
Oban provides some helpers to facilitate testing. The helpers handle the boilerplate of making assertions on which jobs are enqueued. To use the `perform_job/2,3`, `assert_enqueued/1` and `refute_enqueued/1` helpers in your tests you must include them in your testing module and specify your app's Ecto repo:
use Oban.Testing, repo: MyApp.Repo
Now you can assert, refute or list jobs that have been enqueued within your integration tests:
assert_enqueued worker: MyWorker, args: %{id: 1}
# or
refute_enqueued queue: :special, args: %{id: 2}
# or
assert [%{args: %{"id" => 1}}] = all_enqueued worker: MyWorker
You can also easily unit test workers with the `perform_job/2,3` function, which automates validating job args, options, and worker results from a single function call:
assert :ok = perform_job(MyWorker, %{id: 1})
# or
assert :ok = perform_job(MyWorker, %{id: 1}, attempt: 3, max_attempts: 3)
# or
assert {:error, _} = perform_job(MyWorker, %{bad: :arg})
See the `Oban.Testing` module for more details.
As noted in Usage, there are some guidelines for running tests:
- Disable all job dispatching by setting `queues: false` or `queues: nil` in your `test.exs` config. Keyword configuration is deep merged, so setting `queues: []` won't have any effect.
- Disable plugins via `plugins: false`. Default plugins, such as the fixed pruner, aren't necessary in testing mode because jobs created within the sandbox are rolled back at the end of the test. Additionally, the periodic pruning queries will raise `DBConnection.OwnershipError` when the application boots.
- Disable cron jobs via `crontab: false`. Periodic jobs aren't useful while testing and scheduling can lead to random ownership issues.
- Be sure to use the Ecto Sandbox for testing. Oban makes use of database pubsub events to dispatch jobs, but pubsub events never fire within a transaction. Since sandbox tests run within a transaction no events will fire and jobs won't be dispatched.
config :my_app, MyApp.Repo, pool: Ecto.Adapters.SQL.Sandbox
During integration testing it may be necessary to run jobs because they do work essential for the test to complete, e.g. sending an email, processing media, etc. You can execute all available jobs in a particular queue by calling `Oban.drain_queue/1,2` directly from your tests.
For example, to process all pending jobs in the "mailer" queue while testing some business logic:
defmodule MyApp.BusinessTest do
  use MyApp.DataCase, async: true

  alias MyApp.{Business, Worker}

  test "we stay in the business of doing business" do
    :ok = Business.schedule_a_meeting(%{email: "monty@brewster.com"})

    assert %{success: 1, failure: 0} == Oban.drain_queue(queue: :mailer)

    # Now, make an assertion about the email delivery
  end
end
See `Oban.drain_queue/1,2` for additional details.
When a job returns an error value, raises an error or exits during execution the details are recorded within the `errors` array on the job. When the number of execution attempts is below the configured `max_attempts` limit, the job will automatically be retried in the future.
The retry delay has an exponential backoff, meaning the job's second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
See the `Oban.Worker` documentation on "Customizing Backoff" for alternative backoff strategies.
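The delays quoted above (16s, 31s, 1m 36s) are consistent with raising the attempt number to the fourth power and adding a fixed 15 second pad. The sketch below assumes exactly that formula; `BackoffSketch` is a hypothetical module and the real default may differ, for example by adding jitter.

```elixir
defmodule BackoffSketch do
  # Assumed formula: attempt^4 plus a fixed 15 second pad. It reproduces the
  # delays quoted above but is not copied from Oban's source.
  def delay_in_seconds(attempt) when is_integer(attempt) and attempt > 0 do
    attempt * attempt * attempt * attempt + 15
  end
end

for attempt <- 1..3 do
  IO.puts("after attempt #{attempt}: retry in #{BackoffSketch.delay_in_seconds(attempt)}s")
end
```

Under this assumption the delay grows quickly, which is why a job with many attempts can stay retryable for a long time before it is finally discarded.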
Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:
- `at`: The UTC timestamp when the error occurred
- `attempt`: The attempt number when the error occurred
- `error`: A formatted error message and stacktrace
See the Instrumentation docs for an example of integrating with external error reporting systems.
By default, jobs are retried up to 20 times. The number of retries is controlled by the `max_attempts` value, which can be set at the Worker or Job level. For example, to instruct a worker to discard jobs after three failures:
use Oban.Worker, queue: :limited, max_attempts: 3
By default, individual jobs may execute indefinitely. If this is undesirable you may define a timeout in milliseconds with the `timeout/1` callback on your worker module.
For example, to limit a worker's execution time to 30 seconds:
defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(_job) do
    something_that_may_take_a_long_time()

    :ok
  end

  @impl Oban.Worker
  def timeout(_job), do: :timer.seconds(30)
end
The `timeout/1` function accepts an `Oban.Job` struct, so you can customize the timeout using any job attributes.
Define the `timeout` value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the `timeout` based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
Oban provides integration with Telemetry, a dispatching library for metrics. It is easy to report Oban metrics to any backend by attaching to `:oban` events.
Here is an example of a sample unstructured log handler:
defmodule MyApp.ObanLogger do
  require Logger

  def handle_event([:oban, :job, :start], measure, meta, _) do
    Logger.warn("[Oban] :started #{meta.worker} at #{measure.system_time}")
  end

  def handle_event([:oban, :job, event], measure, meta, _) do
    Logger.warn("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
  end
end
Attach the handler to success and failure events in `application.ex`:
events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]
:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])
The `Oban.Telemetry` module provides a robust structured logger that handles all of Oban's telemetry events. As in the example above, attach it within your `application.ex` module:
:ok = Oban.Telemetry.attach_default_logger()
For more details on the default structured logger and information on event metadata see docs for the `Oban.Telemetry` module.
Another great use of execution data is error reporting. Here is an example of integrating with Honeybadger to report job failures:
defmodule ErrorReporter do
  def handle_event([:oban, :job, :exception], measure, meta, _) do
    context =
      meta
      |> Map.take([:id, :args, :queue, :worker])
      |> Map.merge(measure)

    Honeybadger.notify(meta.error, context, meta.stacktrace)
  end

  def handle_event([:oban, :circuit, :trip], _measure, meta, _) do
    context = Map.take(meta, [:name])

    Honeybadger.notify(meta.error, context, meta.stacktrace)
  end
end

:telemetry.attach_many(
  "oban-errors",
  [[:oban, :job, :exception], [:oban, :circuit, :trip]],
  &ErrorReporter.handle_event/4,
  nil
)
Oban supports namespacing through PostgreSQL schemas, also called "prefixes" in Ecto. With prefixes your jobs table can reside outside of your primary schema (usually public) and you can have multiple separate job tables.
To use a prefix you first have to specify it within your migration:
defmodule MyApp.Repo.Migrations.AddPrefixedObanJobsTable do
  use Ecto.Migration

  def up do
    Oban.Migrations.up(prefix: "private")
  end

  def down do
    Oban.Migrations.down(prefix: "private")
  end
end
The migration will create the "private" schema and all tables, functions and triggers within that schema. With the database migrated you'll then specify the prefix in your configuration:
config :my_app, Oban,
  prefix: "private",
  repo: MyApp.Repo,
  queues: [default: 10]
Now all jobs are inserted and executed using the `private.oban_jobs` table. Note that `Oban.insert/2,4` will write jobs in the `private.oban_jobs` table automatically, but you'll need to specify a prefix manually if you insert jobs directly through a repo.
Not only is the `oban_jobs` table isolated within the schema, but all notification events are also isolated. That means that insert/update events will only dispatch new jobs for their prefix. You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.
Here we configure our application to start three Oban supervisors using the "public", "special" and "private" prefixes, respectively:
def start(_type, _args) do
  children = [
    Repo,
    Endpoint,
    Supervisor.child_spec({Oban, name: ObanA, repo: Repo}, id: ObanA),
    Supervisor.child_spec({Oban, name: ObanB, repo: Repo, prefix: "special"}, id: ObanB),
    Supervisor.child_spec({Oban, name: ObanC, repo: Repo, prefix: "private"}, id: ObanC)
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
There are a few places to connect and communicate with other Oban users.
- Request an invitation and join the #oban channel on Slack
- Ask questions and discuss Oban on the Elixir Forum
- Learn about bug reports and upcoming features in the issue tracker
- Follow @sorentwo on Twitter
To run the Oban test suite you must have PostgreSQL 10+ running locally with a database named `oban_test`. Run the following command to create the database and run all migrations:
mix test.setup
To ensure a commit passes CI you should run `mix ci` locally, which executes the following commands:
- Check formatting (`mix format --check-formatted`)
- Lint with Credo (`mix credo --strict`)
- Run all tests (`mix test --raise`)
- Run Dialyzer (`mix dialyzer`)