

Primary language: Python. License: MIT.

REPOSITORY ARCHIVED ON June 2nd, 2020

persipubsub


persipubsub implements a persistent, thread-safe and process-safe queue for inter-process communication, based on lmdb.

Originally, we used zeromq for inter-process communication, slightly improved through persizmq. However, this still did not provide the level of persistence we wanted.

Our motivation was to replace our previous library with one that is similarly easy to set up and use. Additionally, it should make it possible to send protobuf messages (bytes) thread-safely and process-safely from many publishers to many subscribers.

Besides basic publisher and subscriber classes, the library offers control methods for easy deployment from a JSON config file and for maintenance when needed.

Related projects

persist-queue

  • The library does not offer all the functionality expected from a queue. Its put and get functions are essentially push and pop, so there is no way to read the front of the queue without removing it. Consequently, the queue can neither have multiple subscribers nor guarantee that no data is lost when a thread fails.
  • All messages in the queues are serialized with pickle, which was our main reason for not using this library.

Kafka

  • Since we only need inter-process communication, Kafka's TCP capability is unnecessary overhead.
  • Integrating Kafka, which is written in Scala and Java, into our C++/Python/Go codebase is challenging.
  • Kafka is hard to set up and maintain.
  • Its broker system can eventually become a bottleneck.

RabbitMQ

  • Since we only need inter-process communication, RabbitMQ's TCP capability is unnecessary overhead.
  • Its broker system can eventually become a bottleneck.
  • RabbitMQ is less scalable than Kafka, but it is officially supported by more languages.

zeromq persistence pattern

  • Titanic is zeromq's only persistence pattern, and it is itself a broker system. This defeats the purpose of zeromq, whose advantage is being a lightweight library that requires no broker.

Usage

Using the library involves two steps: deployment and runtime.

Python

Environment

To make the library more accessible, an environment class lets you create and initialize any persipubsub component you need in the deployment or runtime step.

Warning

Only one environment per queue is allowed in each process, and the environment and its child components must not be forked to multiple processes. Under these conditions, persipubsub is thread-safe and process-safe. If multiple environments of the same queue are active in the same process, or an environment is forked to multiple processes, the lock is broken and correctness cannot be guaranteed.
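One way to respect the one-environment-per-queue-per-process rule in a larger application is to route every lookup through a small cache keyed by process ID and normalized queue path. The helper below is a hypothetical sketch (get_environment and its registry are not part of persipubsub); the factory argument stands in for persipubsub.environment.new_environment. Because the key contains the PID, a forked child does not reuse the environment object it inherited from its parent and instead creates its own.

```python
import os
import threading
from typing import Any, Callable, Dict, Tuple

# Hypothetical per-process registry; not part of persipubsub.
_registry_lock = threading.Lock()
_registry: Dict[Tuple[int, str], Any] = {}


def get_environment(path: str, factory: Callable[[str], Any]) -> Any:
    """Return the single environment for ``path`` in the current process.

    The key includes the PID, so a forked child never reuses an environment
    inherited from its parent; it calls ``factory`` to create its own.
    """
    # abspath() also normalizes trailing slashes, so "/q/" and "/q" match.
    key = (os.getpid(), os.path.abspath(path))
    with _registry_lock:
        if key not in _registry:
            _registry[key] = factory(path)
        return _registry[key]
```

With persipubsub, the call would presumably look like get_environment("/home/user/queue/", lambda p: persipubsub.environment.new_environment(path=p)). The sketch only avoids accidental duplicates inside one process; it cannot repair an environment that has already been shared across a fork.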

Initialize environment

import persipubsub.environment

env = persipubsub.environment.new_environment(path="/home/user/queue/")

Deployment

In the deployment stage the library sets up the queue structure with the control.

Control

A control unit to initialize and maintain queues.

Note

The high water marks are limits for the queue. A message is deleted once it reaches the timeout. When the queue overflows, one of two strategies is used to prune half of the queue: prune_first, which deletes the oldest messages, or prune_last, which deletes the latest messages.
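The two pruning rules can be illustrated with a small in-memory model. This is a sketch of the behaviour described in the note, not persipubsub's implementation: the list of (timestamp, payload) tuples and the prune function are hypothetical, and messages are assumed to be ordered from oldest to newest.

```python
from typing import List, Tuple

# (timestamp in seconds, payload), ordered from oldest to newest.
Message = Tuple[float, bytes]


def prune(messages: List[Message], strategy: str, max_msgs_num: int,
          timeout_secs: float, now: float) -> List[Message]:
    """Toy model of the high water mark rules; not persipubsub code."""
    # 1. Delete every message that has reached the timeout.
    alive = [m for m in messages if now - m[0] < timeout_secs]
    # 2. On overflow, prune half of the remaining queue.
    if len(alive) > max_msgs_num:
        half = len(alive) // 2
        if strategy == "prune_first":
            alive = alive[half:]  # delete the oldest half
        elif strategy == "prune_last":
            alive = alive[:len(alive) - half]  # delete the newest half
        else:
            raise ValueError("unknown strategy: " + strategy)
    return alive
```

For ten messages with timestamps 0 to 9 and max_msgs_num=5, prune_first keeps timestamps 5 to 9, while prune_last keeps timestamps 0 to 4.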

Initialize queue
import persipubsub.environment
import persipubsub.queue

env = persipubsub.environment.new_environment(path="/home/user/new-queue/")

# Initialize a queue with default values.
control = env.new_control()
# Or define all optional parameters of the queue.
hwm = persipubsub.queue._HighWaterMark()
strategy = persipubsub.queue._Strategy.prune_first
control = env.new_control(subscriber_ids=["sub1", "sub2"],
                          high_watermark=hwm,
                          strategy=strategy)
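The introduction mentions deployment from a JSON config file. The format is not documented in this README, so the schema below (path, subscriber_ids, strategy) is a hypothetical sketch that only shows how such a file could be validated before its values are passed to env.new_control. The commented persipubsub calls at the end are an assumption and are not executed.

```python
import json
from typing import Any, Dict

# Hypothetical config schema; not a format defined by persipubsub.
EXAMPLE_CONFIG = """
{
  "path": "/home/user/new-queue/",
  "subscriber_ids": ["sub1", "sub2"],
  "strategy": "prune_first"
}
"""


def load_queue_config(text: str) -> Dict[str, Any]:
    """Parse and validate a hypothetical queue deployment config."""
    config = json.loads(text)
    if not isinstance(config.get("path"), str):
        raise ValueError("'path' must be a string")
    ids = config.get("subscriber_ids", [])
    if not isinstance(ids, list) or not all(isinstance(i, str) for i in ids):
        raise ValueError("'subscriber_ids' must be a list of strings")
    if len(set(ids)) != len(ids):
        raise ValueError("'subscriber_ids' must be unique")
    strategy = config.get("strategy", "prune_first")
    if strategy not in ("prune_first", "prune_last"):
        raise ValueError("unknown strategy: " + str(strategy))
    return {"path": config["path"], "subscriber_ids": ids,
            "strategy": strategy}

# Assumed usage with persipubsub (not executed here):
# cfg = load_queue_config(EXAMPLE_CONFIG)
# env = persipubsub.environment.new_environment(path=cfg["path"])
# control = env.new_control(
#     subscriber_ids=cfg["subscriber_ids"],
#     strategy=getattr(persipubsub.queue._Strategy, cfg["strategy"]))
```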
Prune all dangling messages
import persipubsub.environment

env = persipubsub.environment.new_environment(
    path="/home/user/queue-with-dangling-messages/")
control = env.new_control()

control.prune_dangling_messages()
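The README does not define "dangling". Assuming it means a message that is still stored but no longer referenced by any subscriber's queue (for example, because every subscriber has already consumed it), finding such messages is a set difference. The find_dangling helper below is a hypothetical illustration of that idea, not persipubsub's code.

```python
from typing import Dict, Iterable, Set


def find_dangling(stored_ids: Iterable[str],
                  subscriber_queues: Dict[str, Iterable[str]]) -> Set[str]:
    """Return IDs of stored messages that no subscriber queue references."""
    referenced: Set[str] = set()
    for queue in subscriber_queues.values():
        referenced.update(queue)
    return set(stored_ids) - referenced
```

With stored messages m1, m2 and m3, where sub1 still references m2 and sub2 references m2 and m3, only m1 is dangling and could be deleted.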
Clear all messages
import persipubsub.environment

env = persipubsub.environment.new_environment(
    path="/home/user/queue-with-subscribers-and-messages/")
control = env.new_control()

control.clear_all_subscribers()

Runtime

During runtime, only the publisher and the subscriber are needed.

Note

Control can optionally be used for pruning, although the queues prune themselves regularly whenever a high water mark is reached. The high water mark comprises a timeout, a maximum number of messages and a maximum byte size of the queue.
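A high water mark is reached as soon as any one of its three limits is exceeded. The sketch below models that check; the HighWaterMark dataclass and its field names are hypothetical stand-ins, not persipubsub's _HighWaterMark, and the byte size is approximated by summing payload lengths, whereas an LMDB-backed queue would more likely measure the size of the database itself.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class HighWaterMark:
    """Hypothetical stand-in for the three limits named in the note."""
    msg_timeout_secs: float
    max_msgs_num: int
    max_bytes: int


def hwm_reached(hwm: HighWaterMark, msg_sizes: List[int],
                oldest_timestamp: float, now: float) -> bool:
    """Return True if any limit is exceeded, i.e. pruning should run."""
    if not msg_sizes:
        return False  # an empty queue cannot exceed any limit
    return (now - oldest_timestamp >= hwm.msg_timeout_secs
            or len(msg_sizes) > hwm.max_msgs_num
            or sum(msg_sizes) > hwm.max_bytes)
```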

Publisher

Initialization

Assuming that all queues were initialized during deployment, the publisher can be initialized as follows.

import persipubsub.environment

env = persipubsub.environment.new_environment(path="/home/user/queue/")

pub = env.new_publisher()
Send a message
msg = "Hello there!".encode('utf-8')
pub.send(msg=msg)

# Subscribers now have a message in their queues.
Send many messages at once
msgs = ["What's up?".encode('utf-8'),
        "Do you like the README?".encode('utf-8')]
pub.send_many(msgs=msgs)

# Both messages are now available for the subscribers. Note that the order
# of the messages is not necessarily preserved.
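If a subscriber needs the original order of a batch, one option is to prepend a sequence number to each payload before calling send_many and let the consumer sort by it after collecting the batch. This framing is a swapped-in technique, not something persipubsub provides; the 8-byte big-endian header and the helper names frame, unframe and restore_order are hypothetical.

```python
import struct
from typing import List, Tuple

# Hypothetical framing: 8-byte big-endian sequence number, then the payload.
_HEADER = struct.Struct(">Q")


def frame(seq: int, payload: bytes) -> bytes:
    """Prefix a payload with its sequence number."""
    return _HEADER.pack(seq) + payload


def unframe(msg: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (sequence number, payload)."""
    (seq,) = _HEADER.unpack_from(msg)
    return seq, msg[_HEADER.size:]


def restore_order(msgs: List[bytes]) -> List[bytes]:
    """Return the payloads of a received batch sorted by sequence number."""
    decoded = sorted((unframe(m) for m in msgs), key=lambda t: t[0])
    return [payload for _, payload in decoded]
```

A publisher would then call something like pub.send_many(msgs=[frame(i, m) for i, m in enumerate(msgs)]), and the subscriber would buffer the received messages before calling restore_order.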

Subscriber

Initialization

Assuming that all queues were initialized during deployment, the subscriber can be initialized as follows.

import persipubsub.environment

env = persipubsub.environment.new_environment(path="/home/user/queue/")

sub = env.new_subscriber(identifier="sub")
Receive a message
# One message in queue
with sub.receive() as msg:
    # do something with the message
    print(msg)  # b'Hello there!'

# This subscriber's queue is now empty
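The with-block around receive() is what allows a queue to read the front message first and remove it afterwards, which is the functionality missing from persist-queue. The README only states that the queue is empty after the block; the toy below assumes the message is removed only when the block finishes without an exception. ToySubscriberQueue is a hypothetical in-memory stand-in using a deque and a threading.Lock, whereas persipubsub persists messages in lmdb.

```python
import contextlib
import threading
from collections import deque
from typing import Deque, Iterator, Optional


class ToySubscriberQueue:
    """In-memory toy, not persipubsub: read the front, pop on success."""

    def __init__(self) -> None:
        self._msgs: Deque[bytes] = deque()
        self._lock = threading.Lock()

    def put(self, msg: bytes) -> None:
        with self._lock:
            self._msgs.append(msg)

    @contextlib.contextmanager
    def receive(self) -> Iterator[Optional[bytes]]:
        # Hold the lock for the whole block so consumers are serialized.
        with self._lock:
            msg = self._msgs[0] if self._msgs else None  # read, don't pop
            yield msg
            # Reached only if the with-block did not raise.
            if msg is not None:
                self._msgs.popleft()
```

If processing raises inside the block, the exception propagates and the message stays at the front of the queue for the next attempt.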
Catch up with latest message

This can be used when a particular subscriber cares only about the most recent message. The messages are not popped for other subscribers.

Note

If you want to store only the latest message for all subscribers, use a high water mark with max_msgs_num = 1.

# Many outdated messages in queue
with sub.receive_to_top() as msg:
    # do something with the latest message
    print(msg)

# This subscriber's queue is now empty.
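The per-subscriber effect of receive_to_top can be modelled with one FIFO per subscriber: the caller gets the newest message and its own queue is cleared, while the other subscribers keep their backlog. ToyPubSub is a hypothetical in-memory sketch; for brevity its receive_to_top returns the message directly instead of acting as a context manager.

```python
from collections import deque
from typing import Deque, Dict, List, Optional


class ToyPubSub:
    """In-memory toy, not persipubsub: one FIFO per subscriber."""

    def __init__(self, subscriber_ids: List[str]) -> None:
        self.queues: Dict[str, Deque[bytes]] = {
            sub_id: deque() for sub_id in subscriber_ids}

    def send(self, msg: bytes) -> None:
        # Every subscriber receives its own reference to the message.
        for queue in self.queues.values():
            queue.append(msg)

    def receive_to_top(self, identifier: str) -> Optional[bytes]:
        queue = self.queues[identifier]
        if not queue:
            return None
        latest = queue[-1]
        queue.clear()  # skip outdated messages for this subscriber only
        return latest
```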

Documentation

The documentation is available on readthedocs.

Installation

  • Install persipubsub with pip:
pip3 install persipubsub

Development

  • Check out the repository.
  • In the repository root, create the virtual environment:
python3 -m venv venv3
  • Activate the virtual environment:
source venv3/bin/activate
  • Install the development dependencies:
pip3 install -e .[dev]

We use tox for testing and packaging the distribution. Assuming that the virtual environment has been activated and the development dependencies have been installed, run:

tox

Pre-commit Checks

We provide a set of pre-commit checks that lint and check code for formatting.

Namely, we use:

  • yapf to check the formatting.
  • The style of the docstrings is checked with pydocstyle.
  • Static type analysis is performed with mypy.
  • isort to sort your imports for you.
  • Various linter checks are done with pylint.
  • Doctests are executed using the Python doctest module.
  • pyicontract-lint lints contracts in Python code defined with icontract library.
  • twine to check the README for invalid markup which prevents it from rendering correctly on PyPI.

Run the pre-commit checks locally from an activated virtual environment with development dependencies:

./precommit.py
  • The pre-commit script can also automatically format the code:
./precommit.py --overwrite

Versioning

We follow Semantic Versioning. The version X.Y.Z indicates:

  • X is the major version (backward-incompatible),
  • Y is the minor version (backward-compatible), and
  • Z is the patch version (backward-compatible bug fix).
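Under these rules, moving to a release with the same major version and an equal or higher minor.patch should not break existing code. The helper below is a minimal sketch of that check; it ignores pre-release and build metadata and does not treat the special 0.y.z range, in which the Semantic Versioning specification allows anything to change.

```python
from typing import Tuple


def parse_version(version: str) -> Tuple[int, int, int]:
    """Split an X.Y.Z string into integer (major, minor, patch)."""
    major, minor, patch = (int(part) for part in version.split("."))
    return major, minor, patch


def is_compatible_upgrade(current: str, candidate: str) -> bool:
    """True if candidate keeps the major version and is not older."""
    cur, cand = parse_version(current), parse_version(candidate)
    return cand[0] == cur[0] and cand >= cur
```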