Набор классов для более удобной работы с RabbitMQ

Зачем может быть нужно

После регистрации пользователя на сайте, его данные необходимо отправить в некую "внешнюю систему" по API.

"Внешняя система" не гарантирует скорость ответа, какое-то время может быть не доступна. Отправляя данные синхронно, нет гарантии их доставки, увеличивается время отклика для пользователя.

Пример использования

Код добавит сообщение в очередь.

// После регистрации пользователя создаем Producer, отправляем данные в очередь
$connectionData = [
    'host' => 'localhost',
    'port' => '5673',
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
];

(new RabbitMQExample\SimpleProducer($connectionData, ['userLogin' => 'login', 'userEmail' => 'email']))->produce();

Далее, должен быть запущен consumer(например с помощью supervisor)

$connectionData = [
    'host' => 'localhost',
    'port' => '5673',
    'user' => 'guest',
    'password' => 'guest',
    'vhost' => '/',
];
(new RabbitMQExample\SimpleConsumer($connectionData))->consume();

Логика обработки сообщений следующая:

  • Каждое сообщение представляет из себя json с данными. В него добавляется служебное поле start - время появления сообщения в очереди
  • При получении сообщения consumer'ом будет вызван соответсвующий перегруженный метод processMessage()
  • Если метод возврщает false - обработка сообщения была неуспешной, сообщение отправляется в очередь _retry
  • Через 15 минут сообщение автоматически из очереди _retry будет перемещено в основную очередь.
  • Если прошло более 4ех часов с момента добавления сообщения, оно удаляется и публикуется в очередь dead