Symfony bundle for convenient work with queues. Currently it supports RabbitMQ.
-
Install bundle
composer require lamoda/queue-bundle
-
Extend
Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass
use Doctrine\ORM\Mapping as ORM;
use Lamoda\QueueBundle\Entity\QueueEntityMappedSuperclass;

/**
 * @ORM\Entity(repositoryClass="Lamoda\QueueBundle\Entity\QueueRepository")
 */
class Queue extends QueueEntityMappedSuperclass
{
}
-
Configure bundle parameters
lamoda_queue:
    ## required
    entity_class: App\Entity\Queue
    max_attempts: 5
    batch_size_per_requeue: 5
    batch_size_per_republish: 5

    ## optional
    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2
-
Register bundle
class AppKernel extends Kernel
{
    // ...

    public function registerBundles()
    {
        $bundles = [
            // ...
            new Lamoda\QueueBundle\LamodaQueueBundle(),
            // ...
        ];

        return $bundles;
    }

    // ...
}
or add to
config/bundles.php
return [
    // ...
    Lamoda\QueueBundle\LamodaQueueBundle::class => ['all' => true],
    // ...
];
-
Migrate schema
doctrine:migrations:diff
- create a migration for the queue table
doctrine:migrations:migrate
- apply the migration
-
Define new exchange constant
namespace App\Constant;

class Exchanges
{
    public const DEFAULT = 'default';
}
-
Add a new node to
old_sound_rabbit_mq.producers
named after the previously defined constant, for example:
old_sound_rabbit_mq:
    producers:
        default:
            connection: default
            exchange_options:
                name: !php/const App\Constant\Exchanges::DEFAULT
                type: "direct"
-
Define new queue constant
namespace App\Constant;

class Queues
{
    public const NOTIFICATION = 'notification';
}
-
Register a consumer for the queue in
old_sound_rabbit_mq.consumers
named after the previously defined constant, for example:
old_sound_rabbit_mq:
    consumers:
        notification:
            connection: default
            exchange_options:
                name: !php/const App\Constant\Exchanges::DEFAULT
                type: "direct"
            queue_options:
                name: !php/const App\Constant\Queues::NOTIFICATION
                routing_keys:
                    - !php/const App\Constant\Queues::NOTIFICATION
            callback: "lamoda_queue.consumer"
-
Create a job class that extends
AbstractJob
, for example:
namespace App\Job;

use App\Constant\Exchanges;
use App\Constant\Queues;
use Lamoda\QueueBundle\Job\AbstractJob;
use JMS\Serializer\Annotation as JMS;

class SendNotificationJob extends AbstractJob
{
    /**
     * @var string
     *
     * @JMS\Type("string")
     */
    private $message;

    public function __construct(string $message)
    {
        $this->message = $message;
    }

    public function getDefaultQueue(): string
    {
        return Queues::NOTIFICATION;
    }

    public function getDefaultExchange(): string
    {
        return Exchanges::DEFAULT;
    }
}
-
Create a job handler that implements HandlerInterface, for example:
namespace App\Handler;

use Lamoda\QueueBundle\Handler\HandlerInterface;
use Lamoda\QueueBundle\QueueInterface;

class SendNotificationHandler implements HandlerInterface
{
    public function handle(QueueInterface $job): void
    {
        // implement service logic here
    }
}
-
Tag handler at service container
services:
    App\Handler\SendNotificationHandler:
        public: true
        tags:
            - { name: queue.handler, handle: App\Job\SendNotificationJob }
-
Add the queue name to "codeception.yml" under
modules.config.AMQP.queues
-
Execute
./bin/console queue:init
command
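Initialize exchanges and queues:
./bin/console queue:init
Add a job to the queue:
$job = new SendNotificationJob($message);
$container->get(Lamoda\QueueBundle\Factory\PublisherFactory::class)->publish($job);
Run the consumer for the notification queue:
./bin/console queue:consume notification
Requeue failed queues:
./bin/console queue:requeue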
You can queue any plain class; it only needs to implement QueueInterface:
namespace App\Process;

use Lamoda\QueueBundle\Entity\QueueInterface;

class MyProcess implements QueueInterface
{
    // implement interface functions
}
Tag its handler in the service container:
services:
    App\Handler\MyProcessHandler:
        public: true
        tags:
            - { name: queue.handler, handle: App\Process\MyProcess }
Then publish an instance:
$process = new MyProcess();
$container->get('queue.publisher')->publish($process);
If you want the queue to be rerun, throw Lamoda\QueueBundle\Exception\RuntimeException.
If you want to mark the queue as failed, throw any other kind of exception.
namespace App\Handler;

use Lamoda\QueueBundle\Exception\RuntimeException;
use Lamoda\QueueBundle\Handler\HandlerInterface;
use Lamoda\QueueBundle\QueueInterface;

class SendNotificationHandler implements HandlerInterface
{
    public function handle(QueueInterface $job): void
    {
        // implement service logic here; $rerun and $failed are placeholders
        // for conditions computed by that logic

        // Rerun queue
        if ($rerun === true) {
            throw new RuntimeException('Error message');
        }

        // Mark queue as failed
        if ($failed === true) {
            throw new \Exception();
        }
    }
}
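The difference between the two outcomes can be modelled in plain PHP. The sketch below is not the bundle's consumer: RetryableException stands in for Lamoda\QueueBundle\Exception\RuntimeException, and runHandler() and the status strings are hypothetical names chosen for illustration.

```php
<?php
declare(strict_types=1);

// Stand-in for Lamoda\QueueBundle\Exception\RuntimeException (assumption).
final class RetryableException extends \RuntimeException
{
}

/**
 * Hypothetical consumer step: runs a handler and maps the outcome to a status.
 */
function runHandler(callable $handler): string
{
    try {
        $handler();

        return 'complete';
    } catch (RetryableException $e) {
        // The dedicated exception type asks for another attempt later.
        return 'rerun';
    } catch (\Throwable $e) {
        // Any other exception marks the queue as failed.
        return 'failed';
    }
}

echo runHandler(function (): void {
    throw new RetryableException('Temporary outage');
}), PHP_EOL; // rerun

echo runHandler(function (): void {
    throw new \LogicException('Broken payload');
}), PHP_EOL; // failed
```

The order of the catch blocks matters in this model: the retryable type has to be checked before the generic \Throwable branch, otherwise every exception would end up as a failure.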
By default, the delay before the next attempt grows as a geometric progression with each attempt. You can tune the starting interval and the multiplier through configuration.
lamoda_queue:
    ## required
    ## ...
    max_attempts: 5

    ## optional
    strategy_delay_geometric_progression_start_interval_sec: 60
    strategy_delay_geometric_progression_multiplier: 2
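To get a feel for what these two settings mean, the sketch below lists the delays a geometric progression would produce. It assumes the delay before attempt n is start_interval * multiplier^(n - 1), rounded up to whole seconds; the exact formula and rounding used by the bundle are not stated here, so geometricDelays() is a hypothetical helper, not the bundle's code.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: delays (in seconds) for attempts 1..$maxAttempts,
 * assuming delay(n) = $startIntervalSec * $multiplier^(n - 1), rounded up.
 *
 * @return int[]
 */
function geometricDelays(int $startIntervalSec, float $multiplier, int $maxAttempts): array
{
    $delays = [];
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $delays[] = (int) ceil($startIntervalSec * ($multiplier ** ($attempt - 1)));
    }

    return $delays;
}

// Values from the configuration above: start 60 s, multiplier 2, max_attempts 5.
print_r(geometricDelays(60, 2.0, 5));
```

Under that assumption the five delays are 60, 120, 240, 480 and 960 seconds, so a larger multiplier stretches the later attempts much more than the early ones.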
QueueAttemptsReachedEvent is dispatched when a consumer is about to execute a queue that has already reached the maximum number of attempts.
Properties:
- Queue entity:
QueueAttemptsReachedEvent::getQueue()
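A listener for this event would typically read the queue entity and raise an alert. The sketch below uses stand-in classes so it runs without the bundle: StubQueueEntity, StubAttemptsReachedEvent, getName() and AttemptsReachedListener are hypothetical; only the getQueue() accessor comes from the text above. In a real application the listener would presumably type-hint the bundle's QueueAttemptsReachedEvent and be registered with the event dispatcher.

```php
<?php
declare(strict_types=1);

// Stand-in for the queue entity (assumption: only a name is modelled here).
final class StubQueueEntity
{
    private string $name;

    public function __construct(string $name)
    {
        $this->name = $name;
    }

    public function getName(): string
    {
        return $this->name;
    }
}

// Stand-in for QueueAttemptsReachedEvent; getQueue() mirrors the documented accessor.
final class StubAttemptsReachedEvent
{
    private StubQueueEntity $queue;

    public function __construct(StubQueueEntity $queue)
    {
        $this->queue = $queue;
    }

    public function getQueue(): StubQueueEntity
    {
        return $this->queue;
    }
}

// Hypothetical listener that collects a warning for every exhausted queue.
final class AttemptsReachedListener
{
    /** @var string[] */
    public array $warnings = [];

    public function onAttemptsReached(StubAttemptsReachedEvent $event): void
    {
        $this->warnings[] = sprintf(
            'Queue "%s" reached max attempts',
            $event->getQueue()->getName()
        );
    }
}

$listener = new AttemptsReachedListener();
$listener->onAttemptsReached(new StubAttemptsReachedEvent(new StubQueueEntity('notification')));
echo $listener->warnings[0], PHP_EOL;
```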
Check code style:
make php-cs-check
Fix code style:
make php-cs-fix
Run unit tests:
make test-unit