/mongo_queue

An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.

Primary LanguageRubyMIT LicenseMIT

Mongo::Queue

An extensible thread safe job/message queueing system that uses mongodb as the persistent storage engine.

Features

  • Fully Configurable

  • Distributed

  • Atomic Locking

  • Thread Safe

  • Priority Support

  • Worker Timeout Support

  • Fully Tested

  • Production Ready

  • Simple API

Examples

Setting up a queue

When instantiating the MonogoQueue, there are several options which can be used to configure how it will act. You can configure everything from where the data is stored, to the amount of time which a message can be locked, to the number of times the job will be attempted, and more. Below are the default configuration options which can be adjusted to your taste.

db = Mongo::Connection.new

options = {
  :database   => 'mongo_queue', 
  :collection => 'mongo_queue',
  :timeout    => 300, # number of seconds that an item will remain locked
  :attempts   => 3   # number of times attempts to lock and complete the item before ignoring
}
queue = Mongo::Queue.new(db, options)

Inserting Jobs

A job is inserted as a hash in to the document store, but you will want to avoid using the protected keys which enable Mongo::Queue to operate effectively as a queue system. Avoid using the following fields in your queue messages;

  • locked_at

  • locked_by

  • attempts

  • timeout

  • time

  • priority

  • last_error

    job = {
      :site     => 'http://example.com/',
      :foo      => 'More Stuff',
      :priority => 10
    }
    queue.insert(job)
    

Simple Job Worker Example

Generally when using a message queue, you will want to iterate over job items and continue to do so until all jobs have been completed. This can be completed within a loop, and even be threaded easily. You will want to make sure that you generate a unique string which you can reference the worker with throughout it’s lifetime.

require 'digest/md5'
require 'rubygems'
require 'mongo'
require 'mongo_queue'

db = Mongo::Connection.new('localhost')
queue = Mongo::Queue.new(db)

process_id = Digest::MD5.hexdigest("#{Socket.gethostname}-#{Process.pid}-#{Thread.current}")

queue.cleanup! # remove expired locks

while(doc = queue.lock_next(process_id))
  begin
    do_some_work(doc)
    queue.complete(doc, process_id)
    sleep(1)
  rescue StandardError => e
    queue.error(doc, e.message)
  end
end

Queue Status

There is an additional stat method available on the queue object which provides a hash with some information that may be useful when reviewing the current contents of the queue.

queue.stat
# => {:locked => 34, :available => 34536, :errors => 0, :total => 34570}

Copyright © 2010 Josh Martin. See LICENSE for details.