amphp/parallel

Can't get CancelledException ends the task properly without killing the child process

xadorfr opened this issue · 1 comments

Hello,

We would like to be able to get a task killed if a timeout is reached. As such, we thought that the correct approach was to use a TimeoutCancellation.

According to the documentation :

A Cancellation provided to Worker::submit() may be used to request cancellation of the task in the worker. When cancellation is requested in the parent, the Cancellation provided to Task::run() is cancelled. The task may choose to ignore this cancellation request or act accordingly and throw a CancelledException from Task::run(). If the cancellation request is ignored, the task may continue and return a value which will be returned to the parent as though cancellation had not been requested.

In view of these explanation, we have tried to make the task subscribe to the Cancellation of the Task and just throw it, thinking that the Context will be ok with that, and just wait for the next task to run. It must not be the correct approach as the exception seems to kill the child process and make the whole thing crash.

We do not find an example in the repository to fit our need.

Does someone can help us ?

Thank you

Here is the simple code to illustrate what we have tried :

  • main.php :
<?php

use Amp\Future;
use Amp\Parallel\Worker;
use Amp\TimeoutCancellation;

use PhpWorker\Job;

$workerPool = Amp\Parallel\Worker\workerPool();

$execution = Worker\submit(new Job(), new TimeoutCancellation(1));
$future = $execution->getFuture();

$future->finally(function () {
 echo "JOB_END" . PHP_EOL;
});

Future\await([$future]);

echo "MAIN_END"  . PHP_EOL;
  • Job.php :
<?php

namespace PhpWorker;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\delay;

class Job implements Task {
 public function run(Channel $channel, Cancellation $cancellation): string {
  $cancellation->subscribe(fn($e) => throw $e);
  delay(2); // fake work, longer than the timeout
  return true;
 }
}

A closure provided to a Cancellation should not throw. These functions are executed within the event loop, not within the context where they are defined. The Cancellation documentation doesn't make this clear, so I apologize for the confusion.

The cancellation subscriber should instead trigger some action that would then stop the operation and potentially throw from the original context, such as failing a deferred.

$deferred = new DeferredFuture();
$cancellation->subscribe(fn ($e) => $deferred->error($e));
return $deferred->await();

Many APIs can accept an optional Cancellation, such as HttpClient::request() in amphp/http-client, eliminating the need to set up a subscriber yourself. delay() is another such function. Your example could be simplified to:

class Job implements Task {
 public function run(Channel $channel, Cancellation $cancellation): bool {
  delay(2, cancellation: $cancellation); // fake work, longer than the timeout
  return true;
 }
}