The Amp\Thread library parallelizes synchronous PHP function calls to worker thread pools in non-blocking applications. The library dispatches blocking calls to worker threads where they can execute in parallel and returns results asynchronously upon completion.
Problem Domain
PHP has a vast catalog of synchronous libraries and extensions, but it's generally difficult to find libraries suitable for use inside non-blocking event loops. Beyond this limitation, some common tasks (like filesystem IO) don't play nicely with the non-blocking paradigm. Amp exposes threaded concurrency in a non-blocking way to execute discrete tasks in worker threads.
NOTE: This library is not intended for use in PHP web SAPI environments. It doesn't make much sense to fire up a thread pool and socket streams for inter-thread communication on every request in a web SAPI environment. Amp\Thread is designed for use in CLI applications.
Example
<?php
function slowAddition($x, $y) {
    sleep(1);
    return $x + $y;
}
try {
    $dispatcher = new Amp\Thread\Dispatcher;
    $a = $dispatcher->call('slowAddition', 1, 5);
    $b = $dispatcher->call('slowAddition', 10, 10);
    $c = $dispatcher->call('slowAddition', 11, 31);
    // Combine these three promises into a single promise that
    // resolves when all of the individual operations complete
    $comboPromise = Amp\all([$a, $b, $c]);
    // Our three calls will complete in one second instead of
    // three because they all run at the same time
    list($a, $b, $c) = $comboPromise->wait();
    var_dump($a, $b, $c);
    /*
    int(6)
    int(20)
    int(42)
    */
} catch (Exception $e) {
    printf("Something went wrong:\n\n%s\n", $e->getMessage());
}
Project Goals
- Expose threaded multiprocessing inside event-driven non-blocking applications.
- Build all components using SOLID, readable and unit-tested code.
Requirements
- PHP 5.5+: You'll need PHP.
- pecl/pthreads: The pthreads extension (Windows .DLLs are available).
- amphp: The Amp async multitasking framework.
Installation
$ git clone git@github.com:amphp/thread.git
$ cd thread
$ composer install
Intro
Basic Usage
- Basic Calls
- Userland Functions
- Static Methods
- Magic Calls
- Error Handling
- Parameters and Returns
- Task Timeouts
- Pool Size
- Thread Execution Limits
- Pthreads Context Flags
Advanced Usage
- Threaded Tasks
- Task Progress Updates
- Magic Tasks
- Class Autoloading and Composer
- Naive Wait Parallelization
- Parallelization Combinators
NOTE: Because threads use Amp concurrency primitives, it's possible to execute tasks in parallel with zero knowledge of event loops. You can find out more in the Naive Wait Parallelization and Parallelization Combinators sections.
Intro
Executing code inside an event loop allows us to use non-blocking libraries to perform multiple IO operations at the same time. Instead of waiting for each individual operation to complete, the event loop assumes control of program flow and informs us when our tasks finish or actionable events occur. This paradigm allows programs to execute other instructions when they would otherwise waste cycles waiting for slow IO operations to complete. The non-blocking approach is particularly useful for scalable network applications and servers where the naive thread-per-connection approach is untenable.
Unfortunately, robust applications generally require synchronous functionality and/or filesystem operations that can't behave in a non-blocking manner. Amp was created to provide non-blocking applications access to the full range of synchronous PHP functionality without blocking the main event loop.
NOTE: It's critical that any non-blocking libs in your application use the same event loop scheduler. The Amp dispatcher uses the Amp event reactor for scheduling.
Because Amp executes inside an event loop, most of the following examples start an event reactor to kick things off, either by instantiating Amp\NativeReactor directly or by calling Amp\run(). Once the event reactor is started it assumes program control and will not return control until your application calls Reactor::stop() (or Amp\stop() when using Amp\run()).
Basic Usage
Basic Calls
The simplest way to use the thread library is to dispatch calls to global functions:
<?php
// Everything happens inside an event reactor loop
(new Amp\NativeReactor)->run(function($reactor) {
    // Create our task dispatcher
    $dispatcher = new Amp\Thread\Dispatcher($reactor);
    try {
        // Invoke strlen('zanzibar') in a worker thread.
        // Yield the resulting Promise to avoid callback hell.
        $result = (yield $dispatcher->call('strlen', 'zanzibar'));
        printf("Woot! strlen('zanzibar') === %d\n", $result);
    } catch (Exception $e) {
        printf("Something went terribly wrong: %s\n", $e->getMessage());
    } finally {
        // Stop the event loop so we don't sit around forever
        // after our result comes back
        $reactor->stop();
    }
});
The above example outputs the following to our console:
Woot! strlen('zanzibar') === 8
Obviously, the strlen call here is a spurious use of threaded concurrency. Remember that it only ever makes sense to dispatch work to a thread if the processing overhead of sending the call and receiving the result is outweighed by the time that would otherwise be spent waiting for the result.
A more useful example demonstrates retrieving the contents of a filesystem resource:
<?php
(new Amp\NativeReactor)->run(function($reactor) {
    try {
        $dispatcher = new Amp\Thread\Dispatcher($reactor);
        $str = (yield $dispatcher->call('file_get_contents', '/path/to/file'));
        var_dump($str);
    } catch (Exception $e) {
        printf("Something went terribly wrong: %s\n", $e->getMessage());
    } finally {
        $reactor->stop();
    }
});
The above code retrieves the contents of the file at /path/to/file in a worker thread and dumps the result in our main thread upon completion.
Userland Functions
We aren't limited to native functions. The Amp\Thread\Dispatcher can also dispatch calls to userland functions:
<?php
function myMultiply($x, $y) {
    return $x * $y;
}
(new Amp\NativeReactor)->run(function($reactor) {
    try {
        $dispatcher = new Amp\Thread\Dispatcher($reactor);
        var_dump(yield $dispatcher->call('myMultiply', 6, 7));
    } catch (Exception $e) {
        printf("Something went terribly wrong: %s\n", $e->getMessage());
    } finally {
        $reactor->stop();
    }
});
The above code results in the following output:
int(42)
Static Methods
The Dispatcher::call() method can accept any callable string, so we aren't limited to function names. We can also dispatch calls to static class methods:
<?php
class MyMultiplier {
    public static function multiply($x, $y) {
        return $x * $y;
    }
}
Amp\run(function() {
    try {
        $dispatcher = new Amp\Thread\Dispatcher;
        var_dump(yield $dispatcher->call('MyMultiplier::multiply', 6, 7));
    } catch (Exception $e) {
        printf("Something went terribly wrong: %s\n", $e->getMessage());
    } finally {
        Amp\stop();
    }
});
The above code results in the following output:
int(42)
IMPORTANT: In this example we've hardcoded the MyMultiplier class definition in the code. There is no class autoloading employed. There is no way for pecl/pthreads to inherit globally registered autoloaders from the main thread. If you require autoloading in your worker threads you MUST dispatch a Threaded task to define autoloader function(s) in your workers as demonstrated in the Class Autoloading section of this guide.
Magic Calls
Dispatchers take advantage of the magic __call() method to simplify calls to functions in the global namespace. Consider:
<?php
Amp\run(function() {
    try {
        $dispatcher = new Amp\Thread\Dispatcher;
        // Equivalent to $dispatcher->call('fopen', '/path/to/file', 'r')
        $fileHandle = (yield $dispatcher->fopen('/path/to/file', 'r'));
        var_dump($fileHandle);
    } catch (Exception $e) {
        printf("Something went wrong: %s\n", $e->getMessage());
    } finally {
        Amp\stop();
    }
});
The above code opens a read-only file handle to the specified file and returns the result in our main thread upon completion.
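The magic call is ordinary PHP delegation. The sketch below shows a __call() method forwarding a method name and its arguments to a generic call() method. The class name MagicCallSketch is hypothetical. Unlike the real Amp\Thread\Dispatcher, it runs the function synchronously in the current thread and returns a plain value instead of a promise.

```php
<?php
// Hypothetical illustration only: NOT the Amp\Thread\Dispatcher implementation.
// Calls run synchronously here and return plain values instead of promises.
class MagicCallSketch {
    // Explicit form: the function name comes first, followed by its arguments.
    public function call($function /* , ...$args */) {
        $args = array_slice(func_get_args(), 1);
        return call_user_func_array($function, $args);
    }

    // Magic form: $sketch->strlen('zanzibar') becomes
    // $sketch->call('strlen', 'zanzibar').
    public function __call($method, array $args) {
        array_unshift($args, $method);
        return call_user_func_array([$this, 'call'], $args);
    }
}

$sketch = new MagicCallSketch;
var_dump($sketch->call('strlen', 'zanzibar')); // int(8)
var_dump($sketch->strlen('zanzibar'));         // int(8)
```

Because __call() only fires for methods the class does not define, any global function name that doesn't collide with a real method (such as call()) can be used this way.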
Error Handling
You may have noticed that the dispatcher calls in our examples to this point have not returned results directly. Instead, they return an instance of Amp\Promise. These monadic placeholder objects allow us to distinguish between successful execution results from our worker threads and errors. When using generators to yield control, any exceptions encountered will be thrown back into the generator and must be caught, or they will bubble up and crash our program.
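The mechanism described above can be sketched with nothing but PHP's built-in Generator API. A driver passes successful values back into the generator with send() and rethrows failures inside it with throw(). The FakeResult class and runCoroutine() function are hypothetical stand-ins, not Amp's internals. Amp's real promises resolve asynchronously, whereas these are already resolved when yielded.

```php
<?php
// Hypothetical stand-in for an already-resolved promise:
// exactly one of $error or $value is meaningful.
class FakeResult {
    public $error = null;
    public $value = null;

    public static function success($value) {
        $result = new self;
        $result->value = $value;
        return $result;
    }

    public static function failure(Exception $error) {
        $result = new self;
        $result->error = $error;
        return $result;
    }
}

// Hypothetical driver: send() values in, throw() errors in.
function runCoroutine(Generator $coroutine) {
    $yielded = $coroutine->current(); // run until the first yield
    while ($coroutine->valid()) {
        if ($yielded->error !== null) {
            // The exception surfaces at the yield expression inside the generator;
            // if the generator doesn't catch it, it propagates out of runCoroutine().
            $yielded = $coroutine->throw($yielded->error);
        } else {
            // The value becomes the result of the yield expression.
            $yielded = $coroutine->send($yielded->value);
        }
    }
}

$log = [];
$task = function () use (&$log) {
    $len = (yield FakeResult::success(8));
    $log[] = "strlen result: $len";
    try {
        yield FakeResult::failure(new RuntimeException('oh noes!!!'));
    } catch (RuntimeException $e) {
        $log[] = 'caught: ' . $e->getMessage();
    }
};
runCoroutine($task());
print_r($log);
```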
Uncaught Exception
<?php
function myThrowingFunction() {
    throw new \RuntimeException('oh noes!!!');
}
Amp\run(function() {
    try {
        $dispatcher = new Amp\Thread\Dispatcher;
        var_dump(yield $dispatcher->myThrowingFunction());
    } catch (Exception $e) {
        printf("Function threw %s as expected: %s\n", get_class($e), $e->getMessage());
    } finally {
        Amp\stop();
    }
});
Fatal Error
In the following example we purposefully do something that will generate a fatal error in our worker thread. Our dispatcher seamlessly recovers from the fatal condition on its own; there is no need to restart the thread pool and our main thread reports the error as if it were a normal exception.
<?php
function myFatalFunction() {
    $nonexistentObject->nonexistentMethod(); // fatal
}
Amp\run(function() {
    try {
        $dispatcher = new Amp\Thread\Dispatcher;
        (yield $dispatcher->myFatalFunction());
    } catch (Exception $e) {
        printf("Function threw %s as expected: %s\n", get_class($e), $e->getMessage());
    } finally {
        Amp\stop();
    }
});
Parameters and Returns
While Amp tries as much as possible to hide the implementation details of the underlying pthreads extension, your parallel operations are still bound by the constraints and limitations of pthreads. The primary limitations center around the raw parameters and return values in your parallelized calls. The main thing you need to remember is:
IMPORTANT: All individual parameters and returns (with the exception of resources) will be serialized by pthreads for transport between the main thread and worker threads.
This condition cannot be overemphasized. The individual parameters MUST either be capable of surviving serialization or they MUST be resource primitives. There is one major pitfall with this constraint:
GOTCHA: Be careful not to wrap resources inside arrays. The array will be serialized, and the individual resource elements will not survive the trip over a thread boundary.
As long as you pass/return resource parameters directly (as opposed to wrapped inside arrays or objects) your parallel functions should "just work."
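You can reproduce the gotcha above without pthreads. This sketch uses PHP's own serialize()/unserialize() as a stand-in for the serialization pthreads performs at the thread boundary. It is an approximation, not pthreads' actual transport code, but it shows that a resource wrapped in an array does not come back as a resource.

```php
<?php
// Stand-in for a thread-boundary round trip: serialize, then unserialize.
function roundTrip($value) {
    return unserialize(serialize($value));
}

$handle = fopen('php://memory', 'r+');
fwrite($handle, 'zanzibar');

$wrapped = roundTrip([$handle]);

var_dump(is_resource($handle));     // bool(true): the original handle is fine
var_dump(is_resource($wrapped[0])); // bool(false): the copy is no longer a resource

fclose($handle);
```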
Task Timeouts
NOTE: Relying on timeouts is almost always a poor design decision. You're much better served to solve the underlying problem that causes slow execution in your dispatched calls/tasks.
Amp automatically times out tasks exceeding the (configurable) maximum allowed run-time. We can customize this setting as shown in the following example:
<?php
use Amp\Thread\Dispatcher;
use Amp\Thread\TimeoutException;
Amp\run(function() {
    $dispatcher = new Dispatcher;
    // Only use one worker so our thread pool acts like a FIFO job queue
    $dispatcher->setOption(Dispatcher::OPT_POOL_SIZE_MAX, 1);
    // Limit per-call execution time to 2 seconds
    $dispatcher->setOption(Dispatcher::OPT_TASK_TIMEOUT, 2);
    try {
        // This task will time out after two seconds
        (yield $dispatcher->sleep(9999));
    } catch (TimeoutException $e) {
        printf("Our task timed out: %s\n", $e->getMessage());
    } finally {
        Amp\stop();
    }
});
Pool Size
You may have noticed that in the above timeout example we explicitly set a max pool size option. The effect of this setting should be obvious: it caps how many worker threads we spawn to handle task dispatches. An example:
<?php
$reactor = new Amp\NativeReactor;
$dispatcher = new Amp\Thread\Dispatcher($reactor);
$dispatcher->setOption(Amp\Thread\Dispatcher::OPT_POOL_SIZE_MAX, 16);
By default the Amp\Thread\Dispatcher starts with a single worker thread. Each time a call is dispatched, a new thread is spawned if all existing workers in the pool are busy (subject to the configured maximum pool size). The default OPT_POOL_SIZE_MAX setting is 8. If no workers are available and the pool is already at its maximum size, calls are queued and dispatched as workers become available.
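The spawn-or-queue policy just described can be modeled in a few lines of plain PHP. PoolModel is a hypothetical, single-threaded sketch of that bookkeeping, not Amp's implementation. It only counts workers and queued calls, and it leaves out the periodic unloading of idle workers.

```php
<?php
// Hypothetical model of the dispatch policy: reuse an idle worker, spawn a new one
// while below $maxSize, otherwise queue the call until a worker frees up.
class PoolModel {
    private $maxSize;
    private $workers = 0; // total workers spawned
    private $busy = 0;    // workers currently running a call
    private $queue = [];  // calls waiting for a free worker (FIFO)

    public function __construct($minSize, $maxSize) {
        $this->workers = $minSize;
        $this->maxSize = $maxSize;
    }

    public function dispatch($call) {
        if ($this->busy < $this->workers) {
            $this->busy++;               // an idle worker takes the call
        } elseif ($this->workers < $this->maxSize) {
            $this->workers++;            // all busy: spawn another worker
            $this->busy++;
        } else {
            $this->queue[] = $call;      // pool maxed: queue the call
        }
    }

    public function complete() {
        if ($this->queue) {
            array_shift($this->queue);   // the freed worker takes the oldest queued call
        } else {
            $this->busy--;               // nothing queued: the worker goes idle
        }
    }

    public function stats() {
        return ['workers' => $this->workers, 'busy' => $this->busy, 'queued' => count($this->queue)];
    }
}

// Defaults described above: one worker to start, at most 8.
$pool = new PoolModel(1, 8);
for ($i = 0; $i < 10; $i++) {
    $pool->dispatch("call #$i");
}
print_r($pool->stats()); // 8 workers, 8 busy, 2 queued
```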
NOTE: Idle worker threads are periodically unloaded to avoid holding open threads unnecessarily.
Dispatchers keep a minimum number of worker threads open at all times (even when idle). By default the minimum number of threads kept open is 1. This value may be changed as follows:
<?php
$reactor = new Amp\NativeReactor;
$dispatcher = new Amp\Thread\Dispatcher($reactor);
$dispatcher->setOption(Amp\Thread\Dispatcher::OPT_POOL_SIZE_MIN, 4);
Thread Execution Limits
In theory we shouldn't have to worry about sloppy code or extensions playing fast and loose with memory resources. However, in the real world this isn't always the case. Amp makes provision for these scenarios by exposing a configurable limit that controls how many dispatches a worker thread will accept before it is respawned to clean up any outstanding garbage. If you wish to modify this setting, simply set the relevant option:
<?php
use Amp\Thread\Dispatcher;
$dispatcher = new Dispatcher;
$dispatcher->setOption(Dispatcher::OPT_EXEC_LIMIT, 1024); // 1024 is the default
Users who wish to remove the execution limit may set the value to -1 as shown here:
<?php
use Amp\Thread\Dispatcher;
$dispatcher = new Dispatcher;
$dispatcher->setOption(Dispatcher::OPT_EXEC_LIMIT, -1);
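The following sketch illustrates how an execution limit behaves. WorkerModel is hypothetical: it counts executions and "respawns" once the limit is reached, with -1 meaning unlimited. It models only the counting logic described above, not how Amp or pthreads actually recycle threads.

```php
<?php
// Hypothetical model of OPT_EXEC_LIMIT semantics: after $limit executions the
// worker is respawned (generation++) and its counter resets; -1 disables respawning.
class WorkerModel {
    private $limit;
    private $executions = 0;
    private $generation = 1; // how many times this worker slot has been (re)spawned

    public function __construct($limit) {
        $this->limit = $limit;
    }

    public function execute(callable $task) {
        $result = $task();
        $this->executions++;
        if ($this->limit !== -1 && $this->executions >= $this->limit) {
            // Respawn: a fresh thread would start with clean memory and a zeroed counter.
            $this->generation++;
            $this->executions = 0;
        }
        return $result;
    }

    public function generation() {
        return $this->generation;
    }
}

$limited = new WorkerModel(3);
$unlimited = new WorkerModel(-1);
for ($i = 0; $i < 7; $i++) {
    $limited->execute(function () { return 42; });
    $unlimited->execute(function () { return 42; });
}
var_dump($limited->generation());   // int(3): respawned after calls 3 and 6
var_dump($unlimited->generation()); // int(1): never respawned
```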
Pthreads Context Flags
Users can control the context inheritance mask used to start worker threads by setting thread start flags as shown here:
<?php
use Amp\Thread\Dispatcher;
$dispatcher = new Dispatcher;
$dispatcher->setOption(Dispatcher::OPT_THREAD_FLAGS, PTHREADS_INHERIT_NONE);
The full list of available flags can be found in the relevant pthreads documentation page.
Advanced Usage
Threaded Tasks
While Amp abstracts much of the underlying pthreads functionality, there are times when low-level access is useful. For these scenarios Amp accepts custom "tasks" that extend the pthreads Threaded class. These tasks let users define arbitrary code in the main thread and execute it in worker threads.
NOTE: All Threaded classes MUST (per pthreads) specify the abstract Threaded::run() method. Your Threaded object's run() method is the routine that will execute in the worker thread. In order to avoid errors, your Threaded::run() must call the worker thread's resolve() method as shown in the example below. This is how Amp knows what to return from the threaded task.
Instances of your custom Threaded class may then be passed to the Dispatcher::execute() method for processing.
<?php
class MyTask extends \Threaded {
    public function run() {
        $result = strlen('zanzibar');
        // Custom tasks must register their results using either
        // Amp\Thread\Thread::SUCCESS or Amp\Thread\Thread::FAILURE:
        $this->worker->resolve(Amp\Thread\Thread::SUCCESS, $result);
    }
}
Amp\run(function() {
    try {
        $dispatcher = new Amp\Thread\Dispatcher;
        $len = (yield $dispatcher->execute(new MyTask)); // <-- our custom task
        var_dump($len);
    } catch (Exception $e) {
        printf("Something went terribly wrong: %s\n", $e->getMessage());
    } finally {
        Amp\stop();
    }
});
Task Progress Updates
Because the Amp concurrency primitives support incremental progress updates, we can expose this functionality in our custom Threaded tasks. In the same way we use the worker's resolve() method to indicate task completion, we can use progress() to notify our main thread incrementally before the task actually completes. Below we show how to send and receive progress updates in our threaded tasks.
<?php
class MyIncrementalTask extends \Threaded {
    public function run() {
        $this->worker->progress(1);
        sleep(1);
        $this->worker->progress(2);
        sleep(1);
        $this->worker->progress(3);
        sleep(1);
        $this->worker->resolve(Amp\Thread\Thread::SUCCESS, 42);
    }
}
$dispatcher = new Amp\Thread\Dispatcher;
$task = new MyIncrementalTask;
$promise = $dispatcher->execute($task);
// Watch for progress updates from our task
$promise->watch(function($updateData) {
    printf("Progress update: %s\n", $updateData);
});
// Wait for the final task result
printf("Final task result: %s\n", $promise->wait());
The above code will output the following:
Progress update: 1
Progress update: 2
Progress update: 3
Final task result: 42
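The watch/resolve flow can be pictured with a small placeholder written in plain PHP. ProgressPlaceholder and its update()/resolve()/result() methods are hypothetical names invented for illustration. They mirror the progress() and resolve() calls in the task above, but they are not Amp's Promise classes, and everything here runs synchronously in a single thread.

```php
<?php
// Hypothetical, synchronous placeholder: NOT Amp's promise implementation.
class ProgressPlaceholder {
    private $watchers = [];
    private $resolved = false;
    private $result;

    // Register a callback for incremental updates (like watch() above).
    public function watch(callable $watcher) {
        $this->watchers[] = $watcher;
    }

    // Called by the producer for each progress update (like progress()).
    public function update($data) {
        if ($this->resolved) {
            throw new LogicException('Cannot update a resolved placeholder');
        }
        foreach ($this->watchers as $watcher) {
            $watcher($data);
        }
    }

    // Called once with the final value (like resolve()).
    public function resolve($result) {
        $this->resolved = true;
        $this->result = $result;
    }

    public function result() {
        if (!$this->resolved) {
            throw new LogicException('Placeholder not yet resolved');
        }
        return $this->result;
    }
}

$updates = [];
$placeholder = new ProgressPlaceholder;
$placeholder->watch(function ($data) use (&$updates) {
    $updates[] = $data;
    printf("Progress update: %s\n", $data);
});
foreach ([1, 2, 3] as $step) {
    $placeholder->update($step);
}
$placeholder->resolve(42);
printf("Final task result: %s\n", $placeholder->result());
```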
Magic Tasks
Dispatcher implementations delegate the magic __invoke() method to the Dispatcher::execute() method. This provides a simple shortcut for execute() calls:
<?php
class MyTask extends \Threaded {
    public function run() {
        // Do the work, then report the result back to the main thread
        $this->worker->resolve(Amp\Thread\Thread::SUCCESS, strlen('zanzibar'));
    }
}
(new Amp\NativeReactor)->run(function($reactor) {
    $dispatcher = new Amp\Thread\Dispatcher($reactor);
    // Equivalent to $dispatcher->execute(new MyTask)
    $promise = $dispatcher(new MyTask);
    $promise->when(function($error, $result) use ($reactor) {
        assert($error === null);
        assert($result === 8);
        $reactor->stop();
    });
});
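The __invoke() shortcut is ordinary PHP delegation. The hypothetical InvokeSketch class below shows the pattern in isolation: invoking the object as a function forwards to execute(). Here execute() simply runs a callable synchronously. The real dispatcher instead sends a Threaded task to a worker and returns a promise.

```php
<?php
// Hypothetical illustration: NOT Amp\Thread\Dispatcher.
// Tasks here are plain callables executed synchronously.
class InvokeSketch {
    public function execute(callable $task) {
        return $task();
    }

    // $sketch($task) is shorthand for $sketch->execute($task).
    public function __invoke(callable $task) {
        return $this->execute($task);
    }
}

$sketch = new InvokeSketch;
$task = function () { return strlen('zanzibar'); };
var_dump($sketch->execute($task)); // int(8)
var_dump($sketch($task));          // int(8)
```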
Class Autoloading
There is no way for pthreads workers to inherit global autoload settings. As a result, if calls or task executions require class autoloading, users must make provisions to register autoload functions in workers prior to dispatching tasks. This presents the problem of re-registering these settings each time a worker thread is respawned. Amp resolves this issue by allowing applications to register Threaded tasks that are sent to workers whenever they're spawned.
Consider the following example in which we define our own autoload task and register it for inclusion when workers are spawned:
<?php
class MyAutoloadTask extends \Threaded {
    public function run() {
        spl_autoload_register(function($class) {
            if (0 === strpos($class, 'MyNamespace\\')) {
                $class = str_replace('\\', '/', $class);
                $file = __DIR__ . "/src/$class.php";
                if (file_exists($file)) {
                    require $file;
                }
            }
        });
    }
}
$reactor = new Amp\NativeReactor;
$dispatcher = new Amp\Thread\Dispatcher($reactor);
$dispatcher->addStartTask(new MyAutoloadTask);
Now all our worker threads register class autoloaders prior to receiving tasks or calls. Note that "start tasks" are stored in an SplObjectStorage instance, so repeatedly adding the same instance will have no effect. After adding a start task you may also remove it later as shown here:
<?php
$myStartTask = new MyAutoloadTask;
$reactor = new Amp\NativeReactor;
$dispatcher = new Amp\Thread\Dispatcher($reactor);
$dispatcher->addStartTask($myStartTask);
// ... //
$dispatcher->removeStartTask($myStartTask);
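The no-duplicates behavior follows from SplObjectStorage keying its entries by object identity, assuming (as stated above) that start tasks are kept in such a container. This standalone snippet uses a stdClass placeholder instead of a real Threaded start task so it runs without pthreads. It shows three things: attaching the same instance twice stores it once, a distinct instance is stored separately even when its contents are identical, and detach() removes an entry.

```php
<?php
$startTasks = new SplObjectStorage;
$task = new stdClass; // placeholder for a start task instance

$startTasks->attach($task);
$startTasks->attach($task);           // same instance: no new entry
var_dump(count($startTasks));         // int(1)

$startTasks->attach(new stdClass);    // a different instance is stored separately
var_dump(count($startTasks));         // int(2)

$startTasks->detach($task);
var_dump($startTasks->contains($task)); // bool(false)
var_dump(count($startTasks));           // int(1)
```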
Composer
Using a generated autoloader from composer is no different from registering any other autoloader:
<?php
class MyComposerAutoloadTask extends \Threaded {
    public function run() {
        require '/path/to/vendor/autoload.php';
    }
}
$dispatcher = new Amp\Thread\Dispatcher;
$dispatcher->addStartTask(new MyComposerAutoloadTask);
That's it!
Naive Wait Parallelization
Because threads use the Amp concurrency primitives library, users don't actually need any understanding of the underlying non-blocking event loop to execute Amp tasks in parallel. By calling wait() on any promise we can block code execution indefinitely until the promised value resolves:
<?php
try {
    $dispatcher = new Amp\Thread\Dispatcher;
    // Dispatch a threaded task
    $promise = $dispatcher->call('strlen', 'zanzibar');
    // Synchronously block until the promise resolves
    $result = $promise->wait();
    var_dump($result); // int(8)
} catch (Exception $e) {
    printf("Something went wrong:\n\n%s\n", $e->getMessage());
}
Parallelization Combinators
We can parallelize multiple threaded operations by using Amp combinators:
<?php
try {
    $dispatcher = new Amp\Thread\Dispatcher;
    $a = $dispatcher->call('sleep', 1);
    $b = $dispatcher->call('sleep', 1);
    $c = $dispatcher->call('sleep', 1);
    // Combine these three promises into a single promise that
    // resolves when all of the individual operations complete
    $comboPromise = Amp\all([$a, $b, $c]);
    // Our three sleep() operations will complete in one second
    // because they all run at the same time!
    $comboPromise->wait();
} catch (Exception $e) {
    printf("Something went wrong:\n\n%s\n", $e->getMessage());
}
Combinator return values are also easily accessible. Consider the following example where we list the individual results from our parallel calls:
<?php
function add($x, $y) {
    return $x + $y;
}
try {
    $dispatcher = new Amp\Thread\Dispatcher;
    $a = $dispatcher->call('add', 1, 2);
    $b = $dispatcher->call('add', 10, 32);
    $c = $dispatcher->call('add', 5, 7);
    // Combine these three promises into a single promise that
    // resolves when all of the individual operations complete
    $comboPromise = Amp\all([$a, $b, $c]);
    // Wait for the three parallel operations to complete
    list($a, $b, $c) = $comboPromise->wait();
    var_dump($a, $b, $c);
    /*
    int(3)
    int(42)
    int(12)
    */
} catch (Exception $e) {
    printf("Something went wrong:\n\n%s\n", $e->getMessage());
}