amphp/parallel

Queue-based pool

hitriyvalenok opened this issue · 8 comments

Hello! I'm using a pool of tasks, and at any time I may need to stop them all. For this I'm using ContextWorkerPool. My issue is that submit starts the worker and waits for task completion once $limit is exceeded. As a result I cannot control execution, because I have to wait until all tasks over the limit have completed. I think a queue-based pool would be a good solution: I could add all my tasks, start execution, and wait asynchronously. How can I solve this? I am not sure I'm doing it in the most graceful way. Thank you.

What do you mean by "wait asynchronously"?

I mean I don't want submit to force me to wait for task completion. I'd prefer to use wait for that.

Which version of parallel do you use?

Right now I'm trying to stop the task by throwing an exception from the run method. I would like that exception to surface from submit, so that I could catch it without any waiting. But I found that only die makes submit throw something (ChannelException); any other exception can be caught only in the wait methods.

Ideally, I would start all tasks asynchronously and then call ContextWorkerPool->shutdown(), but submit spoils the whole thing. It's also fine for me to throw something like CancelledException and catch it at submit.
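For readers following along, here is a minimal sketch of one way the "start everything, then stop everything" idea might be expressed with a shared cancellation. It is not taken from the thread. It assumes amphp/parallel v2 with amphp/amp v3, where submit accepts an optional Cancellation as its second argument. StoppableTask and submitAll are hypothetical names, and how a cancelled task surfaces in the parent (which exception await() throws) is not confirmed here.

```php
<?php
require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install of amphp/parallel

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;
use function Amp\async;
use function Amp\delay;
use function Amp\Parallel\Worker\workerPool;

// Hypothetical task. In a real project it must live in an autoloadable file
// so that the worker process can load the class.
final class StoppableTask implements Task
{
    public function run(Channel $channel, Cancellation $cancellation): mixed
    {
        for ($step = 0; $step < 20; ++$step) {
            // Non-blocking pause. Passing $cancellation makes delay() throw
            // a CancelledException once cancellation has been requested.
            delay(0.1, true, $cancellation);
        }
        return true;
    }
}

/**
 * Hypothetical helper: submits every task from its own coroutine, so a full
 * pool suspends only that coroutine and never the caller's loop.
 *
 * @param iterable<Task> $tasks
 * @return array<Amp\Future> one future per task, keyed like the input
 */
function submitAll(iterable $tasks, Cancellation $cancellation): array
{
    $futures = [];
    foreach ($tasks as $key => $task) {
        $futures[$key] = async(
            fn () => workerPool()->submit($task, $cancellation)->await()
        );
    }
    return $futures;
}
```

To use it, create an Amp\DeferredCancellation, pass its getCancellation() to submitAll(), and call cancel() on it when the tasks should stop. A task that blocks in sleep() cannot react to the request; it has to yield to the event loop, as the delay() call does here.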

If you want to limit the number of tasks being submitted to the pool, consider using some of our abstractions for queues, mutexes/semaphores, or concurrency. You can find these in amphp/sync and amphp/pipeline.

For example, you may be able to use a Pipeline to limit the concurrency.

Pipeline::fromIterable($taskIterable) // $taskIterable is an iterable producing Task objects.
    ->concurrent(10) // Limit to 10 simultaneous tasks
    ->map(fn (Task $task) => workerPool()->submit($task))
    ->map(fn (Execution $execution) => $execution->await())
    ->forEach(function ($result): void {
        // Do something with task result in $result.
    });
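The semaphore route mentioned above could look roughly like the following sketch. It assumes amphp/sync v2 (LocalSemaphore with acquire() returning a Lock) and amphp/amp v3. runLimited is a hypothetical helper name, not part of any amphp package.

```php
<?php
require __DIR__ . '/vendor/autoload.php'; // assumes a Composer install of amphp/parallel and amphp/sync

use Amp\Future;
use Amp\Parallel\Worker\Task;
use Amp\Sync\LocalSemaphore;
use function Amp\async;
use function Amp\Parallel\Worker\workerPool;

/**
 * Hypothetical helper: runs every task through the global worker pool with
 * at most $limit tasks in flight at once.
 *
 * @param iterable<Task> $tasks
 * @return array task results, keyed like the input
 */
function runLimited(iterable $tasks, int $limit): array
{
    $semaphore = new LocalSemaphore($limit);
    $futures = [];

    foreach ($tasks as $key => $task) {
        // async() returns immediately; the waiting happens inside the coroutine.
        $futures[$key] = async(function () use ($semaphore, $task): mixed {
            $lock = $semaphore->acquire(); // suspends until one of the $limit slots is free
            try {
                return workerPool()->submit($task)->await();
            } finally {
                $lock->release(); // free the slot even if the task failed
            }
        });
    }

    // Wait for everything; the first failure is rethrown here.
    return Future\await($futures);
}
```

Because each task is submitted and awaited inside the same coroutine while holding a lock, no more than $limit submissions are outstanding at any moment, and the loop that queues the tasks never waits on the pool.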

Thanks! But for some reason it starts twice as many tasks as expected. For example, in this code the tasks start two at a time, although the concurrent value is set to 1. With concurrent=2, 4 simultaneous tasks will be started.

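// SampleTask.php (assumed to be a separate file, autoloaded as Bro\SampleTask)
namespace Bro;

use Amp\Cancellation;
use Amp\Parallel\Worker\Task;
use Amp\Sync\Channel;

class SampleTask implements Task
{
    public function run( Channel $channel, Cancellation $cancellation ): mixed
    {
        sleep( 2 ); // Simulate two seconds of blocking work.
        return true;
    }
}

// Main script
use Amp\Parallel\Worker\Execution;
use Amp\Parallel\Worker\Task;
use Amp\Pipeline\Pipeline;
use Bro\SampleTask;
use function Amp\Parallel\Worker\createWorker;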

$tasksToStart = 4;
$concurrent = 1;
$task = new SampleTask();

$tasks = [];
for( $taskNum = 0; $taskNum < $tasksToStart; ++$taskNum ) {
    $tasks[] = $task;
}

Pipeline::fromIterable( $tasks )
        ->concurrent( $concurrent )
        ->map( fn( Task $task ) => createWorker()->submit( $task ) ) // no pools
        ->map( fn( Execution $execution ) => $execution->await() )
        ->forEach( function( $result ): void {
            $currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
                'H-i-s.u'
            );
            echo "$currentTime\n";
        } )
;

Output:

19-34-27.059025
19-34-27.121341
19-34-29.123185
19-34-29.181591

Multiple tasks are being executed because you're submitting the tasks before awaiting their completion. Awaiting is what applies back-pressure to the pipeline (or rather the underlying concurrent iterator) and causes it to wait before consuming more values from the source iterator.

Changing the example to create and then await the task in the same function will execute only the expected number of tasks at once.

Pipeline::fromIterable( $tasks )
        ->concurrent( $concurrent )
        ->map( fn( Task $task ) => createWorker()->submit( $task )->await() ) // submit and await task to complete
        ->forEach( function( $result ): void {
            $currentTime = DateTime::createFromFormat( 'U.u', sprintf( '%.6F', microtime( true ) ) )->format(
                'H-i-s.u'
            );
            echo "$currentTime\n";
        } )
;