Bridge to use Symfony Messenger on AWS Lambda with Bref.
This guide assumes that:
- Symfony is installed
- Symfony Messenger is installed
- Bref is installed and configured to deploy Symfony
First, install this package:
composer require bref/symfony-messenger
Next, register the bundle in config/bundles.php:
return [
    // ...
    Bref\Messenger\BrefMessengerBundle::class => ['all' => true],
];
We need to create a new Lambda handler that receives all events from AWS. Let's
create bin/consumer.php with the following contents:
<?php declare(strict_types=1);
use Bref\Messenger\Service\BrefWorker;
require dirname(__DIR__) . '/config/bootstrap.php';
lambda(function ($event) {
    // Boot the Symfony kernel and hand the raw Lambda event to the worker
    $kernel = new \App\Kernel($_SERVER['APP_ENV'], (bool) $_SERVER['APP_DEBUG']);
    $kernel->boot();
    $worker = $kernel->getContainer()->get(BrefWorker::class);
    $worker->consumeLambdaEvent($event);
});
Now it is time to choose the events you are interested in.
This bundle has Symfony Messenger Transports to publish messages and Consumers to receive Lambda events from AWS. All Transports are configurable with a DSN and the sections below will show you some examples. They will all follow the normal Symfony pattern:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            async:
                dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
        routing:
            'App\Message\MyMessage': async
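With this routing in place, any App\Message\MyMessage dispatched on the bus is sent to the async transport. The following is a minimal sketch of a message and a service that dispatches it. The getContent() accessor and the NotificationSender class are hypothetical names used for illustration; only MessageBusInterface::dispatch() is the standard Symfony Messenger call.

```php
<?php declare(strict_types=1);

// In a real project these two classes live in separate files.
namespace App\Message;

// Plain message object; matches the 'App\Message\MyMessage' routing key.
final class MyMessage
{
    private string $content;

    public function __construct(string $content)
    {
        $this->content = $content;
    }

    public function getContent(): string
    {
        return $this->content;
    }
}

namespace App\Service;

use App\Message\MyMessage;
use Symfony\Component\Messenger\MessageBusInterface;

// Hypothetical service: dispatches the message on the default bus.
final class NotificationSender
{
    private MessageBusInterface $bus;

    public function __construct(MessageBusInterface $bus)
    {
        $this->bus = $bus;
    }

    public function send(string $content): void
    {
        // The routing configuration decides which transport receives it.
        $this->bus->dispatch(new MyMessage($content));
    }
}
```

The sender does not know which transport is used, so switching from SQS to SNS should only require a configuration change.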
The SQS service is a queue that works similarly to RabbitMQ. The AWS console lets you create an SQS queue as a "normal queue" or a "FIFO queue".
Note that the environment variables AWS_REGION, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY always exist on Lambda. The AWS client will read AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY automatically.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_sqs:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
bref_messenger:
    consumers:
        my_sqs:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
services:
    Aws\Sqs\SqsClient:
        factory: [Aws\Sqs\SqsClient, factory]
        arguments:
            - region: '%env(AWS_REGION)%'
              version: '2012-11-05'
The FIFO queue guarantees exactly-once processing. To tell messages apart, we must either configure the FIFO queue to look at a specific parameter in the message, or let AWS calculate a hash over the message body. The latter is simpler and is enabled with "Content-Based Deduplication".
We also need to specify which message group we are using. It can be your application's reverse hostname.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_sqs_fifo:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue.fifo'
                options: { message_group_id: com_example }
bref_messenger:
    consumers:
        my_sqs_fifo:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
services:
    Aws\Sqs\SqsClient:
        factory: [Aws\Sqs\SqsClient, factory]
        arguments:
            - region: '%env(AWS_REGION)%'
              version: '2012-11-05'
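To get a feel for what content-based deduplication means, here is a small sketch. AWS computes the deduplication ID on its side as a SHA-256 hash of the message body (not the attributes); the function name below is hypothetical and the code only mimics that idea locally, it is not something you need to add to your application.

```php
<?php declare(strict_types=1);

// Illustration only: mimics a SHA-256 hash over the message body,
// which is how content-based deduplication identifies duplicates.
function contentBasedDeduplicationId(string $messageBody): string
{
    return hash('sha256', $messageBody);
}

$first = contentBasedDeduplicationId('{"orderId":42}');
$again = contentBasedDeduplicationId('{"orderId":42}');
$other = contentBasedDeduplicationId('{"orderId":43}');

// Identical bodies produce the same ID and are treated as duplicates.
var_dump($first === $again);
// A different body produces a different ID and is delivered separately.
var_dump($first === $other);
```

A practical consequence: two messages that are meant to be distinct must differ in their serialized body, otherwise the second one may be dropped within the deduplication window.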
AWS SNS provides "notifications" rather than "queues". Messages may not arrive in the same order as they were sent, and they might arrive all at once.
Note that the environment variables AWS_REGION, AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY always exist on Lambda. The AWS client will read AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY automatically.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            my_sns:
                dsn: 'sns://arn:aws:sns:us-east-1:1234567890:foobar'
bref_messenger:
    consumers:
        my_sns:
            service: 'Bref\Messenger\Service\Sns\SnsConsumer'
services:
    Aws\Sns\SnsClient:
        factory: [Aws\Sns\SnsClient, factory]
        arguments:
            - region: '%env(AWS_REGION)%'
              version: '2010-03-31'
The S3 integration is only a Consumer. That means that we will not be able to publish Symfony Messenger messages on S3 but we can get notified when a file is uploaded/changed.
Since the S3 consumer is not connected to any transport, we will use no_transport: true to avoid getting a "Missing transport exception" thrown when building the container.
# config/packages/messenger.yaml
bref_messenger:
    consumers:
        s3:
            service: 'Bref\Messenger\Service\S3\S3Consumer'
            no_transport: true
namespace App\Message;
use Bref\Messenger\Message\S3Event;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;
class S3Handler implements MessageHandlerInterface
{
    private $logger;
    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }
    public function __invoke(S3Event $event)
    {
        $this->logger->alert('Got S3 event');
        $this->logger->alert(json_encode($event->getRecord()));
    }
}
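In a real handler you will usually want the bucket and object key rather than the whole record. The sketch below assumes that getRecord() returns the raw S3 notification record as an array, which the json_encode() call above suggests but does not confirm. The helper name is hypothetical; the array keys follow the AWS S3 event notification format, where object keys are URL-encoded.

```php
<?php declare(strict_types=1);

// Hypothetical helper: picks the useful fields out of one S3 record.
// Assumes the record is the raw array from the S3 notification.
function describeS3Record(array $record): array
{
    $key = $record['s3']['object']['key'] ?? null;

    return [
        'event' => $record['eventName'] ?? null,
        'bucket' => $record['s3']['bucket']['name'] ?? null,
        // S3 URL-encodes keys in notifications (a space becomes "+").
        'key' => $key === null ? null : urldecode($key),
    ];
}

$record = [
    'eventSource' => 'aws:s3',
    'eventName' => 'ObjectCreated:Put',
    's3' => [
        'bucket' => ['name' => 'my-test-bucket'],
        'object' => ['key' => 'uploads/my+file.txt', 'size' => 1024],
    ],
];

print_r(describeS3Record($record));
```

Inside S3Handler::__invoke() this could be called as describeS3Record($event->getRecord()), under the same assumption about the return type.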
The Serverless configuration is the same for all kinds of Lambda events. You just need to make sure the parameters are configured correctly. You may also add as many events as you want, of the same type or of different types.
# serverless.yml
functions:
    worker:
        handler: bin/consumer.php
        timeout: 20 # in seconds
        reservedConcurrency: 5 # max. 5 messages processed in parallel
        layers:
            - ${bref:layer.php-74}
        events:
            - sqs:
                arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue
                # Only 1 item at a time to simplify error handling
                batchSize: 1
            - sqs:
                arn: arn:aws:sqs:us-east-1:1234567890:my_sqs_queue.fifo
                batchSize: 1
            - sns:
                arn: arn:aws:sns:us-east-1:1234567890:my_sns_topic
            - s3:
                bucket: my-test-bucket
                event: s3:ObjectCreated:*
                existing: true
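To see why batchSize: 1 simplifies error handling, it helps to look at the shape of the event Lambda passes to the handler. The sketch below uses a trimmed-down SQS event as a PHP array; the field names follow the AWS SQS event format, while the sqsBodies() helper is hypothetical and not part of this bundle.

```php
<?php declare(strict_types=1);

// Hypothetical helper: collects the message bodies of an SQS Lambda event.
function sqsBodies(array $event): array
{
    $bodies = [];
    foreach ($event['Records'] ?? [] as $record) {
        $bodies[] = $record['body'];
    }

    return $bodies;
}

// Trimmed-down example of what Lambda delivers with batchSize: 1.
$event = [
    'Records' => [
        [
            'messageId' => '059f36b4-87a3-44ab-83d2-661975830a7d',
            'body' => '{"orderId":42}',
            'eventSource' => 'aws:sqs',
        ],
    ],
];

print_r(sqsBodies($event));
```

With a larger batch size, Records holds several messages per invocation, and a failure in one of them generally causes the whole batch to be retried.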
On each consumer you can choose to let Symfony handle failures as described in the documentation. Example:
# config/packages/messenger.yaml
framework:
    messenger:
        # Messages that exhaust their retries are sent here
        failure_transport: failed
        transports:
            failed: 'doctrine://default?queue_name=failed'
            workqueue:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
                retry_strategy:
                    max_retries: 3
                    # milliseconds delay
                    delay: 1000
                    multiplier: 2
                    # milliseconds; caps the delay at 60 seconds
                    max_delay: 60000
bref_messenger:
    consumers:
        # The consumer name matches the transport name
        workqueue:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
            use_symfony_retry_strategies: true # default value
            # ...
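The retry_strategy options combine into a waiting time per retry. The sketch below mirrors the multiplier formula as Symfony's default retry strategy applies it, to the best of my understanding: delay * multiplier ^ retryCount, capped at max_delay, where a max_delay of 0 means no cap. The retryDelay() function and its default values are hypothetical and only serve to make the arithmetic visible.

```php
<?php declare(strict_types=1);

// Hypothetical helper: milliseconds to wait before a retry.
// $retryCount is 0 for the first retry, 1 for the second, and so on.
function retryDelay(int $retryCount, int $delay = 1000, float $multiplier = 2.0, int $maxDelay = 0): int
{
    $value = $delay * $multiplier ** $retryCount;

    // A max delay of 0 is treated as "no maximum".
    if ($maxDelay !== 0 && $value > $maxDelay) {
        return $maxDelay;
    }

    return (int) ceil($value);
}

foreach ([0, 1, 2] as $retry) {
    printf("retry %d waits %d ms\n", $retry + 1, retryDelay($retry));
}
```

Because max_delay is expressed in milliseconds like delay, a cap smaller than delay would flatten every wait to that cap.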
The delay is only supported on SQS "normal queue". If you are using SNS or SQS FIFO you should use the failure queue directly.
# config/packages/messenger.yaml
framework:
    messenger:
        # Failed messages are sent here
        failure_transport: failed
        transports:
            failed: 'doctrine://default?queue_name=failed'
            workqueue:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
bref_messenger:
    consumers:
        # The consumer name matches the transport name
        workqueue:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
            use_symfony_retry_strategies: true # default value
            # ...
Make sure you re-run the failure queue from time to time:
# serverless.yml
functions:
    website:
        # ...
    consumer:
        # ...
    console:
        handler: bin/console
        timeout: 120 # in seconds
        layers:
            - ${bref:layer.php-74}
            - ${bref:layer.console}
        events:
            - schedule:
                rate: rate(30 minutes)
                input:
                    cli: messenger:consume failed --time-limit=60 --limit=50
This section is really raw, feel free to contribute to improve it.
As an alternative to the "Symfony way", you may let the AWS infrastructure handle errors:
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            workqueue:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
bref_messenger:
    consumers:
        # The consumer name matches the transport name
        workqueue:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
            use_symfony_retry_strategies: false
            # ...
# serverless.yml
queue:
    Type: AWS::SQS::Queue
    Properties:
        # This needs to be at least 6 times the lambda function's timeout
        # See https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
        VisibilityTimeout: '960'
        RedrivePolicy:
            deadLetterTargetArn: !GetAtt DeadLetterQueue.Arn
            # Jobs will be retried 5 times
            # The number needs to be at least 5 per https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html
            maxReceiveCount: 5
# The dead letter queue is a SQS queue that receives messages that failed to be processed
DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
        # Messages are stored up to 14 days (the max)
        MessageRetentionPeriod: 1209600
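The "6 times the timeout" rule in the comment above is simple arithmetic, sketched here so the numbers can be checked against your own function timeout. The helper name is hypothetical; the factor 6 comes from the comment in the configuration.

```php
<?php declare(strict_types=1);

// Hypothetical helper: smallest VisibilityTimeout (in seconds) that
// satisfies the "at least 6 times the function timeout" rule.
function minimumVisibilityTimeout(int $lambdaTimeoutSeconds): int
{
    return 6 * $lambdaTimeoutSeconds;
}

// A worker with "timeout: 20" needs at least 120 seconds,
// so a VisibilityTimeout of 960 leaves plenty of headroom.
var_dump(minimumVisibilityTimeout(20));
var_dump(960 >= minimumVisibilityTimeout(20));
```

Read the other way around, a VisibilityTimeout of 960 allows a function timeout of up to 160 seconds under this rule.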
Each consumer can be configured with a bus and serializer. The default bus is the RoutableMessageBus, which will automatically find the correct bus depending on your transport name.
You may provide any service that implements Symfony\Component\Messenger\MessageBusInterface.
# config/packages/messenger.yaml
bref_messenger:
    consumers:
        my_sqs:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
            bus: 'messenger.bus.command'
            # ...
Same thing with the Serializer. You may want to use Happyr message serializer for a more reliable API when sending messages between applications. You need to add the serializer on both the transport and the consumer.
# config/packages/messenger.yaml
framework:
    messenger:
        transports:
            workqueue:
                dsn: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
                serializer: 'Happyr\MessageSerializer\Serializer'
bref_messenger:
    consumers:
        # The consumer name matches the transport name
        workqueue:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
            serializer: 'Happyr\MessageSerializer\Serializer'
            # ...
If you want to write your own consumer implementation, you need two things:
- Make sure the "type" can be read from the Lambda event
- An implementation of Bref\Messenger\Service\Consumer
The Bref\Messenger\Service\TypeProvider is pretty good at finding the type, since
all(?) AWS Lambda events have an EventSource property. But you may want to override
this with your own logic:
namespace App\Service;
final class MyTypeProvider implements \Bref\Messenger\Service\TypeResolver
{
    public function getType(array $event): ?string
    {
        if (/*...*/) {
            return 'my_type';
        }
        return null;
    }
}
services:
    App\Service\MyTypeProvider:
        tags:
            - { name: 'bref_messenger.type_provider' }
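As an illustration of what such logic could look like, the sketch below reads the event source from the first record of a Lambda event. It is written as a standalone function so it can be tried in isolation; resolveEventSource() is a hypothetical name, and whether the returned strings match the type names this bundle expects is an assumption. The key casing is taken from the AWS event formats: SQS and S3 records use eventSource, SNS records use EventSource.

```php
<?php declare(strict_types=1);

// Hypothetical helper: returns e.g. "aws:sqs" or "aws:sns", or null
// when the event has no recognisable record.
function resolveEventSource(array $event): ?string
{
    $record = $event['Records'][0] ?? null;
    if (!is_array($record)) {
        return null;
    }

    // SQS and S3 use "eventSource"; SNS uses "EventSource".
    return $record['eventSource'] ?? $record['EventSource'] ?? null;
}

$sqsEvent = ['Records' => [['eventSource' => 'aws:sqs', 'body' => '{}']]];
$snsEvent = ['Records' => [['EventSource' => 'aws:sns', 'Sns' => ['Message' => '{}']]]];

var_dump(resolveEventSource($sqsEvent));
var_dump(resolveEventSource($snsEvent));
var_dump(resolveEventSource(['foo' => 'bar']));
```

The same checks could form the body of getType(), returning your own type name when the event matches and null otherwise.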
When you create your consumer, you can either use the Bref\Messenger\Service\AbstractConsumer or start your own implementation. If the AbstractConsumer is used, you cannot change the first 5 constructor arguments.
If you write your own implementation, you may do as you want.
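namespace App\Service;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
final class MyConsumer implements \Bref\Messenger\Service\Consumer
{
    private $serializer;
    // Assumption: the serializer is injected by the container
    public function __construct(SerializerInterface $serializer)
    {
        $this->serializer = $serializer;
    }
    public function consume(string $type, $event): void
    {
        // ...
        $envelope = $this->serializer->decode(['body' => /* ... */ ]);
        // ...
    }
    public static function supportedTypes(): array
    {
        return ['my_type'];
    }
}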
bref_messenger:
    consumers:
        my_special_consumer:
            service: 'App\Service\MyConsumer'
You can of course use as many consumers as you want. The sky is the limit!
framework:
    messenger:
        buses:
            messenger.bus.command:
                middleware:
                    - validation
                    - doctrine_transaction
            messenger.bus.event:
                default_middleware: allow_no_handlers
                middleware:
                    - validation
                    - doctrine_transaction
        transports:
            failed: 'doctrine://default?queue_name=failed'
            sync: 'sync://'
            workqueue: 'https://sqs.us-east-1.amazonaws.com/123456789/my-queue'
            notification: 'sns://arn:aws:sns:us-east-1:1234567890:foobar'
        routing:
            'App\Message\Ping': workqueue
            'App\Message\Pong': notification
bref_messenger:
    consumers:
        workqueue:
            service: 'Bref\Messenger\Service\Sqs\SqsConsumer'
        notification:
            service: 'Bref\Messenger\Service\Sns\SnsConsumer'
        s3:
            service: 'Bref\Messenger\Service\S3\S3Consumer'
            bus_service: 'messenger.bus.command'
            no_transport: true