Rabbit Queue Bundle
Введение
Бандл предоставляет инструменты по работе с очередями RabbitMQ
посредством механизма producer
- consumer
.
Содержание
- Требования
- Установка
- Конфигурация
- Описание компонентов
- Доступные команды
- Использование
- Использование
RouterPublisher
- Примеры
- Лицензия
Требования
Для корректной работы бандла требуется подключить следующие плагины RabbitMQ:
Установка
Шаг 1: Загрузка бандла
В директории проекта, выполните следующую команду для загрузки наиболее подходящей стабильной версии этого бандла:
composer require marfatech/rabbit-queue-bundle
Эта команда подразумевает что Composer установлен и доступен глобально.
Шаг 2: Подключение бандла
Необходимо включить бандл добавив его в список зарегистрированных бандлов в app/AppKernel.php
файл вашего проекта:
<?php
// app/AppKernel.php
class AppKernel extends Kernel
{
// ...
public function registerBundles()
{
$bundles = [
// ...
new MarfaTech\Bundle\RabbitQueueBundle\MarfaTechRabbitQueueBundle(),
];
return $bundles;
}
// ...
}
Конфигурация
Чтобы начать использовать бандл, необходимо описать конфигурацию подключения к RabbitMQ
.
# app/packages/marfatech_rabbit_queue.yaml
marfatech_rabbit_queue:
connections:
default:
host: 'rabbitmq' # хост для подключения к rabbitMQ
port: 5672 # порт для подключения к rabbitMQ
username: 'rabbitmq_user' # логин для подключения к rabbitMQ
password: 'rabbitmq_password' # пароль для подключения к rabbitMQ
vhost: 'example_vhost' # виртуальный хост для подключения (необязательный параметр)
connection_timeout: 3 # таймаут соединения @deprecated используйте options.connection_timeout
read_write_timeout: 3 # таймаут на чтение/запись @deprecated используйте options.read_write_timeout
heartbeat: 0 # частота heartbeat @deprecated используйте options.heartbeat
options: # опции для попыток подключений ко всем хостам из списка по очереди (необязательный параметр)
connection_timeout: 3 # таймаут соединения
read_write_timeout: 3 # таймаут на чтение/запись
heartbeat: 0 # частота heartbeat
lazy_connection: false # Lazy соединение инициализируется в момент использования
reconnect_retries: 3 # Количество попыток реконнекта к RabbitMq при потере соединения int или null (по умолчанию 0)
consumer:
wait_timeout: 3 # таймаут ожидания новых сообщений для обработки пачки в секундах (по умолчанию 3)
idle_timeout: 0 # таймаут ожидания сообщений в пустой очереди в секундах (по умолчанию 0 - нет таймаута)
batch_timeout: 0 # таймаут сборки пачки сообщений в секундах (по умолчанию 0 - нет таймаута)
default_max_processed_tasks_count: 1000 # максимальное количество задач в обработке (по умолчанию 1000)
При указании options
значения ключей конфигурации connection_timeout
, read_write_timeout
, heartbeat
будут взяты из него.
В случае если options
не указан значения этих ключей конфигурации будет взято из первого значения ключа конфигурации connections
.
Попытка подключения к хостам, указанных в ключе конфигурации connection
, будет происходить по очереди и вернет первое удачное подключение. Multiple hosts connections
При указании lazy_connection
= true
соединение будет инициализировано не при инициализации всех классов, а в момент использования.
Параметр reconnect_retries
используется для автоматического реконнекта при обрыве соединения с RabbitMq.
Может принимать целочисленное значение (максимальное количество попыток реконнекта), либо null
для бесконечного переподключения.
Описание компонентов
Producer
Producer
- используется для отправки сообщений в очередь.
Для этих целей в бандле реализован RabbitMqProducer, с помощью которого можно отправлять сообщения в очередь с заданными параметрами.
<?php
$data = ['message' => 'example']; # Сообщение
$options = ['key' => 'unique_key', 'delay' => 1000]; # Опции, в зависимости от типа очереди
$routingKey = 'test.routing.key'; # Ключ маршрутизации сообщения, для очередей с типом `ROUTER`
$properties = ['type' => 'test']; # Дополнительные свойства сообщения AMQPMessage
/** @var \MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, $routingKey, $properties);
Queue Pool
Все сообщения по умолчанию передаются в очередь через пул сообщений, который позволяет производить отложенную отправку накопленных сообщений.
Это поведение можно изменить индивидуально для каждого сообщения передав опцию use-queue-pool
во время вызова функции MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface::put
.
<?php
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Producer\RabbitMqProducerInterface;
$data = ['message' => 'example'];
$options = [QueueOptionEnum::USE_QUEUE_POOL => false];
/** @var RabbitMqProducerInterface $producer */
$producer->put('queue_name', $data, $options);
Publisher
Публикация сообщений в очередь происходит с помощью специальных классов паблишеров.
Producer
определяет какой паблишер использовать для публикации по типу очереди, с которым связан паблишер.
Соответственно на каждый новый тип очереди требуется свой класс Publisher
с кастомной логикой обработки/валидации и публикации сообщений в канал.
Бандл поддерживает следующие типы очередей и обменников:
- FIFO
- Delay
- Deduplicate
- Deduplicate + Delay
- Router
Router используется для создания разветвленной топологии как описано тут и тут
При желании добавить собственный тип очереди, необходимо создать класс Publisher
наследующий AbstractPublisher или реализующий PublisherInterface.
Пример DelayPublisher:
<?php
declare(strict_types=1);
namespace MarfaTech\Bundle\RabbitQueueBundle\Publisher;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueHeaderOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Definition\DefinitionInterface;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueOptionEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Exception\RabbitQueueException;
use function is_int;
use function sprintf;
class DelayPublisher extends AbstractPublisher
{
public const QUEUE_TYPE = QueueTypeEnum::FIFO | QueueTypeEnum::DELAY;
/**
* Custom prepare options logic
*/
protected function prepareOptions(DefinitionInterface $definition, array $options): array
{
$delay = $options[QueueOptionEnum::DELAY] ?? null;
if (!is_int($delay)) {
$message = sprintf(
'Element for queue "%s" must be with option %s. See %s',
$definition::getQueueName(),
QueueOptionEnum::DELAY,
QueueOptionEnum::class
);
throw new RabbitQueueException($message);
}
$amqpTableOption[QueueHeaderOptionEnum::X_DELAY] = $delay * 1000;
return $amqpTableOption;
}
/**
* Queue type supported by publisher
*/
public static function getQueueType(): string
{
return (string) self::QUEUE_TYPE;
}
}
Consumer
Consumer
- Используется для получения и обработки сообщений из очереди.
Для реализации логики обработки сообщений необходимо создать класс consumer
,
реализующий ConsumerInterface,
либо наследующий AbstractConsumer, который содержит предустановленные значения для некоторых методов.
<?php
declare(strict_types=1);
namespace Acme\AppBundle\Consumer;
use MarfaTech\Bundle\RabbitQueueBundle\Consumer\AbstractConsumer;
class ExampleConsumer extends AbstractConsumer
{
public const DEFAULT_BATCH_SIZE = 100; # Размер пачки
/**
* {@inheritDoc}
*/
public function process(array $messageList): void
{
foreach ($messageList as $item) {
$data = $this->decodeMessageBody($item); # Decode message by hydrator
// handle some task by specific logic
}
}
/**
* {@inheritDoc}
*/
public function getBindQueueName(): string
{
return 'example';
}
/**
* {@inheritDoc}
*/
public static function getName(): string
{
return 'example';
}
}
В методе process()
необходимо реализовать обработку полученных сообщений.
Сообщения поступают пачками, размер которых задается константой DEFAULT_BATCH_SIZE
(по умолчанию = 1).
Сумма DEFAULT_BATCH_SIZE
со всех потребителей одной очереди не должна превышать значения 65535.
Hydrator
Для удобства работы с сообщениями разных форматов бандл предоставляет инструменты гидрации (кодирование/декодирование сообщений в необходимый формат).
По умолчанию доступны следующие гидраторы:
- JsonHydrator - для работы с сообщениями в формате json (используется по умолчанию).
- PlainTextHydrator - для работы с простыми текстовыми сообщениями.
Также существует возможность создания собственного гидратора.
Для этого необходимо реализовать HydratorInterface и изменить параметр конфигурации hydrator_name
на тип нового гидратора.
Definition
RabbitMQ позволяет создавать сложные схемы очередей, состоящие из несколько взаимосвязанных exchange
и queue
.
Для удобства работы со схемами бандл предоставляет возможность сохранения схем очередей в специальные классы Definition
,
которые реализуют DefinitionInterface.
Пример FIFO:
<?php
declare(strict_types=1);
namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ExampleFifoDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_FIFO;
public const ENTRY_POINT = self::QUEUE_NAME;
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->queue_declare(
self::ENTRY_POINT,
false,
true,
false,
false
);
}
/**
*
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
Пример delay + deduplicate:
<?php
declare(strict_types=1);
namespace MarfaTech\Bundle\RabbitQueueBundle\Definition;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use MarfaTech\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Wire\AMQPTable;
class ExampleDeduplicateDelayDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_DEDUPLICATE_DELAY;
public const ENTRY_POINT = self::QUEUE_NAME . '@exchange_deduplication';
private const SECOND_POINT = self::QUEUE_NAME . '@exchange_delay';
private const THIRD_POINT = self::QUEUE_NAME;
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->exchange_declare(
self::ENTRY_POINT,
'x-message-deduplication',
false,
true,
false,
false,
false,
new AMQPTable(['x-cache-size' => 1_000_000_000])
);
$channel->exchange_declare(
self::SECOND_POINT,
'x-delayed-message',
false,
true,
false,
false,
false,
new AMQPTable(['x-delayed-type' => AMQPExchangeType::DIRECT])
);
$channel->queue_declare(
self::THIRD_POINT,
false,
true,
false,
false
);
$channel->exchange_bind(self::SECOND_POINT, self::ENTRY_POINT);
$channel->queue_bind(self::THIRD_POINT, self::SECOND_POINT);
}
/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO | QueueTypeEnum::DEDUPLICATE | QueueTypeEnum::DELAY;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
В методе init()
объявляется структура очереди состоящая из необходимых exchanges
, queue
и bindings
с помощью стандартных методов php-amqplib.
Метод getEntryPointName()
- отвечает за точку входа сообщений. Точкой входа может быть название exchange
или queue
в зависимости от структуры схемы.
Метод getQueueName()
- название очереди, куда в конечном итоге попадут сообщения.
Жизненный цикл сообщения:
Сообщение -> Producer -> EntryPoint -> Структура очереди exchanges, bindings -> Queue -> Consumer
Таким образом producer
отправляет сообщения на точку входа, а consumer
забирает сообщения из очереди.
В простейшем случае при использовании обычной очереди FIFO, точкой входа будет являться название очереди.
Доступные команды
rabbit:consumer:run
- запускает выбранный консьюмер.
php bin/console rabbit:consumer:run <name> # <name> - название консьюмера.
rabbit:definition:update
- загружает все схемы очередейRabbitMQ
в соответствии с существующими классамиDefinition
.
Примечание: Данная команда не обновляет существующие схемы.
php bin/console rabbit:definition:update
rabbit:consumer:list
- выводит список консьюмеров, зарегистрированных в проекте.
php bin/console rabbit:consumer:list
Пример вывода команды:
Total consumers count: 2
+--------------------+------------+
| Queue Name | Batch Size |
+--------------------+------------+
| example_first | 1 |
| example_second | 100 |
+--------------------+------------+
Использование
Шаг 1: Создание схемы очереди (Definition)
Для инициализации схемы, требуется создать класс Definition,
который реализует DefinitionInterface.
В методе init
нужно объявить структуру очереди состоящию из необходимых exchanges
, queue
и bindings
с помощью стандартных методов работы с каналом php-amqplib.
Шаг 2: Создание consumer'а
Далее необходимо создать класс-consumer
, наследующий AbstractConsumer.
А в методе process
реализовать обработку полученных сообщений.
Если в проекте не работает механизм autowire
, то вам понадобится зарегистрировать consumer
с тегом marfatech_rabbit_queue.consumer
:
services:
app.acme.consumer:
class: Acme\AppBundle\Consumer\ExampleConsumer
tags:
- { name: marfatech_rabbit_queue.consumer }
Шаг 3: Загрузка схем очередей RabbitMQ
Чтобы загрузить схемы definition
в RabbitMQ необходимо выполнить команду rabbit:definition:update
.
Данная команда обновит схему в соответствии с существующими классами Definition
, реализующими DefinitionInterface.
php bin/console rabbit:definition:update
Шаг 4: Запуск consumer'а
Чтобы запустить consumer
необходимо выполнить команду rabbit:consumer:run
rabbit.
Для запуска нужно передать имя конкретного consumer
.
Запуск ранее описанного consumer
'а будет выглядеть так:
php bin/console rabbit:consumer:run example
Для просмотра списка всех зарегистрированных consumer
'ов достаточно выполнить команду rabbit:consumer:list
.
RouterPublisher
Использование RouterPublisher
следует использовать в случаях, когда нужно множество очередей, а каждое сообщение должно попадать
сразу в некоторое их подмножество, определяемое по routingKey
сообщения. Для таких целей нужно создать Definition
,
в котором будет определена только exchange
типа direct
, topic
или fanout
. Эта Definition
будет использоваться
в качестве точки входя для сообщений. После этого нужно создать по одной Definition
на каждую очередь, и все их
биндить на первую Definition
. Можно создать сложную маршрутизацию, если вместо очередей создавать и биндить
Definition
типа первой.
Definition
с exchange
:
Пример <?php
declare(strict_types=1);
namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ExampleTopicExchangeDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_TOPIC_EXCHENGE;
public const ENTRY_POINT = self::QUEUE_NAME;
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->exchange_declare(
self::QUEUE_NAME,
'topic',
false,
true,
);
}
/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::ROUTER;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
Definition
для очереди
Пример <?php
declare(strict_types=1);
namespace Wakeapp\Bundle\RabbitQueueBundle\Definition;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueEnum;
use Wakeapp\Bundle\RabbitQueueBundle\Enum\QueueTypeEnum;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class ExampleRoutedQueryDefinition implements DefinitionInterface
{
public const QUEUE_NAME = QueueEnum::EXAMPLE_ROUTED_FIFO;
public const ENTRY_POINT = QueueEnum::EXAMPLE_TOPIC_EXCHENGE; // это QUEUE_NAME из примера выше
public const ROUTING = [
'*.orange.*',
'big.#',
'*.black.car'
];
/**
* {@inheritDoc}
*/
public function init(AMQPStreamConnection $connection): void
{
$channel = $connection->channel();
$channel->queue_declare(
self::QUEUE_NAME,
false,
true,
false,
false
);
foreach (self::ROUTING as $route) {
$channel->queue_bind(self::QUEUE_NAME, self::ENTRY_POINT, $route); // биндим на exchange из первой Definition
}
}
/**
* {@inheritDoc}
*/
public function getEntryPointName(): string
{
return self::ENTRY_POINT;
}
/**
* {@inheritDoc}
*/
public function getQueueType(): int
{
return QueueTypeEnum::FIFO;
}
/**
* {@inheritDoc}
*/
public static function getQueueName(): string
{
return self::QUEUE_NAME;
}
}
После определения биржи и очередей отправка сообщений будет выглядеть как и раньше, но сообщения будут попадать в очереди только при подходящем routingKey (четвертый параметр в методе put()).
<?php
$data = ['message' => 'example']; # Сообщение
$options = [];
/** @var \Wakeapp\Bundle\RabbitQueueBundle\Producer\RabbitMqProducer $producer */
$producer->put('queue_name', $data, $options, 'small.orange.bicycle'); // попадет в очередь по роуту '*.orange.*'
$producer->put('queue_name', $data, $options, 'big.aaa.bbb.and.more.words'); // попадет в очередь по роуту 'big.#'
$producer->put('queue_name', $data, $options, 'small.black.bicycle'); // НЕ попадет в очередь из примера
Важно!!! Длина routeKey не должна превышать 255 символов
Примеры
Использование RewindPartialException
Чтобы перемотать сообщения в конец очереди, нужно выбросить исключение RewindPartialException. Первый аргумент принимает массив идентификаторов (тегов) сообщений. Второй аргумент - массив, где ключ - тег сообщения, значение - контекст сообщения. С помощью контекста можно управлять логикой обработки сообщения. Получить контекст:
$headers = $message->get('application_headers');
$context = $headers->getNativeData()[QueueHeaderOptionEnum::X_CONTEXT];