/rabbitmq_client

Client for a RabbitMQ server.

Primary LanguagePythonMIT LicenseMIT

RabbitMQ client helpers based on pika

PyPI version Build

This project provides helper classes for using RabbitMQ in Python. It is based on pika, which is an awesome no-dependency client library for RabbitMQ. Similarly, this project strives for zero dependencies (except for dev dependencies).

By using this project, users should be able to get started with RabbitMQ in Python instantly, by simply instantiating and starting a RMQConsumer or RMQProducer class.

Consumer

RMQConsumer extends the RMQConnection base class with only one extra method: consume. Consume can be passed parameters for declaring queues and exchanges, as well as binding them together, and consume parameters, all of which have corresponding kwargs in the pika library. The idea is not to re-invent the wheel, but simply the process of declaring a queue -> declaring an exchange -> binding the exchange and queue together -> consuming from the queue.

Here is an example:

from rabbitmq_client import RMQConsumer, ConsumeParams, QueueParams


def on_message(msg):
    ...

consumer = RMQConsumer()
consumer.start()
consumer.consume(ConsumeParams(on_message),
                 queue_params=QueueParams("queue_name"))

The flow of declaring, binding, and consuming is quite straightforward. The above example will declare a queue with the name "queue_name" and consume from it.

NOTE! Although the above may look synchronous, it is not. Start is asynchronous and any consume started while the consumer is not fully started will simply be delayed until it is. When a consume has been successfully started, the bound callback will receive a ConsumeOK object containing the resulting consumer tag.

Producer

RMQProducer extends the RMQConnection base class with two additional methods: publish and activate_confirm_mode. Publish is used, as it sounds, to publish messages towards queues and/or exchanges. The confirm mode activation method enabled confirm mode so that users can verify that messages have been delivered successfully.

from rabbitmq_client import RMQProducer, ExchangeParams


def on_confirm(confirmation):
    ...

producer = RMQProducer()
producer.start()
producer.activate_confirm_mode(on_confirm)  # Or don't, depends on your needs

producer.publish(b"body", 
                 exchange_params=ExchangeParams("exchange_name"),
                 routing_key="some.routing.key")

activate_confirm_mode isn't synchronous either, but you don't have to worry about that. Calling publish after activate_confirm_mode will lead to the publish not happening until confirm mode has been activated successfully. The callback passed to activate_confirm_mode will also receive a ConfirmModeOK once confirm mode is on. Any publish between calling activate_confirm_mode and the producer receiving a confirm_select_ok from RabbitMQ will be buffered and not issues until confirm mode is on. When confirm mode is on, publish also returns a key that clients can use to correlate successful delivered with calls to publish. Once a publish call with key X is confirmed, the callback passed to activate_confirm_mode will be called with X.

Abstract connection helper

The abstract RMQConnection class can be subclassed to get a head start in using the pika SelectConnection and Channel objects as it wraps them and provides an easy-to-use interface as well as event hooks on important happenings.

RMQConnection lifecycle hooks

Subclassing RMQConnection requires the implementer to override three methods: on_ready, on_close, and on_error.

on_ready is called when RMQConnection has established a connection and opened a channel.

on_close is called when either the connection, or the channel closes for any reason. This means the implementer may receive two calls for one failed connection, one for the channel and one for the connection itself. This makes it important that on_close is made idempotent.

on_error is called when a recent action failed, such as an exchange declaration failure. These hooks are meant to enable the implementer to react to the connection state and restore an operating state. The RMQConnection abstract base class is used by the rabbitmq_client project to implement its RMQConsumer and RMQProducer classes.

RMQConnection interface methods

In addition to the hooks that need to be implemented by implementing classes, RMQConnection provides three public methods that can be used to interact with the connection object: start, restart, and stop.

start initiates the connection, establishing a pika.SelectConnection and if that's successful, opening a pika.Channel for the opened connection. Once a channel has been opened, RMQConnection will issue a call to on_ready. Subsequent calls to start have no effect if the connection has already been started.

restart closes the open connection and ensures that it is started again once is has been fully closed. restart is only meant to be used on successfully established connections, it will have no effect on closed connections. restart is meant to be used as a means to change pika.ConnectionParameters on the fly.

stop permanently closes an open connection and will have no effect on a closed connection. A connection for which stop has been called cannot be re-used. on_close is called once the connection is completely stopped.

Aside from the connection-related methods, the RMQConnection also exposes interations with the pika.Channel, named similarily. See here for what is exposed: Pika docs.

Automatic reconnection

RMQConnection will re-establish lost connections, but not lost channels. Reconnections will not be done for any reason though, among the reasons for reconnecting are:

  • pika.exceptions.ConnectionClosedByBroker
  • pika.exceptions.StreamLostError

These two exceptions cover the cases where the broker has been shut down, either expectedly or unexpectedly, or when the connection is lost for some other reason.

Again, if the channel is lost, but the connection remains intact, RMQConnection will not recover the channel.

Reconnection attempts will be made with an increasing delay between attempts. The first attempt is instantaneous, the second is delayed by 1 second, the third by 2 seconds, etc. After the 9th attempt, the following reconnects will be made at 30 second intervals.

Logging

rabbitmq_client follows Python logging standards and is by default disabled. To enable logging, attach a handler to rabbitmq_client:

import logging

logging.getLogger("rabbitmq_client").addHandler(logging.StreamHandler())

By default, a logging.NullHandler() is attached to this logger.