/docmq

A document based messaging queue for Mongo, DocumentDB, and others

Primary LanguageTypeScriptMIT LicenseMIT

DocMQ
DocMQ
DB Backed Messaging Queue

Why Choose This

DocMQ is a good choice if a persistance layer for your messaging queue is a deciding factor. Backed by a database, DocMQ makes it easy to schedule, query, and debug jobs using an adapter layer tuned for your database type. This also allows DocMQ to choose the no database option and run in-memory for local environments where it's unnecessary to set up a local mongo / postgres / etc. DocMQ works with anything that holds and queries documents or document-like objects.

DocMQ is also a strong choice when your messaging queue needs to care about time and time zones. When you want to "send a message at 4:00 am", it matters if you mean 4am in Los Angeles or 4am in Phoenix because only one of those locations implements Daylight Savings Time. DocMQ reduces the pain associated with figuring out if one day is 86400, 90000, or 85800 seconds in the future.

Finally, DocMQ is database agnostic. You can run one backend in development where you are less concerned about scale, and run a robust solution in production. A suite of tests makes it easy to ensure your beahvior is consistent across deployments.

Why AVOID This

Simple. Performance. This kind of solution will never be as fast as an in-memory Redis queue or an event bus. If fast FIFO is your goal, you should consider BullMQ, Kue, Bee, Owl, and others. They're all excellent libraries, and I can't recommend them enough if they fit your use case!

DocMQ

  • An adaptable DB layer with ready-made drivers for MongoDB and Postgres
  • An in-memory driver for faster local development
  • Concurrent job execution, job repitition, scheduling, and persistence of results
  • Queue-level hooks for managing side effects of job completions and failures
  • Timezone aware scheduling and recurrence

Comparison by Feature

Feature BullMQ Agenda DocMQ
Backend redis mongo (any)[#-custom-driver-support]
Parent/Child
Priorities
Concurrency
Delayed jobs
Global events
Rate Limiter
Pause/Resume
Sandboxed worker
Repeatable jobs
Atomic ops
Persistence
UI
REST API
Run In-memory
Timezone awareness
New job polling
Queue subscription
Optimized for Jobs / Messages Jobs Jobs

If you're not concerned about timezone awareness or an in-memory driver, BullMQ and Agenda are excellent alternatives. Thank you BullMQ for making the original comparison chart

⚠️ EARLY DEVELOPMENT - This software is currently used in Production as part of Taskless, but is still a newer project. This project follows semver spec for versions < 1.

0.y.x
  ^ ^- fixes, features
   \-- breaking changes

Installation

# npm
npm i docmq

# yarn
yarn add docmq

# pnpm
pnpm add docmq

DocMQ comes with an in-memory driver MemoryDriver, along with several other adapters for various DBs.

📚 Documentation

Creating a Queue

import { Queue, MemoryDriver } from "docmq";

interface SimpleJob {
  success: boolean;
}

const queue = new Queue<SimpleJob>(new MemoryDriver("default"), "docmq");

new Queue() options

new Queue<
  TData,
  TAck = unknown,
  TFail extends Error = Error,
  TContext = DefaultContext,
>(driver: Driver, name: string, options?: QueueOptions)
  • driver a Driver implementation to use such as the MemoryDriver
  • name a string for the queue's name
  • options? additional options
    • retention.jobs? number of seconds to retain jobs with no further work. Default 3600 (1 hour)
    • statInterval? number of seconds between emitting a stat event with queue statistics, defaults to 5

A Note on TypeScript

This library uses TypeScript to provide a better developer experience regarding the objects passed into your queue and the responses your job processor provides back to DocMQ. There are four main types used throughout this documentation, and all are set during the creation of the Queue class.

TData refers specifically to the typing of your job payload. It's the payload you're expecting to pass when calling enqueue(), and it's the payload you're expecting to receive inside of your process() callback.

TAck = unknown refers to the typing of your ack response when calling api.ack() inside of your job processor and is by default an unknown type. Setting TAck also sets the typings for the ack event.

TFail extends Error = Error refers to the typing of your error object created and passed to api.fail() inside of your job processor and defaults to the base Error class. Setting TFail also sets the typings for your fail event.

TContext = Record<string, unknown> refers to the context object available during processing, and is by default an empty object. The context is available inside of process() as well as inside of event callbacks after the processing context is available (ack, fail, ping, dead, etc). A DefaultContext is made available as a convienence for the Record definition.

Adding a Job to the Queue

await queue.enqueue({
  ref: "sample-id",
  /* TData */ payload: {
    success: true,
  },
});

enqueue() API

queue.enqueue(job: JobDefinition<TData> | JobDefinition<TData>[])
  • job the JSON Job object (or an array of job objects), consisting of
    • ref?: string an identifier for the job, allowing future enqueue() calls to replace the job with new data. Defaults to a v4 UUID
    • payload: TData the job's payload which will be saved and sent to the handler
    • runAt?: Date a date object describing when the job should run. Defaults to now()
    • runEvery?: string | null Either a cron interval or an ISO-8601 duration, or null to remove recurrence
    • timezone?: string | null When using runEvery, you can specify a timezone to make DocMQ aware of durations that cross date-modifying thresholds such as Daylight Savings Time; recommended when using cron and duration values outside of UTC.
    • retries?: number a number of tries for this job, defaults to 5
    • retryStrategy?: RetryStrategy a retry strategy, defaults to exponential

Retry Strategies

interface FixedRetryStrategy {
  type: "fixed";
  amount: number;
  jitter?: number;
}

interface ExponentialRetryStrategy {
  type: "exponential";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

export interface LinearRetryStrategy {
  type: "linear";
  min: number;
  max: number;
  factor: number;
  jitter?: number;
}

Handling Work (Processing)

queue.process(
  async (job: TData, api: HandlerAPI<TAck, TFail, TContext>) => {
    await api.ack();
  },
  {
    /* options */
  }
);

process() Options

queue.process(handler: JobHandler<T, A, F>, config?: ProcessorConfig<C>)

  • handler the job handler function, taking the job T and the api as arguments, returns a promise
  • config?: ProcessorConfig an optional configuration for the processor including
    • pause?: boolean should the processor wait to be started, default false
    • concurrency?: number the number of concurrent processor loops to run, default 1
    • visibility?: number specify the visibility window (how long a job is held for by default) in seconds, default 30
    • pollInterval?: number as a fallback, define how often to check for new jobs in the event that driver does not support evented notifications. Defaults to 5
    • createContext?: () => Promise<TContext> | TContext generates a unique context of type TContext for this run. It will be available in the handler API.

api Methods and Members

  • api.ref (string) the ref value of the job
  • api.attempt (number) the attempt number for this job
  • api.visible (number) the number of seconds this job was originally reserved for
  • api.context (TContext) the context object, generated for this run
  • api.ack(result: TAck) acknowlegde the job, marking it complete, and scheduling future work
  • api.fail(reason: string | TFail) fail the job and emit the reason, scheduling a retry if required
  • api.ping(extendBy: number) on a long running job, extend the runtime by extendBy seconds

Events

The Queue object has a large number of emitted events available through queue.events. It extends EventEmitter, and the most common events are below. Events related to the processing of a job (ack, fail, dead, and ping) will all receive context: TContext as a second argument to the event callback

  • ack when a job was acked successfully
  • fail when a job was failed
  • dead when a job has exceeded its retries and was moved to the dead letter queue
  • stats an interval ping containing information about the queue's processing load
  • start, stop when the queue starts and stops processing
  • log, warn, error, halt logging events from the queue

Managing Fatal Errors With halt

A fatal event occurs when DocMQ does not believe it can receover from a connection or processing issue. For example, if using the MongoDB driver and the Change Stream disconnects and the process cannot reconnect. To minimize the likelyhood of repeatedly processing jobs in this scenario, DocMQ will call its destroy method and no longer accept jobs for processing. In addition to emitting a standard error event, DocMQ will also emit a halt event. In most environments, it's recommended to kill your service, allowing your PaaS provider to mark your service as unhealthy and restart it.

queue.on("halt", () => {
  console.error("Received HALT from DocMQ");
  process.exit(1); // exit code for termination of node.js
});

🔧 Custom Driver Support

DocMQ works with several drivers, many of which are included in the /drivers directory. For development or non-production scenarios, we recommend the MemoryDriver, an in-memory driver that supports all of DocMQ's apis. When transitioning to production, you can pass a production driver in and DocMQ will work with no additional changes.

import { Queue, MemoryDriver } from "docmq";
import { MongoDriver } from "docmq/driver/mongo";

// for example, using the MongoDriver in production, while using
// the less resource-intensive in-memory driver for development
const driver =
  process.env.NODE_ENV === "production"
    ? new MongoDriver(process.env.MONGO_URI)
    : new MemoryDriver("default");

const queue = new Queue(driver, "queueName");
Driver import Notes
In Memory import { MemoryDriver} from "docmq" The default in-memory driver and (currently) a re-export of LokiDriver
LokiJS import { LokiDriver } from "docmq/driver/loki" A fast in-memory driver designed for non-production instances.
MongoDB import { MongoDriver } from "docmq/driver/mongo" Currently, DocMQ requires a Mongo Client >= 4.2 for transaction support, and the mongo instance must be running in a Replica Set. This is because MongoDriver uses the OpLog to reduce polling. Requires mongodb peer dependency if using
Postgres import { PGDriver } from "docmq/driver/postgres" We are slowly expanding our PG Test Matrix based on what GitHub allows. LISTEN/NOTIFY support is not available, and the driver will fall back to polling. Requires pg as a peer dependency if using

If you need to write a custom driver, the core BaseDriver is available in the core docmq package.

✏️ Contributing

We would love you to contribute to jakobo/docmq, pull requests are welcome! Please see the CONTRIBUTING.md for more information.

⚖️ License

DocMQ source is made available under the MIT license

❤️ Sponsor

This project is made possible thanks to Taskless. The in-memory driver of DocMQ enables Taskless to offer a local version of their service, eliminating the hassle of forwarding webhooks around.

✨ Contributors

All Contributors

Thanks goes to these wonderful people (emoji key):


Jakob Heuser

💻 📖 ⚠️

takanome_dev

📖

This project follows the all-contributors specification. Contributions of any kind welcome!