persist-queue
implements a file-based queue and a series of sqlite3-based queues. The goal is to meet the following requirements:
- Disk-based: each queued item should be stored on disk in case of a crash.
- Thread-safe: can be used by multi-threaded producers and multi-threaded consumers.
- Recoverable: Items can be read after process restart.
- Green-compatible: can be used in a greenlet or eventlet environment.
queuelib and python-pqueue cannot fulfil all of the above. After some attempts, I found it hard to achieve these goals on top of their current implementations without large code changes. This is the motivation for starting this project.
By default, persist-queue uses the pickle object serialization module to support object instances. Most built-in types, such as int, dict, and list, can be persisted by persist-queue directly. To support customized objects, please refer to Pickling and unpickling extension types (Python 2) and Pickling Class Instances (Python 3).
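As a quick way to see which objects the default pickle serialization can handle, the sketch below round-trips a value through the standard pickle module. The `can_pickle` helper is hypothetical and not part of persist-queue; it only illustrates the kind of check that is useful before putting an object into a pickle-backed queue.

```python
import pickle
import threading


def can_pickle(obj):
    """Return True if obj survives a pickle round trip unchanged."""
    try:
        return pickle.loads(pickle.dumps(obj)) == obj
    except (pickle.PicklingError, TypeError, AttributeError):
        # Raised for objects pickle cannot serialize, such as locks.
        return False


# Built-in containers round-trip directly.
print(can_pickle({"id": 1, "tags": ["a", "b"]}))

# A lock holds OS-level state and cannot be pickled.
print(can_pickle(threading.Lock()))
```

Objects that fail this check need the customization hooks described in the pickle documentation referenced above.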
This project is based on the achievements of python-pqueue and queuelib.
- Python 2.7 or Python 3.x.
- Full support for Linux.
- Windows support (with caution if persistqueue.Queue is used).
- Multi-platform support: Linux, macOS, Windows
- Pure Python
- Both file based queues and sqlite3 based queues are supported
- File based queue: multiple serialization protocols are supported: pickle (default), msgpack, json
pip install persist-queue
# for msgpack support, use the following command
pip install persist-queue[EXTRA]
git clone https://github.com/peter-wangxu/persist-queue
cd persist-queue
python setup.py install
Here are the results, in seconds, for writing/reading 1000 items to disk, comparing the sqlite3 queue and the file queue.
- Windows
- OS: Windows 10
- Disk: SATA3 SSD
- RAM: 16 GiB
Queue | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
SQLite3 Queue | 1.8880 | 2.0290 | 3.5940 |
File Queue | 4.9520 | 5.0560 | 8.4900 |
Windows note: the performance of the Windows File Queue has improved dramatically since v0.4.1 thanks to atomic renaming support (3-4X faster).
- Linux
- OS: Ubuntu 16.04 (VM)
- Disk: SATA3 SSD
- RAM: 4 GiB
Queue | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
SQLite3 Queue | 1.8282 | 1.8075 | 2.8639 |
File Queue | 0.9123 | 1.0411 | 2.5104 |
- Mac OS
- OS: 10.14 (macOS Mojave)
- Disk: PCIe SSD
- RAM: 16 GiB
Queue | Write | Write/Read(1 task_done) | Write/Read(many task_done) |
SQLite3 Queue | 0.1879 | 0.2115 | 0.3147 |
File Queue | 0.5158 | 0.5357 | 1.0446 |
Note:
- The values above are in seconds for reading/writing 1000 items; lower is better.
- The results above were obtained with:
python benchmark/run_benchmark.py 1000
To see the real performance on your host, run the script benchmark/run_benchmark.py:
python benchmark/run_benchmark.py <COUNT, default to 100>
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.put('str1')
>>> q.put('str2')
>>> q.put('str3')
>>> q.get()
'str1'
>>> del q
Close the console, and then recreate the queue:
>>> import persistqueue
>>> q = persistqueue.SQLiteQueue('mypath', auto_commit=True)
>>> q.get()
'str2'
>>>
This queue does not allow duplicate items.
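One way to picture the no-duplicates behaviour is a SQLite table with a UNIQUE constraint on the stored payload. The sketch below uses only the standard sqlite3 module; the table name, the column names, and the `put_unique` and `size` helpers are assumptions made for illustration and are not taken from persist-queue's source.

```python
import sqlite3


def put_unique(conn, item):
    """Insert item unless an identical payload is already queued."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR IGNORE INTO unique_queue (data) VALUES (?)", (item,)
        )


def size(conn):
    """Number of items currently stored."""
    return conn.execute("SELECT COUNT(*) FROM unique_queue").fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE unique_queue ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT UNIQUE)"
)

put_unique(conn, "str1")
put_unique(conn, "str1")  # ignored: same payload as the first item
put_unique(conn, "str2")
print(size(conn))
```

The usage of the actual UniqueQ class looks like this: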
>>> import persistqueue
>>> q = persistqueue.UniqueQ('mypath')
>>> q.put('str1')
>>> q.put('str1')
>>> q.size
1
>>> q.put('str2')
>>> q.size
2
>>>
The core functions:
- get: get an item from the queue and mark it as unack
- ack: mark the item as acked
- nack: there might be something wrong with the current consumer, so mark the item as ready and a new consumer will get it
- ack_failed: there might be something wrong during processing, so just mark the item as failed
>>> import persistqueue
>>> ackq = persistqueue.SQLiteAckQueue('path')
>>> ackq.put('str1')
>>> item = ackq.get()
>>> # Do something with the item
>>> ackq.ack(item) # If done with the item
>>> ackq.nack(item) # Else mark item as `nack` so that it can be processed again by any worker
>>> ackq.ack_failed(item) # Or else mark item as `ack_failed` to discard this item
Note: this queue does not support auto_commit=True
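The status transitions described above can be modelled with a small in-memory class. This is only a toy sketch of the state machine: the `AckModel` class and the status strings are hypothetical, nothing is written to disk, and it is not how SQLiteAckQueue is implemented.

```python
from collections import OrderedDict

READY, UNACK, ACKED, FAILED = "ready", "unack", "acked", "ack_failed"


class AckModel:
    """In-memory toy model of the four statuses; nothing is persisted."""

    def __init__(self):
        self._status = OrderedDict()  # item -> status, in insertion order

    def put(self, item):
        self._status[item] = READY

    def get(self):
        # Hand out the oldest ready item and mark it as unacknowledged.
        for item, status in self._status.items():
            if status == READY:
                self._status[item] = UNACK
                return item
        raise LookupError("no ready item")

    def ack(self, item):
        self._status[item] = ACKED

    def nack(self, item):
        self._status[item] = READY  # eligible for another consumer

    def ack_failed(self, item):
        self._status[item] = FAILED

    def status(self, item):
        return self._status[item]


model = AckModel()
model.put("str1")
item = model.get()  # "str1" is now unack
model.nack(item)    # back to ready
item = model.get()  # delivered again
model.ack(item)     # finished
```

The point of the model is that a nack'ed item becomes visible to `get` again, while an acked or failed item does not.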
>>> from persistqueue import Queue
>>> q = Queue("mypath")
>>> q.put('a')
>>> q.put('b')
>>> q.put('c')
>>> q.get()
'a'
>>> q.task_done()
Close the Python console, then restart the queue from the same path:
>>> from persistqueue import Queue
>>> q = Queue('mypath')
>>> q.get()
'b'
>>> q.task_done()
>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key1'] = 123
>>> q['key2'] = 321
>>> q['key1']
123
>>> len(q)
2
>>> del q['key1']
>>> q['key1']
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "persistqueue\pdict.py", line 58, in __getitem__
raise KeyError('Key: {} not exists.'.format(item))
KeyError: 'Key: key1 not exists.'
Close the console and restart the PDict:
>>> from persistqueue import PDict
>>> q = PDict("testpath", "testname")
>>> q['key2']
321
from threading import Thread

from persistqueue import FIFOSQLiteQueue

q = FIFOSQLiteQueue(path="./test", multithreading=True)

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work, num_worker_threads and source are placeholders you define

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)
from threading import Thread

from persistqueue import Queue

q = Queue("mypath")  # a storage path is required

def worker():
    while True:
        item = q.get()
        do_work(item)  # do_work, num_worker_threads and source are placeholders you define
        q.task_done()

for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()  # block until all tasks are done
Currently only available for the file based Queue.
>>> import persistqueue
>>> q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.msgpack)
>>> # via json
>>> # q = persistqueue.Queue('mypath', serializer=persistqueue.serializers.json)
>>> q.get()
'b'
>>> q.task_done()
task_done is required for both the file based queue and the SQLite3 based queues (when auto_commit=False) to persist the cursor of the next get to disk.
- WAL: starting with v0.3.2, persistqueue leverages the sqlite3 built-in feature WAL <https://www.sqlite.org/wal.html>, which can improve performance significantly. General testing indicates that persistqueue is 2-4 times faster than the previous version.
- auto_commit=False: since persistqueue v0.3.0, a new parameter auto_commit is available to tune the performance of sqlite3 based queues as needed. When auto_commit=False is specified, the user needs to call queue.task_done() to persist the changes made since the last task_done invocation.
- pickle protocol selection: from v0.3.6, persistqueue selects Protocol version 2 for Python 2 and Protocol version 4 for Python 3, respectively. This selection only happens when the directory is not present when initializing the queue.
persist-queue uses tox to run tests.
- Unit test
tox -e <PYTHON_VERSION>
Available <PYTHON_VERSION>: py27, py34, py35, py36, py37
- PEP8 check
tox -e pep8
pyenv is usually a helpful tool to manage multiple versions of Python.
Currently, atomic operations are supported on Windows but are still experimental. That is, the data in persistqueue.Queue could be left in an unreadable state if an incidental failure occurs during Queue.task_done.
DO NOT put any critical data in persistqueue.Queue on Windows.
Simply fork this repo and send a PR for your code change (with tests covering your change). Remember to give your PR a title and a description. I am willing to enhance this project with you :).
- sqlite3.OperationalError: database is locked is raised.
persistqueue opens 2 connections to the db if multithreading=True. When one connection modifies the database, the SQLite database is locked until that transaction is committed. The timeout parameter specifies how long the other connection should wait for the lock to go away before raising an exception. The default timeout is 10 seconds; increase timeout when creating the queue if the above error occurs.
- sqlite3 based queues are not thread-safe.
The sqlite3 queues are heavily tested in multi-threaded environments. If you find they are not thread-safe, please make sure you set multithreading=True when initializing the queue before submitting a new issue :).
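The "database is locked" error described above can be reproduced with two plain sqlite3 connections. In the sketch below, one connection holds a write transaction open while a second one, created with a deliberately short timeout, tries to write. The file and table names are made up, and the short timeout is chosen only to keep the demonstration fast.

```python
import os
import sqlite3
import tempfile

db_path = os.path.join(tempfile.mkdtemp(), "locked.db")

# Writer connection in autocommit mode so the transaction is controlled by hand.
writer = sqlite3.connect(db_path, isolation_level=None)
writer.execute("CREATE TABLE items (data TEXT)")
writer.execute("BEGIN IMMEDIATE")  # take the write lock and hold it
writer.execute("INSERT INTO items VALUES ('from writer')")

# The second connection gives up after 0.2 seconds of waiting for the lock.
other = sqlite3.connect(db_path, timeout=0.2)
try:
    other.execute("INSERT INTO items VALUES ('from other')")
    error = None
except sqlite3.OperationalError as exc:
    error = str(exc)
    other.rollback()

writer.execute("COMMIT")  # release the lock

# With the lock gone, the same statement succeeds.
with other:
    other.execute("INSERT INTO items VALUES ('from other')")
rows = other.execute("SELECT COUNT(*) FROM items").fetchone()[0]

other.close()
writer.close()
print(error, rows)
```

A longer timeout gives the waiting connection more time for the other transaction to commit, which is why raising it helps when writers hold the lock for longer than expected.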