
Message Queue Primitives

Primary LanguagePython


Message Queue Primitives. CQueue allows you to start the server in the same script as your worker, removing the need for you to deploy a server before hand.

The cockroachdb backend even allows you to create a highly resilient queue that can survive multiple server failures.


pip install git+git://github.com/Delaunay/cqueue

Simple Workflow

from msgqueue import new_client

client = new_client(
    namespace='task',       # Queue Namespace
    log_capture=True,       # Capture Worker Log and Stdout
    timeout=60)             # Client Time Out

# remove a message from the `work` queue
message = client.pop('work')

result = processing(message)

# Message has finished being processed

# put a message in the `result` queue
client.push('result', result)

Start servers at will

from msgqueue import new_server

# start a new cockroach server for clients
# to push and pull their messages
server = new_server(uri='cockroach://')

Detect errors & Inspect logs

from msgqueue import new_monitor

monitor = new_monitor(uri='cockroach://')

# get messages that were assigned to a currently dead worker & that have not finished
dead_messages = monitor.lost_messages('task')

# Requeue all messages that did not finish

# get unresponsive worker entries
dead_workers = monitor.dead_agents('task')

# Get logs of the dead worker
log = monitor.log('task', dead_workers[0])


# Get all messages (read & unread) from the `task` namespace and the `work` queue
# For analysis
messages = monitor.messages('task', 'work')
for msg in messages:
        m.read,             # Was the message read
        m.read_time,        # when was the message read
        m.actioned,         # Was the message actioned
        n.actioned_time,    # when was the message actioned
        m.message,          # User provided data (json)
        m.retry,            # Number of time it was retried
        m.error             # Error if any


For mongodb:

sudo apt-get install mongodb-server