Mini Queue, the lightweight in-memory message queue to concurrently do many (but not too many) things at once, built on top of ReactPHP.
Let's say you crawl a page and find that you need to send 100 HTTP requests to
following pages, each of which takes 0.2s. You can either send them all
sequentially (taking around 20s) or you can use ReactPHP to concurrently
request all your pages at the
same time. This works perfectly fine for a small number of operations, but
sending an excessive number of requests can either take up all resources on your
side or may get you banned by the remote side as it sees an unreasonable number
of requests from your side.
Instead, you can use this library to effectively rate limit your operations and
queue excess ones so that not too many operations are processed at once.
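To put rough numbers on the trade-off described above, here is a small back-of-the-envelope sketch in plain PHP. The `estimateDuration()` helper is hypothetical (it is not part of this library) and assumes every job takes exactly the same time and that jobs run in full batches, which real network requests will not do:

```php
<?php
// Hypothetical helper, not part of this library: estimates the total wall
// time when $jobs operations, each taking $secondsPerJob, are processed with
// at most $concurrency of them running at the same time.
function estimateDuration(int $jobs, float $secondsPerJob, int $concurrency): float
{
    if ($concurrency < 1) {
        throw new InvalidArgumentException('Concurrency must be at least 1');
    }

    // Assumption: jobs run in batches and every job takes equally long.
    return ceil($jobs / $concurrency) * $secondsPerJob;
}

printf("one at a time: %.1fs\n", estimateDuration(100, 0.2, 1));   // 20.0s
printf("10 at once:    %.1fs\n", estimateDuration(100, 0.2, 10));  // 2.0s
printf("all at once:   %.1fs\n", estimateDuration(100, 0.2, 100)); // 0.2s
```

A moderate concurrency limit already recovers most of the speed-up without sending all 100 requests in the same instant.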
This library provides a simple API that is easy to use in order to manage any
kind of async operation without having to mess with most of the low-level details.
You can use this to throttle multiple HTTP requests, database queries or pretty
much any API that already uses Promises.
- Async execution of operations - Process any number of async operations and choose how many should be handled concurrently and how many operations can be queued in-memory. Process their results as soon as responses come in. The Promise-based design provides a sane interface for working with out-of-order results.
- Lightweight, SOLID design - Provides a thin abstraction that is just good enough and does not get in your way. Builds on top of well-tested components and well-established concepts instead of reinventing the wheel.
- Good test coverage - Comes with an automated test suite and is regularly tested in the real world.
Once installed, you can use the following code to access an HTTP webserver and send a large number of HTTP GET requests:
use Clue\React\Mq\Queue;
use Psr\Http\Message\ResponseInterface;
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
// load a huge array of URLs to fetch (one per line, without trailing newlines)
$urls = file('urls.txt', FILE_IGNORE_NEW_LINES | FILE_SKIP_EMPTY_LINES);
// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$q = new Queue(3, null, function ($url) use ($browser) {
    return $browser->get($url);
});
foreach ($urls as $url) {
    $q($url)->then(function (ResponseInterface $response) use ($url) {
        echo $url . ': ' . $response->getBody()->getSize() . ' bytes' . PHP_EOL;
    });
}
$loop->run();
See also the examples.
The Queue is responsible for managing your operations and ensuring not too
many operations are executed at once. It's a very simple and lightweight
in-memory implementation of the leaky bucket algorithm.
This means that you control how many operations can be executed concurrently. If you add a job to the queue and it is still below the limit, it will be executed immediately. If you keep adding new jobs to the queue and its concurrency limit is reached, it will not start a new operation and will instead queue the job for future execution. Once one of the pending operations completes, it will pick the next job from the queue and execute this operation.
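The scheduling rule described above can be sketched in a few lines of plain PHP. This is only an illustration under stated assumptions: the `SketchQueue` class, its `add()` and `complete()` methods and its `started` property are hypothetical names, jobs are plain strings, and completion is signalled manually instead of through promises. It is not the library's actual implementation.

```php
<?php
// Hypothetical sketch of the scheduling rule; not the library's real code.
final class SketchQueue
{
    private $concurrency;
    private $limit;
    private $pending = 0;        // number of jobs currently running
    private $waiting = array();  // jobs queued for future execution
    public $started = array();   // order in which jobs were started (demo only)

    public function __construct(int $concurrency, ?int $limit)
    {
        $this->concurrency = $concurrency;
        $this->limit = $limit;
    }

    // Returns false if the hard limit on outstanding jobs would be exceeded.
    public function add(string $job): bool
    {
        $outstanding = $this->pending + count($this->waiting);
        if ($this->limit !== null && $outstanding >= $this->limit) {
            return false;
        }

        if ($this->pending < $this->concurrency) {
            $this->start($job);       // below the concurrency limit: run now
        } else {
            $this->waiting[] = $job;  // limit reached: keep for later
        }

        return true;
    }

    // Signals that one running job finished; starts the next waiting job.
    public function complete(): void
    {
        if ($this->pending === 0) {
            return;
        }

        $this->pending--;
        if ($this->waiting) {
            $this->start(array_shift($this->waiting));
        }
    }

    private function start(string $job): void
    {
        $this->pending++;
        $this->started[] = $job;
    }
}

$sketch = new SketchQueue(2, 3);
var_dump($sketch->add('a')); // bool(true), started immediately
var_dump($sketch->add('b')); // bool(true), started immediately
var_dump($sketch->add('c')); // bool(true), queued: two jobs already running
var_dump($sketch->add('d')); // bool(false), three jobs are already outstanding
$sketch->complete();         // one job finishes, so 'c' is started
echo implode(',', $sketch->started), PHP_EOL; // a,b,c
```

The real queue returns promises instead of booleans, so the caller never has to call anything like `complete()` by hand.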
The new Queue(int $concurrency, ?int $limit, callable $handler) call
can be used to create a new queue instance.
You can create any number of queues, for example when you want to apply
different limits to different kinds of operations.
The $concurrency parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen things at once may easily
overwhelm the receiving side.
The $limit parameter sets a new hard limit on how many jobs may be
outstanding (kept in memory) at once. Depending on your particular use
case, it's usually safe to keep a few hundred or a few thousand jobs in
memory. If you do not want to apply an upper limit, you can pass a null
value, which is semantically more meaningful than passing a big number.
// handle up to 10 jobs concurrently, but keep no more than 1000 in memory
$q = new Queue(10, 1000, $handler);
// handle up to 10 jobs concurrently, do not limit queue size
$q = new Queue(10, null, $handler);
// handle up to 10 jobs concurrently, reject all further jobs
$q = new Queue(10, 10, $handler);
The $handler parameter must be a valid callable that accepts your job
parameters, invokes the appropriate operation and returns a Promise as a
placeholder for its future result.
// using a Closure as handler is usually recommended
$q = new Queue(10, null, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$q = new Queue(10, null, array($browser, 'get'));
This library works under the assumption that you want to concurrently handle async operations that use a Promise-based API.
For demonstration purposes, the examples in this documentation use the async HTTP client clue/reactphp-buzz, but you may use any Promise-based API with this project. Its API can be used like this:
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
$promise = $browser->get($url);
If you wrap this in a Queue instance as given above, this code will look
like this:
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
$q = new Queue(10, null, function ($url) use ($browser) {
    return $browser->get($url);
});
$promise = $q($url);
The $q instance is invokable, so that invoking $q(...$args) will
actually be forwarded as $browser->get(...$args) as given in the
$handler argument when concurrency is still below limits.
Each operation is expected to be async (non-blocking), so you may actually
invoke multiple operations concurrently (send multiple requests in parallel).
The $handler is responsible for responding to each request with a resolution
value; the order in which results arrive is not guaranteed.
These operations use a Promise-based
interface that makes it easy to react to when an operation is completed (i.e.
either successfully fulfilled or rejected with an error):
$promise->then(
    function ($result) {
        var_dump('Result received', $result);
    },
    function (Exception $error) {
        var_dump('There was an error', $error->getMessage());
    }
);
Each operation may take some time to complete, but due to its async nature you can actually start any number of (queued) operations. Once the concurrency limit is reached, this invocation will simply be queued and this will return a pending promise which will start the actual operation once another operation is completed. This means that this is handled entirely transparently and you do not need to worry about this concurrency limit yourself.
If this looks strange to you, you can also use the more traditional blocking API.
The returned Promise is implemented in such a way that it can be cancelled when it is still pending. Cancelling a pending operation will invoke its cancellation handler which is responsible for rejecting its value with an Exception and cleaning up any underlying resources.
$promise = $q($url);
$loop->addTimer(2.0, function () use ($promise) {
    $promise->cancel();
});
Similarly, cancelling an operation that is queued and has not yet been started will reject its promise without ever starting the operation.
By default, this library does not limit how long a single operation can take, so that the resulting promise may stay pending for a long time. Many use cases involve some kind of "timeout" logic so that an operation is cancelled after a certain threshold is reached.
You can simply use cancellation as in the previous chapter or you may want to look into using react/promise-timer, which helps take care of this through a simple API.
The resulting code with timeouts applied looks something like this:
use React\Promise\Timer;
$q = new Queue(10, null, function ($uri) use ($browser, $loop) {
    return Timer\timeout($browser->get($uri), 2.0, $loop);
});
$promise = $q($uri);
The resulting promise can be consumed as usual and the above code will ensure that execution of this operation can not take longer than the given timeout (i.e. after it is actually started). In particular, note how this differs from applying a timeout to the resulting promise. The following code will ensure that the total time for queuing and executing this operation can not take longer than the given timeout:
// usually not recommended
$promise = Timer\timeout($q($uri), 2.0, $loop);
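The difference between the two timeout placements can be made concrete with some assumed numbers. The sketch below is plain PHP and does not use react/promise-timer; the `timesOut()` helper and the 3s/1s figures are hypothetical and only model which span of time each placement measures:

```php
<?php
// Hypothetical helper: a job waits $queued seconds in the queue and then
// runs for $running seconds. Returns whether a timeout would trigger.
function timesOut(float $queued, float $running, float $timeout, bool $insideHandler): bool
{
    // Inside the handler, the timer only starts once the job is started.
    // Around the queued promise, the timer also covers the waiting time.
    $measured = $insideHandler ? $running : $queued + $running;

    return $measured > $timeout;
}

// Assumed job: 3s waiting in the queue, then 1s of actual work, 2s timeout.
var_dump(timesOut(3.0, 1.0, 2.0, true));  // bool(false)
var_dump(timesOut(3.0, 1.0, 2.0, false)); // bool(true)
```

With a busy queue, a timeout around the queued promise may therefore reject jobs that would have completed quickly once started.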
Please refer to react/promise-timer for more details.
The static all(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed[]>
method can be used to concurrently process all given jobs through the
given $handler.
This is a convenience method which uses the Queue internally to
schedule all jobs while limiting concurrency to ensure no more than
$concurrency jobs ever run at once. It will return a promise which
resolves with the results of all jobs on success.
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
$promise = Queue::all(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});
$promise->then(function (array $responses) {
    echo 'All ' . count($responses) . ' successful!' . PHP_EOL;
});
If any of the jobs fails, it will reject the resulting promise and will
try to cancel all outstanding jobs. Similarly, calling cancel() on the
resulting promise will try to cancel all outstanding jobs. See
promises and cancellation for details.
The $concurrency parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen things at once may easily
overwhelm the receiving side. Using a value of 1 will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
InvalidArgumentException without processing any jobs.
// handle up to 10 jobs concurrently
$promise = Queue::all(10, $jobs, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Queue::all(1, $jobs, $handler);
The $jobs parameter must be an array with all jobs to process. Each
value in this array will be passed to the $handler to start one job.
The array keys will be preserved in the resulting array, while the array
values will be replaced with the job results as returned by the
$handler. If this array is empty, this method will resolve with an
empty array without processing any jobs.
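As a rough illustration of what preserved array keys mean in practice, the plain PHP sketch below mimics results arriving out of order and being stored under the key of the job that produced them. The job labels, the example.com URLs, the `$fakeResult` stand-in and the completion order are all assumptions made for this sketch; a real handler would return promises and the library does the bookkeeping itself:

```php
<?php
// Hypothetical jobs: the array keys are labels chosen by the caller.
$jobs = array(
    'home' => 'http://example.com/',
    'faq'  => 'http://example.com/faq',
);

// Stand-in for a job result; a real handler would return a promise instead.
$fakeResult = function ($url) {
    return strlen($url) . ' chars';
};

// Assumed completion order, deliberately different from the input order.
$results = array();
foreach (array('faq', 'home') as $key) {
    $results[$key] = $fakeResult($jobs[$key]);
}

// Each result can still be looked up by the key of its job.
echo $results['home'], PHP_EOL; // 19 chars
echo $results['faq'], PHP_EOL;  // 22 chars
```

Using meaningful keys this way avoids having to match results back to jobs by position.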
The $handler parameter must be a valid callable that accepts your job
parameters, invokes the appropriate operation and returns a Promise as a
placeholder for its future result. If the given argument is not a valid
callable, this method will reject with an InvalidArgumentException
without processing any jobs.
// using a Closure as handler is usually recommended
$promise = Queue::all(10, $jobs, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Queue::all(10, $jobs, array($browser, 'get'));
Keep in mind that returning an array of response messages means that the whole response body has to be kept in memory.
The static any(int $concurrency, array $jobs, callable $handler): PromiseInterface<mixed>
method can be used to concurrently process the given jobs through the
given $handler and resolve with the first resolution value.
This is a convenience method which uses the Queue internally to
schedule all jobs while limiting concurrency to ensure no more than
$concurrency jobs ever run at once. It will return a promise which
resolves with the result of the first job on success and will then try
to cancel() all outstanding jobs.
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
$promise = Queue::any(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});
$promise->then(function (ResponseInterface $response) {
    echo 'First response: ' . $response->getBody() . PHP_EOL;
});
If all of the jobs fail, it will reject the resulting promise. Similarly,
calling cancel() on the resulting promise will try to cancel all
outstanding jobs. See promises and cancellation for details.
The $concurrency parameter sets a new soft limit for the maximum number
of jobs to handle concurrently. Finding a good concurrency limit depends
on your particular use case. It's common to limit concurrency to a rather
small value, as doing more than a dozen things at once may easily
overwhelm the receiving side. Using a value of 1 will ensure that all jobs
are processed one after another, effectively creating a "waterfall" of
jobs. Using a value less than 1 will reject with an
InvalidArgumentException without processing any jobs.
// handle up to 10 jobs concurrently
$promise = Queue::any(10, $jobs, $handler);
// handle each job after another without concurrency (waterfall)
$promise = Queue::any(1, $jobs, $handler);
The $jobs parameter must be an array with all jobs to process. Each
value in this array will be passed to the $handler to start one job.
The array keys have no effect; the promise will simply resolve with the
job result of the first successful job as returned by the $handler.
If this array is empty, this method will reject without processing any
jobs.
The $handler parameter must be a valid callable that accepts your job
parameters, invokes the appropriate operation and returns a Promise as a
placeholder for its future result. If the given argument is not a valid
callable, this method will reject with an InvalidArgumentException
without processing any jobs.
// using a Closure as handler is usually recommended
$promise = Queue::any(10, $jobs, function ($url) use ($browser) {
    return $browser->get($url);
});
// accepts any callable, so PHP's array notation is also supported
$promise = Queue::any(10, $jobs, array($browser, 'get'));
As stated above, this library provides you a powerful, async API by default. If, however, you want to integrate this into your traditional, blocking environment, you may want to look into also using clue/reactphp-block.
The resulting blocking code that awaits a number of concurrent HTTP requests could look something like this:
use Clue\React\Block;
$loop = React\EventLoop\Factory::create();
$browser = new Clue\React\Buzz\Browser($loop);
$promise = Queue::all(3, $urls, function ($url) use ($browser) {
    return $browser->get($url);
});
try {
    $responses = Block\await($promise, $loop);
    // responses successfully received
} catch (Exception $e) {
    // an error occurred while performing the requests
}
Similarly, you can also wrap this in a function to provide a simple API and hide all the async details from the outside:
/**
 * Concurrently downloads all the given URIs
 *
 * @param string[] $uris list of URIs to download
 * @return ResponseInterface[] map with a response object for each URI
 * @throws Exception if any of the URIs can not be downloaded
 */
function download(array $uris)
{
    $loop = React\EventLoop\Factory::create();
    $browser = new Clue\React\Buzz\Browser($loop);
    $promise = Queue::all(3, $uris, function ($uri) use ($browser) {
        return $browser->get($uri);
    });
    return Clue\React\Block\await($promise, $loop);
}
Please refer to clue/reactphp-block for more details.
Keep in mind that returning an array of response messages means that the whole response body has to be kept in memory.
The recommended way to install this library is through Composer. New to Composer?
This project follows SemVer. This will install the latest supported version:
$ composer require clue/mq-react:^1.2
See also the CHANGELOG for details about version upgrades.
This project aims to run on any platform and thus does not require any PHP extensions and supports running on legacy PHP 5.3 through current PHP 7+. It's highly recommended to use PHP 7+ for this project.
To run the test suite, you first need to clone this repo and then install all dependencies through Composer:
$ composer install
To run the test suite, go to the project root and run:
$ php vendor/bin/phpunit
This project is released under the permissive MIT license.
I'd like to thank Bergfreunde GmbH, a German online retailer for Outdoor Gear & Clothing, for sponsoring the first release! 🎉 Thanks to sponsors like this, who understand the importance of open source development, I can justify spending time and focus on open source development instead of traditional paid work.
Did you know that I offer custom development services and issue invoices for sponsorships of releases and for contributions? Contact me (@clue) for details.