/mkue

MongoDB-based job queuing system

Primary LanguageJavaScriptMIT LicenseMIT

mkue

NPM version Build status Test coverage Dependency Status License Downloads Gittip

A MongoDB-backed job queueing mechanism.

  • Concurrency handling
  • Throttling inputs
  • Persistence of all input/output
  • FIFO
  • Exits the process gracefully

Example

Dispatcher:

var Queue = require('mkue');

var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

queue.dispatch('this function', {
  these: 'inputs'
})

Worker:

var Queue = require('mkue');

var queue = new Queue();
// set a db
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

// define a namespaced function
queue.define('this function', function (options) {
  return new Promise(function (resolve) {
    resolve(options.these);
  });
});

// set the concurrency
queue.concurrency(5);

// start listening
queue.run();

API

var queue = new Queue([options])

The options are:

  • concurrency <1> - number of jobs to be processed in parallel in this process
  • delay <1000> - delay to query the next batch of jobs on drain
  • collection - the MongoDB collection for this queue

queue.collection =

You are required to set the collection for this worker queue manually.

queue.concurrency(count )

Set the maximum number of concurrent, local jobs.

queue.delay(ms | )

Set the delay after draining the queue to start looking for jobs again.

queue.ensureIndexes().then( => )

Set the indexes for queues and currently processing jobs. Assumes that the queue is always short.

queue.processing().then( count => )

Get the current number of jobs being processed.

queue.queued().then( count => )

Get the current number of jobs in the queue.

queue.queue([ms | ])

Waits ms to start a new job.

queue.dispatch([name ], fn ).then( job => )

Add a job to the queue.

queue.get([name ], options ).then( job => )

Get the latest job with name and options. May or may not be completed yet.

queue.getById().then( job => )

Get a job by its ID.

queue.poll([name ], options , [ms | ]).then( job => )

Poll the latest job at interval ms with name and options until it's complete.

queue.define([name ], fn )

Define a function. name defaults to 'default' if not set. fn's API should be:

fn([options]).then( result => )

You only need to define this on a worker process.

queue.run()

Start running a new job. Call this on a worker process.

queue.close()

Stop creating new jobs.