/php-underground-radio

A PHP library that allows messages to be sent via various services that implement both a message queue architecture and a publish/subscribe service

Primary LanguagePHPMIT LicenseMIT

PHP UndergroundRadio

Disclaimer: This project is in a very early stage of development and should not be used in production. The present code is not much more than a first draft. Nevertheless, it may already be of use to someone. Pull requests always welcome...

Introduction

The UndergroundRadio library allows messages to be sent via various services that implement both a message queue architecture and a publish/subscribe service.

Although the architecture provides an abstract interface for the integration of any suitable services, at this time only an implementation based on Redis exists. Here the focus is on the use of modern Redis features like Streams and Pub/Sub.

This library is inspired by Enqueue library and the queue-interop protocol, but takes some different approaches in detail.

Installation

This package requires PHP 8.1 or greater!

Installing with composer:

$ composer require guidepilot/php-underground-radio

Usage examples for pub/sub pattern

This pattern is also known as broadcast or fan-out architecture.

Simple message producer via channel

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
$channel = new Channel('fooChannel');

$message = new Message(uniqid());
$message->addHeader('cli-test', "1");
$message->setBody('Hello world!');

$producer->send($message, $channel);

Simple message subscriber

use GuidePilot\UndergroundRadio\Broadcast\Channel;
use GuidePilot\UndergroundRadio\Broadcast\Interfaces\SubscriptionMessageProcessor;
use GuidePilot\UndergroundRadio\Broadcast\Subscriber;
use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;


$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$subscriber = new Subscriber($context);
$channel = new Channel('fooChannel');

$subscriber->subscribe($channel, new class implements SubscriptionMessageProcessor {

    public function processMessage(\GuidePilot\UndergroundRadio\Interfaces\Message $message, \GuidePilot\UndergroundRadio\Broadcast\Interfaces\Channel $channel) {
        echo "--- New message from {$channel->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;
    }
});

Usage examples message queue pattern

Simple message producer via queue

use GuidePilot\UndergroundRadio\JsonSerializer;
use GuidePilot\UndergroundRadio\Message;
use GuidePilot\UndergroundRadio\Producer;
use GuidePilot\UndergroundRadio\Queue\CappedQueue;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
$serializer = new JsonSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$producer = new Producer($context);
$queue = new CappedQueue('fooQueue', 42);

$message = new Message(uniqid());
$message->addHeader('cli-test', "1");
$message->setBody('Hello queue world! (capped)');

$producer->send($message, $queue);

Simple queue consumer

use GuidePilot\UndergroundRadio\Interfaces\Message;
use GuidePilot\UndergroundRadio\PhpSerializer;
use GuidePilot\UndergroundRadio\Queue\Interfaces\ProcessorResult;
use GuidePilot\UndergroundRadio\Queue\Interfaces\QueueMessageProcessor;
use GuidePilot\UndergroundRadio\Queue\Queue;
use GuidePilot\UndergroundRadio\Queue\QueueConsumer;
use GuidePilot\UndergroundRadio\Queue\QueueConsumerGroup;
use GuidePilot\UndergroundRadio\RedisConfig;
use GuidePilot\UndergroundRadio\RedisRadioContext;

$redisConfig = new RedisConfig('localhost');
$serializer = new PhpSerializer();
$context = new RedisRadioContext($redisConfig, $serializer);

$group = new QueueConsumerGroup('worker');
$consumer = new QueueConsumer($context, 'worker-0', $group);
$queue = new Queue('fooQueue');

$consumer->consume($queue, new class implements QueueMessageProcessor {

    public function processMessage(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue): ProcessorResult {
        echo "--- New message from {$queue->getDestinationIdentifier()} ---".PHP_EOL;
        print_r($message);
        echo PHP_EOL;

        return ProcessorResult::Acknowledge;
    }

    public function handleMaxRequeueCountReached(Message $message, \GuidePilot\UndergroundRadio\Queue\Interfaces\Queue $queue) {
        echo "!!! Message {$message->getMessageId()} reached max requeue count !!!".PHP_EOL;
    }

});

License

It is released under the MIT License.