This component will help you to build a messages brocker system over a redis backend. It will take advantage of the redis cluster capabilities with the possibility to shard messages into several redis lists while producing messages. Consumer, in no auto-ack mode, implements a working list to be sure not to loose any messages when processing fail.
By design, producing is super fast (one Redis command) and consuming can be slow if you want to ack messages manually.
You should use it with Redis >= 2.8.
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$message = new RedisMessageBroker\MessageEnvelope(uniqid(), 'un message');
$producer = new RedisMessageBroker\MessageHandler\Producer($queue, $redisClient);
$producer->publishMessage($message);
Consumer should be wrapped in a worker. A unique Id should be pass to the consumer constructor. If you work with a worker, uniqId has to be constant per worker.
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid());
$message = $consumer->getMessageEnvelope();
Inspector methods allow you to count the messages in ready or processing in a queue.
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$inspector = new RedisMessageBroker\Queue\Inspector($queue, $redisClient);
$countInProgress = $inspector->countInProgressMessages();
$countReady = $inspector->countReadyMessages();
Cleanup methods let you perform a cleanup in the message queue. Cleanup is very slow as all the message in the queue will be scanned.
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$cleanup = new RedisMessageBroker\Queue\Cleanup($queue, $redisClient);
$cleanup->cleanOldMessages(
3600, // erase messages older than 3600 seconds
true // clean message in the ready queue too. Mandatory use if you are in no-autoack mode
);
To avoid hotpsots you can shard a queue on several lists :
$queue = new RedisMessageBroker\Queue\Definition('raoul', 10); // shard on 10 lists
In this mode, messages will be written and read among the 10 lists. FIFO is no more guaranteed.
with setNoAutoAck()
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\Consumer($queue, $redisClient, uniqid());
$consumer->setNoAutoAck();
$message = $consumer->getMessageEnvelope();
if ($message) {
// do something with the message
$consumer->ack($message); // erase the message from the working list
}
Each consumer got an unique Id defined during the construction of the object. This Id allow the consumer to define a unique working list where a message is stored between the getMessage
and the ack
.
Is it possible to use LostMessageConsumer class to look on consumer working lists and move message more than x second old (messageTtl
parameter) from those lists to a queue list.
The maxRetry
parameter will put a message in a dead letter list when the retry number is reached.
<?php
use M6Web\Component\RedisMessageBroker;
use Predis\Client as PredisClient;
$redisClient = new PredisClient(); // refer to PredisDocumentation
$queue = new RedisMessageBroker\Queue\Definition('raoul');
$consumer = new RedisMessageBroker\MessageHandler\LostMessagesConsumer($queue, $redisClient, 360, 3);
$consumer->requeueOldMessages();
messageTtl
parameter need to be superior of your max time to process a message. (Otherwise it will consider a message old when its processing)