This Convex component pools actions and mutations to restrict parallel requests.
- Configure multiple pools with different parallelism.
- Retry failed idempotent actions with backoff and jitter. Retries are fully configurable and respect the pool's parallelism limit.
- An onComplete callback so you can build durable, reliable workflows. Called when the work is finished, whether it succeeded, failed, or was canceled.
Suppose you have some important async work, like sending verification emails,
and some less important async work, like scraping data from an API. If all of
these are scheduled with ctx.scheduler.runAfter, they'll compete with each
other for resources. The emails might be delayed if there are too many scraping
requests queued ahead of them.
To resolve this problem, you can separate work into different pools.
const emailPool = new Workpool(components.emailWorkpool, {
  maxParallelism: 10,
});
const scrapePool = new Workpool(components.scrapeWorkpool, {
  maxParallelism: 5,
});

export const userSignUp = mutation({
  args: {...},
  handler: async (ctx, args) => {
    const userId = await ctx.db.insert("users", args);
    await emailPool.enqueueAction(ctx, internal.auth.sendEmailVerification, {
      userId,
    });
  },
});

export const downloadLatestWeather = mutation({
  handler: async (ctx, args) => {
    for (const city of allCities) {
      await scrapePool.enqueueAction(ctx, internal.weather.scrape, { city });
    }
  },
});

Imagine that the payment processor is a 3rd party API, and they temporarily have an outage. Now imagine you implement your own action retrying logic for your busy app. You'll find very quickly that your entire backend is overwhelmed with retrying actions. This could bog down live traffic with background work, and/or cause you to exceed rate limits with the payment provider.
Creating an upper bound on how much work will be done in parallel is a good way to mitigate this risk. Actions that are currently backing off awaiting retry will not tie up a thread in the workpool.
When you hand off asynchronous work to the workpool, it is guaranteed to run, and with retries you can account for temporary failures while avoiding a "stampeding herd" during third party outages.
With the onComplete callback, you can define how to proceed after each step,
whether that enqueues another job to the workpool, updates the database, etc.
It will always be called, whether the work was successful, failed, or was
canceled. See below for more info.
Example:
const pool = new Workpool(components.emailWorkpool, {
  retryActionsByDefault: true,
  defaultRetryBehavior: { maxAttempts: 3, initialBackoffMs: 1000, base: 2 },
});

//...
await pool.enqueueAction(ctx, internal.email.send, args, {
  onComplete: internal.email.emailSent,
  context: { emailType, userId },
  retry: false, // don't retry this action, as we can't guarantee idempotency.
});

export const emailSent = internalMutation({
  args: {
    workId: workIdValidator,
    result: resultValidator,
    context: v.object({ emailType: v.string(), userId: v.id("users") }),
  },
  handler: async (ctx, args) => {
    if (args.result.kind === "canceled") return;
    await ctx.db.insert("userEmailLog", {
      userId: args.context.userId,
      emailType: args.context.emailType,
      result: args.result.kind === "success" ? args.result.returnValue : null,
      error: args.result.kind === "failed" ? args.result.error : null,
    });
    if (args.result.kind === "failed") {
      await pool.enqueueAction(ctx, internal.email.checkResendStatus, args, {
        retry: { maxAttempts: 10, initialBackoffMs: 250, base: 2 }, // custom
        onComplete: internal.email.handleEmailStatus,
        context: args.context,
      });
    }
  },
});

Idempotent actions are actions that can be run multiple times safely. This typically means they don't cause any side effects that would be a problem if executed twice or more.
As an example of an unsafe, non-idempotent action, consider an action that charges a user's credit card without providing a unique transaction id to the payment processor. The first time the action is run, imagine that the API call to the payment provider succeeds, but then the action throws an exception before the transaction is marked finished in our Convex database. If the action is run twice, the user may be double charged for the transaction!
If we alter this action to provide a consistent transaction id to the payment provider, they can simply NOOP the second payment attempt. This makes the action idempotent, and it can safely be retried.
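To make the difference concrete, here is a minimal sketch in plain TypeScript. FakePaymentProvider, chargeWithFreshId, and chargeForOrder are hypothetical names invented for this illustration. They model a provider that deduplicates by transaction id and are not part of Workpool or any real payment API.

```typescript
// Hypothetical stand-in for a payment provider that deduplicates by transaction id.
class FakePaymentProvider {
  private amountsById: Record<string, number> = {};
  totalChargedCents = 0;

  // A repeated transaction id is treated as a no-op instead of a second charge.
  charge(transactionId: string, amountCents: number): "charged" | "duplicate" {
    if (Object.prototype.hasOwnProperty.call(this.amountsById, transactionId)) {
      return "duplicate";
    }
    this.amountsById[transactionId] = amountCents;
    this.totalChargedCents += amountCents;
    return "charged";
  }
}

// Not idempotent: every run generates a fresh id, so a retry charges again.
let attemptCounter = 0;
function chargeWithFreshId(provider: FakePaymentProvider, amountCents: number) {
  attemptCounter += 1;
  return provider.charge(`attempt-${attemptCounter}`, amountCents);
}

// Idempotent: the id is derived from the order, so a retry reuses it.
function chargeForOrder(
  provider: FakePaymentProvider,
  orderId: string,
  amountCents: number,
) {
  return provider.charge(`order-${orderId}`, amountCents);
}
```

Running chargeForOrder twice for the same order leaves the total unchanged after the first call, while running chargeWithFreshId twice charges twice.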
If you're creating complex workflows with many steps involving 3rd party APIs:
- You should ensure that each step is an idempotent Convex action.
- You should use this component to manage these actions so it all just works!
With limited parallelism, you can reduce OCC errors from mutations that read and write the same data.
Consider this action that calls a mutation to increment a singleton counter.
Because the mutation is enqueued on a workpool with maxParallelism: 1, it will
never throw an error due to conflicts with parallel mutations.
const counterPool = new Workpool(components.counterWorkpool, {
  maxParallelism: 1,
});

export const doSomethingAndCount = action({
  handler: async (ctx) => {
    const doSomething = await fetch("https://example.com");
    await counterPool.enqueueMutation(ctx, internal.counter.increment, {});
  },
});

// This mutation is prone to conflicting with itself, because it always reads
// and writes the same data. By running it in a workpool with low parallelism,
// it will run serially.
export const increment = internalMutation({
  handler: async (ctx) => {
    const countDoc = await ctx.db.query("counter").unique();
    await ctx.db.patch(countDoc!._id, { count: countDoc!.count + 1 });
  },
});

Effectively, Workpool runs async functions similar to
ctx.scheduler.runAfter(0, ...), but it limits the number of functions that
can run in parallel.
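As a rough mental model of bounded parallelism, consider the toy in-memory queue below. It is only an illustration of the idea, not how the component is implemented. ToyPool and its methods are hypothetical names, and FIFO ordering is an assumption of this sketch.

```typescript
// A toy, in-memory model of a pool with bounded parallelism.
// Hypothetical names; this is not the component's implementation.
class ToyPool {
  private maxParallelism: number;
  private pending: string[] = [];
  private running: string[] = [];

  constructor(maxParallelism: number) {
    this.maxParallelism = maxParallelism;
  }

  // Queue a job, then start it right away if a slot is free.
  enqueue(jobId: string): void {
    this.pending.push(jobId);
    this.fill();
  }

  // Mark a running job as done and let the next pending job start.
  complete(jobId: string): void {
    const index = this.running.indexOf(jobId);
    if (index === -1) return; // unknown or not running: nothing to do
    this.running.splice(index, 1);
    this.fill();
  }

  runningJobs(): string[] {
    return this.running.slice();
  }

  pendingJobs(): string[] {
    return this.pending.slice();
  }

  // Start pending jobs in FIFO order until the limit is reached.
  private fill(): void {
    while (this.running.length < this.maxParallelism && this.pending.length > 0) {
      this.running.push(this.pending.shift()!);
    }
  }
}
```

With a limit of 2, enqueueing three jobs starts the first two and holds the third until one of them completes.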
You'll need an existing Convex project to use the component. Convex is a hosted backend platform, including a database, serverless functions, and a ton more you can learn about in the Convex documentation.
Run npm create convex or follow any of the quickstarts to set one up.
See example/ for a working demo.
- Install the Workpool component:
npm install @convex-dev/workpool
- Create a convex.config.ts file in your app's convex/ folder and install the component by calling use:
// convex/convex.config.ts
import { defineApp } from "convex/server";
import workpool from "@convex-dev/workpool/convex.config";

const app = defineApp();
app.use(workpool, { name: "emailWorkpool" });
app.use(workpool, { name: "scrapeWorkpool" });
export default app;

import { components } from "./_generated/api";
import { Workpool } from "@convex-dev/workpool";

const pool = new Workpool(components.emailWorkpool, { maxParallelism: 10 });

Then you have the following interface on pool:
// Schedule functions to run in the background.
const id = await pool.enqueueMutation(ctx, internal.foo.bar, args);
// Or for an action:
const actionId = await pool.enqueueAction(ctx, internal.foo.baz, args);
// Is it done yet? Did it succeed or fail?
const status = await pool.status(id);
// You can cancel the work, if it hasn't finished yet.
await pool.cancel(id);

See more example usage in example.ts.
Check out the docstrings, but notable options include:
- maxParallelism: How many actions/mutations can run at once within this pool.
- retryActionsByDefault: Whether to retry actions that fail by default.
- defaultRetryBehavior: The default retry behavior for enqueued actions.
You can override the retry behavior per-call with the retry option.
See the docstrings for more details, but notable options include:
- retry: Whether to retry the action if it fails. Overrides defaults. If it's set to true, it will use the defaultRetryBehavior. If it's set to a custom config, it will use that (and do retries).
- onComplete: A mutation to run after the function finishes.
- context: Any data you want to pass to the onComplete mutation.
- runAt and runAfter: Similar to ctx.scheduler.run*, allows you to schedule the work to run later. By default it's immediate.
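The way the retry option interacts with the pool defaults can be sketched as a small pure function. This is an illustration of the rules described above, not the component's code. resolveRetry and the type names are hypothetical, and the case where retry is omitted assumes the pool falls back to retryActionsByDefault.

```typescript
// Hypothetical types mirroring the options named above.
type RetryBehavior = {
  maxAttempts: number;
  initialBackoffMs: number;
  base: number;
};
type RetryOption = boolean | RetryBehavior | undefined;
type PoolDefaults = {
  retryActionsByDefault: boolean;
  defaultRetryBehavior: RetryBehavior;
};

// Returns the retry config to use, or null when the action should not be retried.
function resolveRetry(
  option: RetryOption,
  defaults: PoolDefaults,
): RetryBehavior | null {
  if (option === undefined) {
    // Assumption: with no per-call option, the pool-level default decides.
    return defaults.retryActionsByDefault ? defaults.defaultRetryBehavior : null;
  }
  if (option === false) return null; // explicit opt-out
  if (option === true) return defaults.defaultRetryBehavior; // explicit opt-in
  return option; // custom per-call config
}
```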
The retry options work like this:
- The first request runs as it's scheduled.
- If it fails, it will wait around initialBackoffMs and then try again.
- Each subsequent retry waits initialBackoffMs * base^<retryNumber - 1>.
- The standard base is 2.
- The actual wait time uses "jitter" to avoid all retries happening at once, if they all failed at the same time.
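The backoff rule above can be worked through with a short sketch. The nominal wait follows the formula in the list. The jitter factor (a random multiplier between 0.5 and 1.5) is an assumption made for illustration, since the component's actual jitter calculation is not described here. The function names are hypothetical.

```typescript
// Hypothetical shape mirroring the retry options named above.
type RetryBehavior = {
  maxAttempts: number;
  initialBackoffMs: number;
  base: number;
};

// Nominal (un-jittered) wait before retry number `retryNumber` (1-based):
// initialBackoffMs * base^(retryNumber - 1).
function nominalBackoffMs(retry: RetryBehavior, retryNumber: number): number {
  return retry.initialBackoffMs * Math.pow(retry.base, retryNumber - 1);
}

// Assumption: jitter scales the nominal wait by a random factor in [0.5, 1.5).
// The component's real jitter formula may differ.
function jitteredBackoffMs(
  retry: RetryBehavior,
  retryNumber: number,
  random: () => number = Math.random,
): number {
  const factor = 0.5 + random();
  return Math.round(nominalBackoffMs(retry, retryNumber) * factor);
}
```

For { initialBackoffMs: 1000, base: 2 }, the nominal waits before the first three retries are 1000 ms, 2000 ms, and 4000 ms. Jitter spreads each of those out so that actions which failed together do not all retry at the same instant.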
The benefit of Workpool is that it won't fall over if there are many jobs scheduled at once, and it allows you to throttle low-priority jobs.
However, Workpool has some overhead and can slow down your workload compared
to using ctx.scheduler directly.
Since each Workpool has some overhead -- each runs several functions to coordinate its work -- don't create too many of them.
If you're running into issues with too many concurrent functions, there are alternatives to Workpool:
- Try combining multiple mutations into a single mutation, with batching or debouncing.
- Call plain TypeScript functions if possible.
  - In particular, an action calling ctx.runAction has more overhead than just calling the action's handler directly.
See best practices for more.
The workpool stores the status of each function in the database, so you can
read it even after the function has finished.
By default, it will keep the status for 1 day but you can change this with
the statusTtl option to Workpool.
To keep the status forever, set statusTtl: Number.POSITIVE_INFINITY.
You can read the status of a function by calling pool.status(id).
The status will be one of:
- { kind: "pending"; previousAttempts: number }: The function has not started yet.
- { kind: "running"; previousAttempts: number }: The function is currently running.
- { kind: "finished" }: The function has succeeded, failed, or been canceled.
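Because the status is a union discriminated by kind, it can be handled with an exhaustive switch. The sketch below restates the three shapes listed above as a local type. WorkStatus and describeStatus are hypothetical names, and reading a nonzero previousAttempts on a pending status as "waiting to retry" is an assumption.

```typescript
// Local restatement of the three status shapes listed above.
type WorkStatus =
  | { kind: "pending"; previousAttempts: number }
  | { kind: "running"; previousAttempts: number }
  | { kind: "finished" };

// Turn a status into a short human-readable description.
function describeStatus(status: WorkStatus): string {
  switch (status.kind) {
    case "pending":
      // Assumption: a pending status with earlier attempts is awaiting a retry.
      return status.previousAttempts === 0
        ? "waiting to start"
        : `waiting to retry after ${status.previousAttempts} attempt(s)`;
    case "running":
      return `running (attempt ${status.previousAttempts + 1})`;
    case "finished":
      return "finished";
  }
}
```

Note that "finished" does not say whether the work succeeded. The outcome is delivered to the onComplete mutation.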
You can cancel a single piece of work by calling pool.cancel(id), or cancel
all work in the pool with pool.cancelAll().
Canceling prevents work from starting or being retried, but it does not stop work that is already in progress.