A lightweight, simple FIFO queue backed by PostgreSQL for delayed tasks.
- Scheduling jobs far in the future is hard with some of the most reliable distributed queues, e.g. SQS, which has an upper limit of 12 hours on the visibility timeout. For long-scheduled tasks, the message needs to be picked up and delayed again, depending on how far in the future the job is scheduled
- The Redis pub/sub model with a delay time suits scheduling jobs really well, but the problem is that it keeps its data in memory rather than on disk. It works really well for immediate jobs that need high throughput, but keeping jobs that have to run far in the future consumes a lot of memory, which is costly
- While Redis Cluster and cloud-provided distributed queues elegantly solve scaling challenges, sometimes the use case is to track these jobs in one place for simplicity, such as a SQL/NoSQL store with a common schema model, without too much effort. With Programma we attempt to track job states in Postgres
- Programma's goal is not to implement solid job-worker logic. You could use it as an end-to-end job processor for simpler, low-latency tasks, but we recommend using Redis, RabbitMQ, or a queue like SQS to fan out or distribute the workload, with Programma feeding jobs to them
- The goal of Programma is to expose a very flexible and simple API, where the client can nudge the job-processing lifecycle by calling utility methods, without us dictating a specific job lifecycle
npm install programma
import { Programma } from 'programma'
const programma = new Programma({
connectionString: '<database-connection-string>',
})
// connect the node-postgres (pg) pool and start processing
await programma.start()
await programma.addJob('sendEmail', {
data: { email: 'xyz@gmail.com'},
runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
await programma.moveJobToProcessing(job.id)
})
- Lightweight: ~300 lines of code
- Handles schema creation for you
- The Programma API is highly customizable. It's up to you how you handle the job lifecycle or retry logic
- Guarantees delivery to exactly one processor within the retry timeout period, without heavy locking or a performance penalty, thanks to the Postgres SKIP LOCKED feature
- Programma ensures a job is delivered to and claimed by a processor, redelivering it every retryAfterSeconds until the job status is changed. This parameter is customizable, and by changing retryAfterSeconds you can also use it for exponential backoff
- Received messages that are not moved to the PROCESSING, COMPLETED or FAILED state will appear again after the retryAfterSeconds timeout. The default is 30 seconds
- Promise-based API, written in TypeScript
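Because retryAfterSeconds can be changed per job (the API lists setRetryAfterSeconds), an exponential backoff policy reduces to a function from the failure count to a delay. The sketch below is a hypothetical helper of your own, not part of Programma; the 5-second base, doubling factor, and one-hour cap are illustrative assumptions.

```typescript
// Hypothetical helper (not part of Programma): delay in seconds before the
// next retry, doubling with each failed attempt and capped at maxSeconds.
function backoffSeconds(attempt: number, baseSeconds = 5, maxSeconds = 3600): number {
  // attempt = number of failures so far (0 before the first retry);
  // negative inputs are clamped to 0
  const delay = baseSeconds * 2 ** Math.max(0, attempt)
  // Math.min also bounds very large exponents (including Infinity)
  return Math.min(delay, maxSeconds)
}

// Example use inside a job handler after a failure (sketch):
// await programma.setRetryAfterSeconds(job.id, backoffSeconds(attempts))
```

The result is always an integer for integer inputs, which matches the `int` type of the retry_after_seconds column.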
For us, Programma makes the most sense for scheduling tasks/work to be done, tracking their progress and state, or fanning out multiple tasks to a queue system. You can parallelize Programma by running it in multiple Node processes, each with a maxJobs limit per heartbeat interval, to achieve horizontal scalability. Programma also handles schema/table creation, so it can be used with multiple Postgres databases to run the jobs. To parallelize the actual work, you could use a Redis-based queue system like bull or rsmq: with a shared Redis server, multiple Node.js processes can send and receive messages. The following is an example of using it with the awesome bull queue:
import Queue from 'bull'

const bullRedisQueue = new Queue('sendEmail', 'redis://127.0.0.1:6379');
await programma.addJob('sendEmail', {
  data: { email: 'xyz@gmail.com'},
  runAfterDate: '2020-08-27T20:02:18.255Z', // optional UTC ISO-8601 format, run after a specific date takes precedence
  retryAfterSeconds: 60,
})
programma.processJobs({ topicName: 'sendEmail', heartBeat: 10 }, async (job: IReceiveJob) => {
  // hand the work to bull and use its retry/backoff/timeout features
  await bullRedisQueue.add(
    { id: job.id, data: job.data, attributes: job.attributes },
    // bull's option is `attempts` (not `retries`); backoff and timeout are in milliseconds
    { attempts: 3, backoff: 20000, timeout: 15000 }
  )
  // after submitting, move the job to processing; the Redis queue now handles it
  // if a job is not moved to a different state, it will be retried after retryAfterSeconds
  await programma.moveJobToProcessing(job.id)
})
// an async processor must not also declare bull's `done` callback, otherwise bull waits for done()
bullRedisQueue.process(async (job) => {
  // heavy processing
  await programma.moveJobToDone(job.data.id)
})
// bull emits 'failed' with the job object (not its id) after every failed attempt,
// so only mark the Programma job as FAILED once all the back-off retries are used up
bullRedisQueue.on('failed', async (job, error) => {
  if (job.attemptsMade < (job.opts.attempts ?? 1)) return
  await programma.moveJobToFailed(job.data.id)
  // also track the error in SQL; Error objects serialize to {} in JSON, so store the message
  await programma.setAttributes(job.data.id, { error: error.message })
})
For tasks that do not involve heavy computation (i.e. that do not block the event loop or create a lot of back-pressure), Programma can be used as an end-to-end solution. Alternatively, for high-latency tasks it can be used with smaller batch sizes per interval, with the work parallelized across multiple nodes that poll the same topic.
await programma.addJob('sendPushNotification', {
  data: { token: '123213123' },
  attributes: { maxAttempts: 5 },
  retryAfterSeconds: 60,
})
// you control the retry logic and can also use it for exponential back-off;
// how to handle it is entirely up to you
programma.processJobs({ topicName: 'sendPushNotification', heartBeat: 10, maxJobs: 100 }, async (job: IReceiveJob) => {
  const attributes = job.attributes as { attempts?: number; maxAttempts?: number }
  const attempts = attributes.attempts ?? 0
  // stop once the retry attempts are exhausted
  if (attributes.maxAttempts !== undefined && attempts >= attributes.maxAttempts) {
    await programma.moveJobToFailed(job.id)
    return
  }
  try {
    await pushNotifyApi(job.data) // your own push notification call
    await programma.moveJobToDone(job.id)
  } catch (e) {
    // increment the retry counter, keeping the other attributes (e.g. maxAttempts)
    await programma.setAttributes(job.id, { ...attributes, attempts: attempts + 1 })
    // use this for a constant retry delay, exponential back-off or a combination thereof
    await programma.setRetryAfterSeconds(job.id, 60)
  }
})
For configuration, Programma uses the node-postgres pool config to create an internal connection pool. Refer to the PoolConfig interface for the available options.
export interface IProgrammaConstructor {
new (config: PoolConfig, schemaName: string): IProgramma
}
// schemaName defaults to 'programma'; you can also pass a custom schema name
const program = new Programma({
connectionString: `postgres://<user-name>:<password>@localhost:5432/postgres`,
max: 50, // max connection pool size, defaults to 10
}, 'mySchemaName')
await program.start() // establishes the pool connection, ensures the migration and starts polling; if started with false, the migration is skipped
export interface IProgramma {
addJob(topicName: string, job: IJobConfig): Promise<string | null>
receiveJobs(config: IReceiveMessageConfig, handler: IHandlerCallback): void
deleteJob(id: string): Promise<boolean>
moveJobToProcessing(id: string): Promise<boolean>
moveJobToDone(id: string, deleteOnComplete?: boolean): Promise<boolean>
moveJobToFailed(id: string, deleteOnFail?: boolean): Promise<boolean>
getJob(id: string): Promise<IJobDetail | null>
setAttributes(id: string, attributes: object): Promise<boolean>
setRetryAfterSeconds(id: string, seconds: number): Promise<boolean>
setJobStartDate(id: string, startDate: string | Date): Promise<boolean>
start(): Promise<void>
shutdown(): void
}
export interface IJobConfig {
// job data is persisted in JSONB type
// it could contain any number of key value pairs
data: {}
// attributes is metadata related to job, persisted as JSONB type
// e.g. a retry count or other metadata for downstream systems
// the job methods expose a way to set these attributes for downstream systems
attributes?: {}
// run after seconds
runAfterSeconds?: number
// ISO8601 format String or Date. Use UTC ISO8601 format to avoid inconsistency
runAfterDate?: string | Date
// re-run the job after this many seconds. Default is 30 seconds
retryAfterSeconds?: number | null
}
If neither runAfterDate nor runAfterSeconds is provided, the current time is used by default, so the job can run in the next polling interval. The job's UUID is returned on success.
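The scheduling rule above can be sketched as a small function that turns the optional fields of IJobConfig into the job's start time. This is a hypothetical helper, not Programma's code; it assumes, from the "takes precedence" comment in the examples, that runAfterDate wins over runAfterSeconds.

```typescript
// Mirrors the scheduling fields of IJobConfig.
interface ScheduleOptions {
  runAfterSeconds?: number
  runAfterDate?: string | Date
}

// Hypothetical helper: the instant a job becomes eligible to run.
// Assumption: runAfterDate takes precedence over runAfterSeconds;
// "now" is used when neither is set.
function resolveStartAfter(opts: ScheduleOptions, now: Date = new Date()): Date {
  if (opts.runAfterDate !== undefined) {
    const date = opts.runAfterDate instanceof Date
      ? new Date(opts.runAfterDate.getTime())
      : new Date(opts.runAfterDate) // parse the (UTC) ISO-8601 string
    if (isNaN(date.getTime())) {
      throw new Error(`invalid runAfterDate: ${String(opts.runAfterDate)}`)
    }
    return date
  }
  if (opts.runAfterSeconds !== undefined) {
    return new Date(now.getTime() + opts.runAfterSeconds * 1000)
  }
  return now
}
```

Rejecting unparseable dates up front avoids storing an Invalid Date that would surface later as a database error.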
export interface IReceiveMessageConfig {
// maximum number of messages polled per scheduled interval
// even though the handler will be executed independently per job
maxJobs?: number
// poll interval in seconds; must be greater than or equal to 1
// default poll interval is 5 seconds
heartBeat?: number
// name of the queue topic. This is a required field
topicName: string
}
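To illustrate how maxJobs and heartBeat interact, here is a hypothetical heartbeat poller: every heartBeat seconds it fetches at most maxJobs jobs and runs the handler for each job independently. The `fetchJobs` function is an assumed stand-in for whatever claims due jobs (for example a SKIP LOCKED query), and the maxJobs default of 10 is an assumption, since the interface does not state one.

```typescript
// Shapes mirroring IReceiveMessageConfig and IReceiveJob.
interface PollConfig {
  topicName: string
  maxJobs?: number
  heartBeat?: number
}
interface PolledJob {
  id: string
  data: object
  attributes: object
}

// Hypothetical: claims up to `limit` due jobs for a topic.
type FetchJobs = (topicName: string, limit: number) => Promise<PolledJob[]>
type JobHandler = (job: PolledJob) => void | Promise<void>

const DEFAULT_MAX_JOBS = 10 // assumption, not documented
const DEFAULT_HEARTBEAT_SECONDS = 5

// One heartbeat: fetch a batch, then run each handler independently so a
// failing job neither blocks nor rejects the rest of the batch.
async function pollOnce(config: PollConfig, fetchJobs: FetchJobs, handler: JobHandler): Promise<number> {
  const jobs = await fetchJobs(config.topicName, config.maxJobs ?? DEFAULT_MAX_JOBS)
  await Promise.all(
    jobs.map((job) =>
      Promise.resolve()
        .then(() => handler(job))
        .catch((err) => console.error(`handler failed for job ${job.id}`, err)),
    ),
  )
  return jobs.length
}

// Polls every heartBeat seconds; returns a function that stops polling.
function startPolling(config: PollConfig, fetchJobs: FetchJobs, handler: JobHandler): () => void {
  const heartBeat = config.heartBeat ?? DEFAULT_HEARTBEAT_SECONDS
  if (heartBeat < 1) {
    throw new Error('heartBeat must be greater than or equal to 1 second')
  }
  let inFlight = false
  const timer = setInterval(() => {
    if (inFlight) return // skip this beat while the previous batch is still running
    inFlight = true
    pollOnce(config, fetchJobs, handler).then(
      () => { inFlight = false },
      (err) => { console.error('poll failed', err); inFlight = false },
    )
  }, heartBeat * 1000)
  return () => clearInterval(timer)
}
```

Skipping a beat while a batch is in flight keeps a slow batch from stacking up overlapping polls in the same process. Scaling out is then a matter of running more processes against the same topic.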
export interface IHandlerCallback {
(job: IReceiveJob): void
}
export interface IReceiveJob {
id: string
data: Object
attributes: Object
}
export interface IJobDetail {
id: string
topicName: string
data: Object
attributes: Object
state: JobStates
start_after: string
started_at: string | null
created_at: string
retry_after_seconds: number
}
All the status-change methods follow a self-descriptive interface. For moveJobToDone and moveJobToFailed, passing true as the second boolean parameter deletes the job from the table. The job states are as follows:
export enum JobStates {
CREATED = 'created',
ACTIVE = 'active',
PROCESSING = 'processing',
FAILED = 'failed',
COMPLETED = 'completed',
}
A message is initially in the CREATED state. It is moved to the ACTIVE state when it is due to run. After that, it remains in the ACTIVE state and is retried after retry_after_seconds if it is not moved out of the ACTIVE state.
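The lifecycle rule can be written as a pure predicate over the IJobDetail fields. This is a hypothetical model, not Programma's code; it assumes started_at records the most recent time an ACTIVE job was handed out, and falls back to the documented 30-second default when retry_after_seconds is null.

```typescript
type JobState = 'created' | 'active' | 'processing' | 'failed' | 'completed'

// Subset of IJobDetail needed for the visibility rule.
interface JobSnapshot {
  state: JobState
  start_after: string
  started_at: string | null
  retry_after_seconds: number | null
}

const DEFAULT_RETRY_AFTER_SECONDS = 30

// Hypothetical: should this job be handed to a processor at `now`?
function isDeliverable(job: JobSnapshot, now: Date = new Date()): boolean {
  const nowMs = now.getTime()
  switch (job.state) {
    case 'created':
      // waiting for its scheduled time
      return Date.parse(job.start_after) <= nowMs
    case 'active': {
      // handed out before: visible again once the retry window has elapsed
      // (assumes started_at holds the most recent hand-out time)
      if (job.started_at === null) return Date.parse(job.start_after) <= nowMs
      const retryMs = (job.retry_after_seconds ?? DEFAULT_RETRY_AFTER_SECONDS) * 1000
      return Date.parse(job.started_at) + retryMs <= nowMs
    }
    default:
      // processing, completed and failed jobs are never redelivered
      return false
  }
}
```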
create table if not exists ${this.schemaName}.jobs (
id uuid primary key not null default gen_random_uuid(),
topicName text not null,
data jsonb,
attributes jsonb,
state varchar(255) not null default('created'),
start_after timestamp with time zone not null default now(),
started_at timestamp with time zone,
created_at timestamp with time zone default now(),
retry_after_seconds int
);
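The exactly-one-processor guarantee relies on Postgres `FOR UPDATE SKIP LOCKED`: concurrent pollers skip rows another transaction has already locked instead of waiting on them. The query below is a hypothetical claim statement against the jobs table above, using the same visibility rule as the lifecycle description. It is a sketch, not Programma's actual SQL. The `Queryable` type is an assumed minimal interface that a node-postgres Pool or Client should satisfy; `schemaName` must be a trusted identifier because it is interpolated, not parameterized.

```typescript
// Minimal structural type for something with a pg-style query() method.
interface Queryable {
  query(text: string, values?: unknown[]): Promise<{ rows: unknown[] }>
}

interface ClaimedJob {
  id: string
  data: unknown
  attributes: unknown
}

// Hypothetical claim query: lock up to $2 due jobs for topic $1, skipping rows
// other pollers have locked, then mark them active and stamp started_at.
function buildClaimQuery(schemaName: string): string {
  return `
    with due as (
      select id
      from ${schemaName}.jobs
      where topicName = $1
        and (
          (state = 'created' and start_after <= now())
          or (state = 'active'
              and started_at + coalesce(retry_after_seconds, 30) * interval '1 second' <= now())
        )
      order by start_after, created_at
      limit $2
      for update skip locked
    )
    update ${schemaName}.jobs as j
    set state = 'active', started_at = now()
    from due
    where j.id = due.id
    returning j.id, j.data, j.attributes`
}

async function claimJobs(db: Queryable, schemaName: string, topicName: string, maxJobs: number): Promise<ClaimedJob[]> {
  const { rows } = await db.query(buildClaimQuery(schemaName), [topicName, maxJobs])
  return rows as ClaimedJob[]
}
```

Because the select and the update run as one statement, a claimed row stays locked until the update commits, so two pollers cannot both claim it. Refreshing started_at on every claim is what makes an unacknowledged ACTIVE job reappear after its retry window.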
- Create API for queue/job metrics
- TTL (time to live), configured per topic, for cleaning up failed/completed jobs
Author: hadi@vincere.health
Created at Vincere since we had a very similar use-case. Our initial design inspiration came from this blog
We were also inspired by Pinterest's PinLater, although it's fundamentally different in implementation
We are really open to suggestions and would love to hear your feedback and job scheduling use-cases so that we can improve