A Laravel package that implements the AMQP protocol using the RabbitMQ server easily and efficiently. It is built on top of php-amqplib, with the aim of providing a simple abstraction layer to work with.
Bowler allows you to:
- Customize message publishing.
- Customize message consumption.
- Customize message dead lettering.
- Handle application errors and deal with the corresponding messages accordingly.
- Provide an expressive consumer queue setup.
- Register a queue and generate its message handler from the command line.
- Make use of a default Pub/Sub messaging.
In addition to the above, Bowler offers limited admin functionalities.
These features will greatly simplify the way you use RabbitMQ and broaden its functionality. This package does not intend to take over the user's responsibility for designing the messaging schema.
Tools like the RabbitMQ Management plugin will certainly help you monitor the server's activity and visualize the setup.
Table of Contents
Setup
Development
Usage
Producer
Consumer
Manual
Console
Publisher/Subscriber
Dispatcher
Dead Lettering
Error Handling
Error Reporting
Health Checks
Lifecycle Hooks
Testing
Important Notes
Todo
Starting with version v0.4.2, this library requires Laravel 5.4 or later.
Starting with version v0.5.0, this library requires Laravel 5.7 or later.
Starting with version v0.9.0, this library requires Laravel 6.0 or later.
Install the package via Composer:
composer require vinelab/bowler
Laravel 5.4 users will also need to add the service provider to the providers array in config/app.php:
Vinelab\Bowler\BowlerServiceProvider::class,
After installation, you can publish the package configuration using the vendor:publish command. This command will publish the bowler.php configuration file to your config directory:
php artisan vendor:publish --provider="Vinelab\Bowler\BowlerServiceProvider"
You may configure RabbitMQ credentials in your .env file:
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
Create the Docker container
docker-compose up -d
Bash into the container
docker-compose exec app bash
Install dependencies
composer install
Run tests
./vendor/bin/phpunit
In order to be able to send a message, a producer instance needs to be created and an exchange needs to be set up.
// Initialize a Bowler Connection
$connection = new Vinelab\Bowler\Connection();
// Initialize a Producer object with a connection
$bowlerProducer = new Vinelab\Bowler\Producer($connection);
// Setup the producer's exchange name and other optional parameters: exchange type, passive, durable, auto delete and delivery mode
$bowlerProducer->setup('reporting_exchange', 'direct', false, true, false, 2);
// Send a message with an optional routingKey
$bowlerProducer->send($data, 'warning');
Or inject the producer and let the IoC container resolve the connection:
use Vinelab\Bowler\Producer;
class DoSomethingJob extends Job
{
protected $data;
public function __construct($data)
{
$this->data = $data;
}
public function handle(Producer $producer)
{
$producer->setup('reporting_exchange');
$producer->send(json_encode($this->data));
}
}
You need to make sure the exchange setup here matches the consumer's, otherwise a Vinelab\Bowler\Exceptions\DeclarationMismatchException is thrown.
If you mistakenly pass an undefined value when setting up the exchange, e.g. $exchangeType='noneExistingType', a Vinelab\Bowler\Exceptions\InvalidSetupException is thrown.
Add 'Registrator' => Vinelab\Bowler\Facades\Registrator::class, to the aliases array in config/app.php.
In order to consume a message, an exchange and a queue need to be set up and a message handler needs to be created.
Configuring the consumer can be done either manually or from the command line:
- Register your queues and handlers inside the queues.php file (think of the queues file as the routes file in Laravel). Note that the queues.php file should be under the App\Messaging directory:
Registrator::queue('books', 'App\Messaging\Handlers\BookHandler', []);
Registrator::queue('reporting', 'App\Messaging\Handlers\ErrorReportingHandler', [
    'exchangeName' => 'main_exchange',
    'exchangeType' => 'direct',
    'bindingKeys' => [
        'warning',
        'notification'
    ],
    'passive' => false,
    'durable' => true,
    'autoDelete' => false,
    'deadLetterQueueName' => 'dlx_queue',
    'deadLetterExchangeName' => 'dlx',
    'deadLetterExchangeType' => 'direct',
    'deadLetterRoutingKey' => 'warning',
    'messageTTL' => null
]);
Use the options array to set up your queues and exchanges. All of these options are optional; defaults apply to any parameters that are not specified here. The descriptions and defaults of these parameters are provided later in this document.
- Create your handler classes to handle the received messages:
// This is an example handler class
namespace App\Messaging\Handlers;
class AuthorHandler
{
    public function handle($msg)
    {
        echo "Author: ".$msg->body;
    }
    public function handleError($e, $broker)
    {
        if ($e instanceof InvalidInputException) {
            $broker->rejectMessage();
        } elseif ($e instanceof WhatEverException) {
            $broker->ackMessage();
        } elseif ($e instanceof WhatElseException) {
            $broker->nackMessage();
        } else {
            $msg = $broker->getMessage();
            if ($msg->body) {
                //
            }
        }
    }
}
Similarly to the above, additional functionality is also provided to the consumer's handler, like deleteExchange, purgeQueue and deleteQueue. Use these wisely and take advantage of the unused and empty parameters. Keep in mind that it is not recommended that an application exception be handled by manipulating the server's setup.
Register queues and handlers with php artisan bowler:make:queue {queue} {handler}, e.g. php artisan bowler:make:queue analytics_queue AnalyticsData.
The previous command:
- Adds Registrator::queue('analytics_queue', 'App\Messaging\Handlers\AnalyticsDataHandler', []); to App\Messaging\queues.php. If no exchange name is provided, the queue name is used as the default.
  The options array: if any option is specified, it will override the parameter set in the command. This helps to emphasize the queues.php file as a setup reference.
- Creates the App\Messaging\Handlers\AnalyticsDataHandler.php in the App\Messaging\Handlers directory.
Now, in order to listen to any queue, run the following command from your console: php artisan bowler:consume {queue}, e.g. php artisan bowler:consume analytics_queue.
You need to specify the queue name and any other optional parameter, if applicable to your case.
Complete description of the bowler:consume arguments list:
bowler:consume
queueName : The queue NAME
--N|exchangeName : The exchange NAME. Defaults to queueName.
--T|exchangeType : The exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--K|bindingKeys : The consumer's BINDING KEYS array.
--p|passive : If set, the server will reply with Declare-Ok if the exchange and queue already exist with the same name, and raise an error if not. Defaults to 0.
--d|durable : Mark exchange and queue as DURABLE. Defaults to 1.
--D|autoDelete : Set exchange and queue to AUTO DELETE when all queues and consumers, respectively have finished using it. Defaults to 0.
--deadLetterQueueName : The dead letter queue NAME. Defaults to deadLetterExchangeName.
--deadLetterExchangeName : The dead letter exchange NAME. Defaults to deadLetterQueueName.
--deadLetterExchangeType : The dead letter exchange TYPE. Supported exchanges: fanout, direct, topic. Defaults to fanout.
--deadLetterRoutingKey : The dead letter ROUTING KEY.
--messageTTL : If set, specifies how long, in milliseconds, before a message is declared dead letter.
Consuming an unregistered queue will throw Vinelab\Bowler\Exceptions\UnregisteredQueueException.
If you wish to handle a message based on the routing key it was published with, you can use a switch case in the handler's handle method, like so:
public function handle($msg)
{
switch ($msg->delivery_info['routing_key']) {
case 'key 1': //do something
break;
case 'key 2': //do something else
break;
}
}
Bowler provides a default Pub/Sub implementation, where the user doesn't need to care about the setup.
In short, publish with a routingKey and consume with matching bindingKey(s).
In your Producer
// Initialize a Bowler Connection
$connection = new Bowler\Connection();
// Initialize a Publisher object with a connection
$bowlerPublisher = new Publisher($connection);
// Publish the message and set its required routingKey
$bowlerPublisher->publish('warning', $data);
Or inject the Publisher, similarly to the Producer injection shown earlier.
As you might have noted, here we instantiate a Publisher, not a Producer object. Publisher is a Producer specialization; it holds the default Pub/Sub exchange setup.
publish($routingKey, $data = null);
In your Consumer
From the command line, use the bowler:make:subscriber command.
php artisan bowler:make:subscriber reporting ReportingMessage --expressive
Using the --expressive or -E option will make the queue name reflect that it is used for Pub/Sub. This results in reporting-pub-sub as the generated queue name; otherwise the queue name you provided will be used.
Add the bindingKeys array parameter to the registered queue in queues.php like so:
Registrator::subscriber('reporting-pub-sub', 'App\Messaging\Handlers\ReportingMessageHandler', ['warning']);
Notice that it is not required to specify the exchange, since it uses the default pub-sub exchange.
Handle the received messages in the handler class, like we've seen earlier.
From the command line, use the bowler:consume command.
php artisan bowler:consume reporting-pub-sub
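The Pub/Sub implementation is meant to be used as-is. It is possible to consume all published messages to a given exchange by setting the consumer's binding keys array to ['*']. Keep in mind that in a RabbitMQ topic exchange * matches exactly one dot-separated word, while # matches zero or more words.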
If no binding keys are provided, a Vinelab\Bowler\Exceptions\InvalidSubscriberBindingException is thrown.
If you would like to configure manually, you can surely do so by setting up the Producer and Consumer as explained earlier.
Registrator::subscriber($queue, $className, array $bindingKeys, $exchangeName = 'pub-sub', $exchangeType = 'topic');
Similar to Pub/Sub, except you may define the exchange and messages will be distributed according to the least busy consumer (see Work Queue - Fair Dispatch).
Dispatch messages to a specific exchange with a routingKey and consume with matching bindingKey(s).
// Initialize a Bowler Connection
$connection = new Bowler\Connection();
// Initialize a Dispatcher object with a connection
$dispatcher = new Dispatcher($connection);
// Dispatch the message and set its required exchange name and routingKey
$dispatcher->dispatch('my-custom-exchange', 'warning', $data);
dispatch($exchangeName, $routingKey, $data = null, $exchangeType = 'topic')
Registering a queue consumer is the same as Pub/Sub, except the exchange name in the registration needs to match.
// catch all the cows in the "farm" exchange
Registrator::subscriber('monitoring', 'App\Messaging\Handlers\MonitoringMessageHandler', [
'*.cow.*',
], 'farm');
The above will catch all the messages in the farm exchange that match the routing key *.cow.*.
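To make the binding key semantics concrete, here is a minimal sketch of how a RabbitMQ topic exchange compares a binding key with a routing key: `*` stands for exactly one dot-separated word and `#` for zero or more words. This is a simplified model of the broker's behaviour, not code from Bowler; the function names `topicMatches` and `matchWords` are hypothetical.

```php
<?php

/**
 * Hypothetical helper: tells whether a routing key would be matched by a
 * topic binding key, following RabbitMQ's "*" and "#" wildcard rules.
 */
function topicMatches(string $bindingKey, string $routingKey): bool
{
    $pattern = $bindingKey === '' ? [] : explode('.', $bindingKey);
    $words = $routingKey === '' ? [] : explode('.', $routingKey);

    return matchWords($pattern, $words);
}

/**
 * Recursively compares the remaining pattern segments with the remaining words.
 */
function matchWords(array $pattern, array $words): bool
{
    if ($pattern === []) {
        // The pattern is exhausted: it matches only if no words are left.
        return $words === [];
    }

    $head = $pattern[0];
    $rest = array_slice($pattern, 1);

    if ($head === '#') {
        // "#" may absorb zero or more words, so try every possible split.
        for ($i = 0; $i <= count($words); $i++) {
            if (matchWords($rest, array_slice($words, $i))) {
                return true;
            }
        }

        return false;
    }

    if ($words === []) {
        return false;
    }

    // "*" accepts any single word; anything else must match literally.
    if ($head === '*' || $head === $words[0]) {
        return matchWords($rest, array_slice($words, 1));
    }

    return false;
}
```

Under this model, a message published with the routing key `big.cow.brown` reaches a queue bound with `*.cow.*`, while `cow.brown` does not, because the leading `*` needs a word of its own.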
Dead lettering is solely the responsibility of the consumer and part of its queue configuration.
Enabling dead lettering on the consumer is done either through the command line, using the same command that runs the consumer with the dedicated optional arguments, or by setting the corresponding optional parameters in the aforementioned options array.
At least one of the --deadLetterQueueName or --deadLetterExchangeName options should be specified.
php artisan bowler:consume my_app_queue --deadLetterQueueName=my_app_dlq --deadLetterExchangeName=dlx --deadLetterExchangeType=direct --deadLetterRoutingKey=invalid --messageTTL=10000
If only one of the mentioned optional parameters is set, the other defaults to it, so the dlx and dlq end up with the same name.
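The defaulting rule between the dead letter queue name and the dead letter exchange name can be sketched as a small function. This only illustrates the rule as described in this section; `resolveDeadLetterNames` is a hypothetical name, not a Bowler function, and returning two nulls when neither option is given is an assumption made for the sake of the example.

```php
<?php

/**
 * Hypothetical helper: fills in whichever of the two dead letter names is
 * missing with the value of the other one.
 */
function resolveDeadLetterNames(?string $queueName, ?string $exchangeName): array
{
    return [
        // Fall back to the exchange name when no queue name was given.
        'deadLetterQueueName' => $queueName ?? $exchangeName,
        // Fall back to the queue name when no exchange name was given.
        'deadLetterExchangeName' => $exchangeName ?? $queueName,
    ];
}
```

For instance, `resolveDeadLetterNames('my_app_dlq', null)` yields `my_app_dlq` for both entries, whereas passing both names leaves each one untouched.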
If you would like to avoid using dead lettering, you can fall back to a stripped-down behaviour by requeueing dead messages using $broker->rejectMessage(true) in the queue's MessageHandler::handleError().
Error handling in Bowler is limited to application exceptions.
Handler::handleError($e, $broker) allows you to perform actions on the queue. Whether to acknowledge, negatively acknowledge (nack) or reject a message is up to you.
It is not recommended to alter the RabbitMQ setup in response to an application exception, e.g. purging the queue because of an InvalidInputException!
In any case, if deemed necessary for the use case, it should be used with caution since you will lose all the queued messages or, even worse, your exchange.
Server exceptions, on the other hand, are thrown. Server errors not wrapped by Bowler are thrown as Vinelab\Bowler\Exceptions\BowlerGeneralException.
Bowler supports application level error reporting.
To do so, the default Laravel exception handler, normally located in app\Exceptions\Handler, should implement Vinelab\Bowler\Contracts\BowlerExceptionHandler.
ExceptionHandler::reportQueue(Exception $e, AMQPMessage $msg) allows you to report errors as you wish, providing both the exception and the queue message itself for maximum flexibility.
IMPORTANT: The Management plugin must be installed in order to perform health checks.
Based on this Reliability Guide, Bowler provides a tool to check the health of connected consumers through the bowler:healthcheck:consumer command, which has the following signature:
bowler:healthcheck:consumer {queueName : The queue name}
Example: php artisan bowler:healthcheck:consumer the-queue
Will return exit code 0 for success and 1 for failure, along with a message explaining why.
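Since the command communicates its result through the exit code, a supervising script can act on it. The following is a minimal sketch of such a wrapper, assuming `exec` is available in your PHP CLI; `runHealthCheck` and the shape of its return value are hypothetical and not part of Bowler.

```php
<?php

/**
 * Hypothetical helper: runs a health check command and reports whether it
 * exited with code 0, together with whatever the command printed.
 */
function runHealthCheck(string $command): array
{
    $output = [];
    $exitCode = 1;

    // Redirect stderr to stdout so the failure message is captured as well.
    exec($command . ' 2>&1', $output, $exitCode);

    return [
        'healthy' => $exitCode === 0,
        'message' => implode("\n", $output),
    ];
}

// Usage sketch (run from the application's root directory):
// $result = runHealthCheck('php artisan bowler:healthcheck:consumer the-queue');
// if (!$result['healthy']) {
//     error_log($result['message']);
//     exit(1);
// }
```

A wrapper like this could back a container liveness probe or a cron-based alert; which one fits depends on how your consumers are supervised.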
Bowler exposes the following lifecycle hooks:
use Vinelab\Bowler\Facades\Message;
Message::beforePublish(function (AMQPMessage $msg, string $exchangeName, $routingKey = null) {
return $msg;
});
Message::published(function (AMQPMessage $msg, string $exchangeName, $routingKey = null) {
//
});
Message::beforeConsume(function (AMQPMessage $msg, string $queueName, string $handlerClass) {
return $msg;
});
Message::consumed(function (AMQPMessage $msg, string $queueName, string $handlerClass, Ack $ack) {
//
});
By default, Bowler logs and suppresses errors that occur inside the callback. You can configure this behavior via the bowler.lifecycle_hooks.fail_on_error configuration option.
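Following Laravel's dot notation for configuration keys, that option would live under a lifecycle_hooks array in config/bowler.php. The excerpt below is a sketch under that assumption: the rest of the published file is omitted, and the meaning given to `true` is inferred from the option's name rather than confirmed by this document.

```php
<?php

// config/bowler.php (excerpt only; keep the other keys of the published file)
return [
    'lifecycle_hooks' => [
        // Assumed meaning: true lets errors raised inside a hook callback
        // propagate instead of being logged and suppressed.
        'fail_on_error' => true,
    ],
];
```

Check the published configuration file for the actual default value and for any environment variable it may read before changing it.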
If you would like to silence the Producer/Publisher to keep it from actually sending/publishing messages to an exchange, bind it to a mock, either locally in your test or globally.
Globally:
Import Vinelab\Bowler\Producer in App\Tests\TestCase, then add the following to App\Tests\TestCase::createApplication():
$app->bind(Producer::class, function () {
return $this->createMock(Producer::class);
});
- It is of utmost importance that users of this package take responsibility for the mapping between exchanges and queues, and make sure that exchange declarations match on both the producer and consumer side; otherwise a Vinelab\Bowler\Exceptions\DeclarationMismatchException is thrown.
- The use of nameless exchanges and queues is not supported in this package. This may be reconsidered later.
- Write tests.
- Become framework agnostic.