REPOSITORY ARCHIVED ON June 2nd, 2020
persipubsub
persipubsub implements a persistent, thread-safe and process-safe queue for inter-process communication, based on lmdb.
Originally, we used zeromq for inter-process communication, with a slight improvement through persizmq. This still did not provide the level of persistence we wanted.
Our motivation was to replace our previous library with one that is similarly easy to set up and use. Additionally, it should make it possible to send protobuf messages (bytes) thread-safely and process-safely from many publishers to many subscribers.
Besides basic publisher and subscriber classes, the library offers control methods for easy deployment from a JSON config file and for maintenance when needed.
Related projects
persist-queue
- The library does not offer all the functionality expected from a queue. It has put and get functions, which are basically only push and pop, so front functionality is missing. As a consequence, the queue cannot have multiple subscribers, and it cannot be guaranteed that no data is lost when a thread fails.
- All messages in queues are serialized by pickle, which was for us the main reason not to use this library.
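To illustrate why missing front functionality matters, here is a minimal in-memory sketch. It is plain Python and does not use the persist-queue or persipubsub API; `process_with_pop`, `process_with_front` and `failing_handler` are hypothetical names. With pop-only access the message has already left the queue when the handler fails, whereas with front-then-pop it stays queued until processing succeeds.

```python
from collections import deque
from typing import Callable, Deque


def process_with_pop(queue: Deque[bytes],
                     handler: Callable[[bytes], None]) -> None:
    """Remove the message first, then handle it (get/pop semantics)."""
    msg = queue.popleft()  # the message leaves the queue here
    handler(msg)  # if this raises, the message is lost


def process_with_front(queue: Deque[bytes],
                       handler: Callable[[bytes], None]) -> None:
    """Peek at the message, handle it, and remove it only on success."""
    msg = queue[0]  # front: read without removing
    handler(msg)  # if this raises, the message is still queued
    queue.popleft()


def failing_handler(msg: bytes) -> None:
    """Simulate a worker thread that fails while handling a message."""
    raise RuntimeError("worker failed while handling {!r}".format(msg))


pop_queue = deque([b"a", b"b"])
front_queue = deque([b"a", b"b"])

for process, queue in ((process_with_pop, pop_queue),
                       (process_with_front, front_queue)):
    try:
        process(queue, failing_handler)
    except RuntimeError:
        pass

print(list(pop_queue))    # [b'b'], message b'a' was lost
print(list(front_queue))  # [b'a', b'b'], nothing was lost
```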
Kafka
- Since we only need inter-process communication, the TCP capability of Kafka is unnecessary overhead.
- Integrating Kafka, which is written in Scala and Java, into our C++/Python/Go codebase is challenging.
- Kafka is hard to set up and maintain.
- The broker system eventually becomes a bottleneck.
RabbitMQ
- Since we only need inter-process communication, the TCP capability of RabbitMQ is unnecessary overhead.
- The broker system eventually becomes a bottleneck.
- RabbitMQ is less scalable than Kafka, but is officially supported by more languages.
zeromq persistence pattern
- Titanic is the only persistence pattern of zeromq, and it is also a broker system. This defeats the purpose and advantage of zeromq as a lightweight library which requires no broker.
Usage
Using the library consists of two steps: deployment and runtime.
Python
Environment
To improve the accessibility of the library, an environment class lets you create and initialize any persipubsub component that you need in the deployment or runtime step.
Warning
Only one environment per queue is allowed in each process, and forking an environment or any of its child components into multiple processes is forbidden. Under these conditions, persipubsub is thread-safe and process-safe. If multiple environments of the same queue are active in the same process, or an environment is forked into multiple processes, the lock is broken and correctness can't be guaranteed.
Initialize environment
import persipubsub.environment
env = persipubsub.environment.new_environment(path="/home/user/queue/")
Deployment
In the deployment stage the library sets up the queue structure with the control.
Control
A control unit to initialize and maintain queues.
Note
The high water marks are limits for the queue. A message is deleted when it reaches the timeout. When the queue overflows, one of two strategies is used to prune half of the queue: prune_first, which deletes the oldest messages, or prune_last, which deletes the latest messages.
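The difference between the two strategies can be sketched on a plain list ordered from oldest to newest. This is an illustration only, not the library's implementation; in particular, the rounding for queues with an odd number of messages is an assumption.

```python
from typing import List


def prune_first(messages: List[bytes]) -> List[bytes]:
    """Delete the oldest half of the queue; the newest messages survive."""
    return messages[len(messages) // 2:]


def prune_last(messages: List[bytes]) -> List[bytes]:
    """Delete the latest half of the queue; the oldest messages survive."""
    return messages[:len(messages) - len(messages) // 2]


queue = [b"m1", b"m2", b"m3", b"m4"]  # ordered oldest to newest
print(prune_first(queue))  # [b'm3', b'm4']
print(prune_last(queue))   # [b'm1', b'm2']
```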
Initialize queue
import persipubsub.environment
import persipubsub.queue
env = persipubsub.environment.new_environment(path="/home/user/new-queue/")
# Initialize a queue with default values.
control = env.new_control()
# Or define all optional parameters of the queue.
hwm = persipubsub.queue._HighWaterMark()
strategy = persipubsub.queue._Strategy.prune_first
control = env.new_control(subscriber_ids=["sub1", "sub2"],
                          high_watermark=hwm,
                          strategy=strategy)
Prune all dangling messages
import persipubsub.environment
env = persipubsub.environment.new_environment(
    path="/home/user/queue-with-dangling-messages/")
control = env.new_control()
control.prune_dangling_messages()
Clear all messages
import persipubsub.environment
env = persipubsub.environment.new_environment(
    path="/home/user/queue-with-subscribers-and-messages/")
control = env.new_control()
control.clear_all_subscribers()
Runtime
During runtime, only the publisher and the subscriber are needed.
Note
Control can optionally be used for pruning, although the queues prune themselves on a regular basis when the high water mark is reached. The high water mark includes a timeout, a maximum number of messages and a maximum size of the queue in bytes.
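The three limits can be pictured with a small, self-contained sketch. `HighWaterMarkSketch` and its field names, `drop_expired` and `is_overflowing` are hypothetical and do not reflect the library's actual classes; treating "reached" as "strictly exceeded" is also an assumption.

```python
from dataclasses import dataclass
from typing import List, Tuple

Message = Tuple[float, bytes]  # (time the message was sent, payload)


@dataclass(frozen=True)
class HighWaterMarkSketch:
    """Hypothetical limits of a queue; not the library's API."""
    msg_timeout_secs: float
    max_msgs_num: int
    max_bytes: int


def drop_expired(messages: List[Message], hwm: HighWaterMarkSketch,
                 now: float) -> List[Message]:
    """Keep only the messages that are not older than the timeout."""
    return [(sent, payload) for sent, payload in messages
            if now - sent <= hwm.msg_timeout_secs]


def is_overflowing(messages: List[Message],
                   hwm: HighWaterMarkSketch) -> bool:
    """Check the count limit and the byte-size limit."""
    total_bytes = sum(len(payload) for _, payload in messages)
    return len(messages) > hwm.max_msgs_num or total_bytes > hwm.max_bytes


hwm = HighWaterMarkSketch(msg_timeout_secs=60.0, max_msgs_num=2, max_bytes=1024)
messages = [(900.0, b"stale"), (990.0, b"a"), (995.0, b"b"), (999.0, b"c")]

fresh = drop_expired(messages, hwm, now=1000.0)
print(len(fresh))                  # 3, the stale message timed out
print(is_overflowing(fresh, hwm))  # True, 3 messages exceed max_msgs_num=2
```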
Publisher
Initialization
Assuming that all queues were initialized during deployment, the publisher can be initialized as follows.
import persipubsub.environment
env = persipubsub.environment.new_environment(path="/home/user/queue/")
pub = env.new_publisher()
Send a message
msg = "Hello there!".encode('utf-8')
pub.send(msg=msg)
# Subscribers now have a message in the queue.
Send many messages at once
msgs = ["What's up?".encode('utf-8'),
        "Do you like the README?".encode('utf-8')]
pub.send_many(msgs=msgs)
# Both messages are now available for the subscribers. Note that the order
# of the messages is not necessarily kept.
Subscriber
Initialization
Assuming that all queues were initialized during deployment, the subscriber can be initialized as follows.
import persipubsub.environment
env = persipubsub.environment.new_environment(path="/home/user/queue/")
sub = env.new_subscriber(identifier="sub")
Receive a message
# One message in queue
with sub.receive() as msg:
    # do something with the message
    print(msg)  # b'Hello there!'
# This subscriber's queue is now empty
Catch up with latest message
Use this when a particular subscriber cares only about the very last message. The messages are not popped for other subscribers.
Note
If you want to store only the latest message for all subscribers, then use high water mark max_msgs_num = 1.
# Many outdated messages in queue
with sub.receive_to_top() as msg:
    # do something with the latest message
    print(msg)
# This subscriber's queue is now empty.
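The per-subscriber effect of catching up can be modelled with a toy in-memory queue. This is a sketch only: `InMemoryQueueSketch` is a hypothetical class, it keeps a separate pending list per subscriber, and it ignores persistence and the context-manager interface of the real subscriber.

```python
from typing import Dict, List, Optional


class InMemoryQueueSketch:
    """Toy model: every subscriber has its own list of pending messages."""

    def __init__(self, subscriber_ids: List[str]) -> None:
        self.pending: Dict[str, List[bytes]] = {
            sub_id: [] for sub_id in subscriber_ids}

    def send(self, msg: bytes) -> None:
        """Make the message pending for every subscriber."""
        for messages in self.pending.values():
            messages.append(msg)

    def receive_to_top(self, sub_id: str) -> Optional[bytes]:
        """Return the newest message and drop the rest for this subscriber."""
        messages = self.pending[sub_id]
        if not messages:
            return None
        latest = messages[-1]
        messages.clear()
        return latest


queue = InMemoryQueueSketch(subscriber_ids=["sub1", "sub2"])
for payload in (b"first", b"second", b"latest"):
    queue.send(payload)

print(queue.receive_to_top("sub1"))  # b'latest'
print(queue.pending["sub1"])         # []
print(queue.pending["sub2"])         # [b'first', b'second', b'latest']
```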
Documentation
The documentation is available on readthedocs.
Installation
- Install persipubsub with pip:
pip3 install persipubsub
Development
- Check out the repository.
- In the repository root, create the virtual environment:
python3 -m venv venv3
- Activate the virtual environment:
source venv3/bin/activate
- Install the development dependencies:
pip3 install -e .[dev]
We use tox for testing and packaging the distribution. Assuming that the virtual environment has been activated and the development dependencies have been installed, run:
tox
Pre-commit Checks
We provide a set of pre-commit checks that lint and check code for formatting.
Namely, we use:
- yapf to check the formatting.
- pydocstyle to check the style of the docstrings.
- mypy to perform static type analysis.
- isort to sort the imports.
- pylint to run various linter checks.
- The Python doctest module to execute doctests.
- pyicontract-lint to lint contracts in Python code defined with the icontract library.
- twine to check the README for invalid markup which prevents it from rendering correctly on PyPI.
Run the pre-commit checks locally from an activated virtual environment with development dependencies:
./precommit.py
- The pre-commit script can also automatically format the code:
./precommit.py --overwrite
Versioning
We follow Semantic Versioning. The version X.Y.Z indicates:
- X is the major version (backward-incompatible),
- Y is the minor version (backward-compatible), and
- Z is the patch version (backward-compatible bug fix).