/persist-queue

A thread-safe disk based persistent queue in Python

Primary LanguagePythonApache License 2.0Apache-2.0

persist-queue - A thread-safe, disk-based queue for Python

https://img.shields.io/circleci/project/github/peter-wangxu/persist-queue.svg?label=Linux%20%26%20Mac https://img.shields.io/appveyor/ci/peter-wangxu/persist-queue.svg?label=Windows

This project is based on the achievements of python-pqueue and queuelib

The goals is to achieve following requirements:

  • Disk-based: each queued item should be stored in disk in case of any crash.
  • Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
  • Recoverable: Items can be read after process restart.
  • Green-compatible: It can be used in greenlet or eventlet environment.

While queuelib and python-pqueue cannot fulfil all of above. After some try, I found it's hard to achieve based on their current implementation without huge code change. this is the motivation to start this project.

Besides, persist-queue can serialize any object instances supported by python pickle object serialization module. To support customized objects, please refer to Pickling and unpickling extension types(Python2) and Pickling Class Instances(Python3)

Requirements

  • Python 2.7 or Python 3.x.
  • Fully support for Linux and Windows.

Installation

from pypi

pip install persist-queue

from source code

git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install

Examples

>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()

Close the python console, and then we restart the queue from the same path,

>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()

example usage with multi-thread(referred from github project python-pqueue):

from persistqueue import Queue

q = Queue()

def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

Tests

persist-queue use tox to trigger tests.

to trigger tests based on python2.7/python3.4/python3.5, use:

tox -e py27
tox -e py34
tox -e py35

to trigger pep8 check, use:

tox -e pep8

Contribution

Simply fork this repo and send PR for your code change(also tests to cover your change), remember to give a title and description of your PR. I am willing to enhance this project with you :).

License

Apache License Version 2.0

FAQ