/queue-tactician

Integrate the wonderful Tactician library with pmg/queue.

Primary language: PHP. License: Apache License 2.0 (Apache-2.0).

pmg/queue-tactician

This is a middleware for Tactician to integrate it with pmg/queue.

Installation and Usage

Install with composer.

composer require pmg/queue-tactician

To use it, add the middleware to your middleware chain somewhere before the default command handler middleware.

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Producer;
use PMG\Queue\Tactician\QueueingMiddleware;

/** @var Producer $producer */
$producer = createAQueueProducerSomehow();

// QueueingMiddleware is listed ahead of CommandHandlerMiddleware so that it
// sees every command before the default handler does.
$middlewares = [
    new QueueingMiddleware($producer),
    new CommandHandlerMiddleware(/*...*/),
];

$bus = new CommandBus($middlewares);

Enqueueing Commands

Any command that implements PMG\Queue\Message will be put into the queue via the producer and no further middlewares will be called.

use PMG\Queue\Message;

/**
 * Example command that should be processed in the background.
 *
 * Because it implements PMG\Queue\Message, a bus with QueueingMiddleware
 * installed sends it to the queue via the producer instead of passing it on
 * to the remaining middlewares.
 */
final class DoLongRunningStuff implements Message
{
    /**
     * {@inheritdoc}
     *
     * This example returns the fixed name 'LongRunningStuff'.
     */
    public function getName()
    {
        return 'LongRunningStuff';
    }
}

// The command implements Message, so handling it puts it right into the
// queue; no further middlewares are called.
$command = new DoLongRunningStuff();
$bus->handle($command);

Dequeueing (Consuming) Commands

To use Tactician to process the messages via the consumer, use PMG\Queue\Handler\TacticianHandler.

use PMG\Queue\DefaultConsumer;
use PMG\Queue\Handler\TacticianHandler;

/**
 * @var League\Tactician\CommandBus $bus
 * @var PMG\Queue\Driver $driver
 */

// The handler passes each consumed message to the command bus.
$consumer = new DefaultConsumer($driver, new TacticianHandler($bus));

$consumer->run();

The above assumes that the CommandBus instance still has the QueueingMiddleware installed. If not, you'll need to use your own handler that invokes the command bus, perhaps via CallableHandler.

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\DefaultConsumer;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;

// no QueueingMiddleware!
$differentBus = new CommandBus([
    new CommandHandlerMiddleware(/*...*/),
]);

// Wrap the bus built above (the one without QueueingMiddleware) so the
// consumer invokes its handle() method for each message.
$handler = new CallableHandler([$differentBus, 'handle']);

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

$consumer->run();

Beware of Wrapping This Handler with PcntlForkingHandler

The shared instance of the command bus means that it's very likely that things like open database connections will cause issues if/when a child process is forked to handle messages.

Instead, a better bet is to create a new command bus for each message. CreatingTacticianHandler can do that for you.

use League\Tactician\CommandBus;
use League\Tactician\Handler\CommandHandlerMiddleware;
use PMG\Queue\Message;
use PMG\Queue\Handler\CallableHandler;
use PMG\Queue\Tactician\QueuedCommand;
use PMG\Queue\Tactician\QueueingMiddleware;
use PMG\Queue\Handler\CreatingTacticianHandler;

$handler = new CreatingTacticianHandler(function () {
    // this is invoked for every message, so each message is handled by a
    // freshly created command bus rather than a shared instance
    return new CommandBus([
        new QueueingMiddleware(createAQueueProducerSomehow()),
        new CommandHandlerMiddleware(/* ... */),
    ]);
});

/** @var PMG\Queue\Driver $driver */
$consumer = new DefaultConsumer($driver, $handler);

$consumer->run();