
PHP implementation of Redis Simple Message Queue

Redis Simple Message Queue

A lightweight message queue for PHP that requires no dedicated queue server. Just a Redis server. See smrchy/rsmq for more information.

This is a fork of eislambey/php-rsmq with the following changes:

  • Uses predis instead of the Redis extension
  • Has some OO wrappers for QueueAttributes and Message
  • Provides a simple QueueWorker

composer require andrewbreksa/rsmq



Creates a new instance of RSMQ.


  • $predis (\Predis\ClientInterface): *required The Predis instance
  • $ns (string): optional (Default: "rsmq") The namespace prefix used for all keys created by RSMQ
  • $realtime (Boolean): optional (Default: false) Enable realtime PUBLISH of new messages


use Predis\Client;
use AndrewBreksa\RSMQ\RSMQClient;

$predis = new Client(
        'host' => '',
        'port' => 6379
$this->rsmq = new RSMQClient($predis);



Create a new queue.


  • $name (string): The Queue name. Maximum 160 characters; alphanumeric characters, hyphens (-), and underscores (_) are allowed.
  • $vt (int): optional (Default: 30) The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)
  • $delay (int): optional (Default: 0) The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)
  • $maxsize (int): optional (Default: 65536) The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)


  • true (Bool)


  • \AndrewBreksa\RSMQ\Exceptions\QueueAlreadyExistsException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq



List all queues

Returns an array:

  • ["qname1", "qname2"]


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$queues = $rsmq->listQueues();


Deletes a queue and all messages.


  • $name (string): The Queue name.


  • true (Bool)


  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq



Get queue attributes, counter and stats


  • $queue (string): The Queue name.

Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:

  • vt (int): The visibility timeout for the queue in seconds
  • delay (int): The delay for new messages in seconds
  • maxSize (int): The maximum size of a message in bytes
  • totalReceived (int): Total number of messages received from the queue
  • totalSent (int): Total number of messages sent to the queue
  • created (float): Timestamp (epoch in seconds) when the queue was created
  • modified (float): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
  • messageCount (int): Current number of messages in the queue
  • hiddenMessageCount (int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$attributes =  $rsmq->getQueueAttributes('myqueue');
echo "visibility timeout: ", $attributes->getVt(), "\n";
echo "delay for new messages: ", $attributes->getDelay(), "\n";
echo "max size in bytes: ", $attributes->getMaxSize(), "\n";
echo "total received messages: ", $attributes->getTotalReceived(), "\n";
echo "total sent messages: ", $attributes->getTotalSent(), "\n";
echo "created: ", $attributes->getCreated(), "\n";
echo "last modified: ", $attributes->getModified(), "\n";
echo "current n of messages: ", $attributes->getMessageCount(), "\n";
echo "hidden messages: ", $attributes->getHiddenMessageCount(), "\n";


Sets queue parameters.


  • $queue (string): The Queue name.
  • $vt (int): optional * The length of time, in seconds, that a message received from a queue will be invisible to other receiving components when they ask to receive messages. Allowed values: 0-9999999 (around 115 days)
  • $delay (int): optional The time in seconds that the delivery of all new messages in the queue will be delayed. Allowed values: 0-9999999 (around 115 days)
  • $maxsize (int): optional The maximum message size in bytes. Allowed values: 1024-65536 and -1 (for unlimited size)

Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.

Returns a \AndrewBreksa\RSMQ\QueueAttributes object with the following properties:

  • vt (int): The visibility timeout for the queue in seconds
  • delay (int): The delay for new messages in seconds
  • maxSize (int): The maximum size of a message in bytes
  • totalReceived (int): Total number of messages received from the queue
  • totalSent (int): Total number of messages sent to the queue
  • created (float): Timestamp (epoch in seconds) when the queue was created
  • modified (float): Timestamp (epoch in seconds) when the queue was last modified with setQueueAttributes
  • messageCount (int): Current number of messages in the queue
  • hiddenMessageCount (int): Current number of hidden / not visible messages. A message can be hidden while "in flight" due to a vt parameter or when sent with a delay


  • \AndrewBreksa\RSMQ\QueueAttributes
  • \AndrewBreksa\RSMQ\QueueParametersValidationException
  • \AndrewBreksa\RSMQ\QueueNotFoundException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$queue = 'myqueue';
$vt = 50;
$delay = 10;
$maxsize = 2048;
$rsmq->setQueueAttributes($queue, $vt, $delay, $maxsize);



Sends a new message.


  • $queue (string)
  • $message (string)
  • $delay (int): optional (Default: queue settings) The time in seconds that the delivery of the message will be delayed. Allowed values: 0-9999999 (around 115 days)


  • $id (string): The internal message id.


  • \AndrewBreksa\RSMQ\Exceptions\MessageToLongException
  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$id = $rsmq->sendMessage('myqueue', 'a message');
echo "Message Sent. ID: ", $id;


Receive the next message from the queue.


  • $queue (string): The Queue name.
  • $vt (int): optional (Default: queue settings) The length of time, in seconds, that the received message will be invisible to others. Allowed values: 0-9999999 (around 115 days)

Returns a \AndrewBreksa\RSMQ\Message object with the following properties:

  • message (string): The message's contents.
  • id (string): The internal message id.
  • sent (int): Timestamp of when this message was sent / created.
  • firstReceived (int): Timestamp of when this message was first received.
  • receiveCount (int): Number of times this message was received.

Note: Will return an empty array if no message is there


  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$message = $rsmq->receiveMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();



  • $queue (string): The Queue name.
  • $id (string): message id to delete.


  • true if successful, false if the message was not found (bool).


  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$id = $rsmq->sendMessage('queue', 'a message');
$rsmq->deleteMessage('queue', $id);


Receive the next message from the queue and delete it.

Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.


  • $queue (string): The Queue name.

Returns a \AndrewBreksa\RSMQ\Message object with the following properties:

  • message (string): The message's contents.
  • id (string): The internal message id.
  • sent (int): Timestamp of when this message was sent / created.
  • firstReceived (int): Timestamp of when this message was first received.
  • receiveCount (int): Number of times this message was received.

Note: Will return an empty object if no message is there


  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException
  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$message = $rsmq->popMessage('myqueue');
echo "Message ID: ", $message->getId();
echo "Message: ", $message->getMessage();


Change the visibility timer of a single message. The time when the message will be visible again is calculated from the current time (now) + vt.


  • qname (string): The Queue name.
  • id (string): The message id.
  • vt (int): The length of time, in seconds, that this message will not be visible. Allowed values: 0-9999999 (around 115 days)


  • true if successful, false if the message was not found (bool).


  • \AndrewBreksa\RSMQ\Exceptions\QueueParametersValidationException
  • \AndrewBreksa\RSMQ\Exceptions\QueueNotFoundException


 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

$queue = 'myqueue';
$id = $rsmq->sendMessage($queue, 'a message');
if($rsmq->changeMessageVisibility($queue, $id, 60)) {
	echo "Message hidden for 60 secs";


When creating an instance of AndrewBreksa\RSMQ\RSMQClient, you can enable the realtime PUBLISH for new messages by passing true for the $realtime argument of \AndrewBreksa\RSMQ\RSMQClient::__construct. On every new message that is sent via sendMessage, a Redis PUBLISH will be issued to {rsmq.ns}:rt:{qname}.

Example for RSMQ with default settings:

  • The queue testQueue already contains 5 messages.
  • A new message is being sent to the queue testQueue.
  • The following Redis command will be issued: PUBLISH rsmq:rt:testQueue 6

The realtime option enables sending a PUBLISH when a new message is sent to RSMQ, however no further functionality is built on this feature. Your app could use the Redis SUBSCRIBE command to be notified of new messages and then attempt to poll from the queue, however due to how the Redis pub/sub system works, all listeners will be notified of the new message, this method doesn't lend itself to driving message handling in environments with more than one subscribed process.


The QueueWorker class provides an easy way to consume RSMQ messages, to use it:

 * @var AndrewBreksa\RSMQ\RSMQClientInterface $rsmq

use AndrewBreksa\RSMQ\ExecutorInterface;
use AndrewBreksa\RSMQ\Message;
use AndrewBreksa\RSMQ\QueueWorker;
use AndrewBreksa\RSMQ\WorkerSleepProvider;

$executor = new class() implements ExecutorInterface{
    public function __invoke(Message $message) : bool {
        //@todo: do some work, true will ack/delete the message, false will allow the queue's config to "re-publish"
        return true;

$sleepProvider = new class() implements WorkerSleepProvider{
    public function getSleep() : ?int {
         * This allows you to return null to stop the worker, which can be used with something like redis to mark.
         * Note that this method is called _before_ we poll for a message, and therefore if it returns null we'll eject
         * before we process a message.
        return 1;

$worker = new QueueWorker($rsmq, $executor, $sleepProvider, 'test_queue');
$worker->work(); // here we can optionally pass true to only process one message