amphp/sync

Issue with serializing callables for parallel processing.

UnnikrishnanBhargavakurup opened this issue · 15 comments

I have a bunch of background tasks in a Symfony application and I'm trying to process them in parallel using this library. My background tasks are Symfony services with an execute function.

The following code contains a sample implementation of my task engine.

        $noOfTasks = 100;
        $engine = $this;
        wait(each(
            Iterator\fromIterable($this->load_tasks_from_db($noOfTasks)),
            new LocalSemaphore(10),
            static function ($task) use($engine) {
                $service = $engine->container->get($task->getProcessor());
                yield Worker\enqueueCallable([$service, 'execute'], $task);
            }
        ));

When the $service has a lot of dependencies, the Amp library throws the following error:

app.CRITICAL: The given data could not be serialized: Serialization of 'SimpleXMLElement' is not allowed

It comes from NativeSerializer. I get the same error even if I just try to serialize the service myself:

$service = $this->container->get($task->getProcessor());
try {
     \serialize($service);
} catch (\Throwable $exception) {
    dump(sprintf('The given data could not be serialized: %s', $exception->getMessage()));
}

This is not a problem with the Amp library itself, but my question is: how can we execute callables in parallel if they have a lot of dependencies?

Instead of passing and thus serializing your service, you can just send the data to process and create the service only inside the closure, so it'll be created in the child process / thread instead of being serialized.
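As a rough illustration of that advice, here is a minimal plain-PHP sketch that does not use Amp at all. `ReportService`, `canSerialize()` and `runInWorker()` are hypothetical names invented for this example, and the `serialize()`/`unserialize()` round-trip only stands in for what happens when data is sent to a worker. It shows that a callable bound to the service cannot be serialized, while a class name plus a scalar payload can, so the service is constructed on the receiving side instead.

```php
<?php
// Hypothetical service whose object graph contains a non-serializable dependency.
final class ReportService
{
    private SimpleXMLElement $config;

    public function __construct()
    {
        $this->config = new SimpleXMLElement('<config><retries>3</retries></config>');
    }

    public function execute(array $payload): string
    {
        return sprintf('task %d retries=%s', $payload['id'], (string) $this->config->retries);
    }
}

// Helper for this sketch: reports whether serialize() accepts the value.
function canSerialize($value): bool
{
    try {
        serialize($value);
        return true;
    } catch (Throwable $e) {
        return false;
    }
}

// What would run on the worker side: it receives only plain data
// and creates the service itself.
function runInWorker(string $serviceClass, array $payload): string
{
    $service = new $serviceClass();
    return $service->execute($payload);
}

$serviceCallable = [new ReportService(), 'execute']; // drags the whole service along
$message = [ReportService::class, ['id' => 42]];     // class name + scalars only

// Simulate the trip to the worker with a serialize/unserialize round-trip.
$received = unserialize(serialize($message));
$result = runInWorker(...$received);
```

In a real application the service would come from a container built inside the worker rather than from `new`, but the shape is the same: only data crosses the process boundary.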

Hi @kelunik - I need to pass the callable with all its dependencies, right? Here my callable is a service from the container. I'm still not sure what you mean by 'create the service only inside the closure'. Could you please provide some sample code?

Thanks for the help!

Instead of getting the service from the container and using it as callable, create a new container / service instance inside the callable you pass to enqueueCallable.

create a new container / service instance inside the callable

In that case, I need to create a callable with a DI container inside it. That also fails, because the DI container inside the callable needs to be serialized. Please see the code below:

class MyCallable
{
    /** @var Container */
    private $container;

    public function __construct(ContainerInterface $container) {
        $this->container = $container;
    }

    public function process(Task $task) {

        $taskService = $this->container->get($task->getProcess());
        $taskService->execute($task);
    }
}

yield Worker\enqueueCallable([$myCallable, 'process'], $task);

app.CRITICAL: The given data could not be serialized: Serialization of 'Closure' is not allowed

Don't serialize the DI container; create a new one inside the Task, like this:

		$_ENV = $this->env;

		$kernel = new Kernel($this->env['APP_ENV'], (bool) $this->env['APP_DEBUG']);
		$kernel->boot();

		$container = $kernel->getContainer();

and add this to task constructor:

		$this->env = $_ENV;

Hi @enumag, thanks for the help, it's working perfectly!

Hi @enumag, @kelunik, just one more question: instead of bootstrapping the application for each task, can we share the DI container across processes? I'm getting the following error when I bootstrap the application for each process.

ERROR: fork() failed: Cannot allocate memory (12)

Starting the process failed

Instead of using a callable, you can implement the Task interface and store the kernel / container in the environment received as parameter.

@kelunik - could you please share a sample code or provide an example code link?

I don't recommend using CallableTask. In my opinion it's a bit of an anti-pattern that can easily cause problems: programmers tend to pass a callable that uses some services, and those should not be serialized. I have an abstract BaseTask that implements the creation of a new Container as posted above, plus some logic around that (methods to get a service or a parameter). Then I have custom Task classes which take the arguments the task needs (typically scalars or easily serializable value objects). Something like this:

abstract class BaseTask implements Task
{
	/**
	 * @var array<string, string>
	 */
	private array $env;

	public function __construct()
	{
		$this->env = $_ENV;
	}

	protected function createContainer(): ContainerInterface
	{
		if ($this->env['APP_DEBUG'] === '1') {
			Debug::enable();
		}

		$_ENV = $this->env;

		$kernel = new Kernel($this->env['APP_ENV'], (bool) $this->env['APP_DEBUG']);
		$kernel->boot();

		return $kernel->getContainer();
	}
}

final class ProcessFileTask extends BaseTask
{
	private string $file;

	public function __construct(string $file)
	{
		// WARNING: IT'S VERY IMPORTANT TO CALL parent::__construct() HERE, OTHERWISE YOU WON'T HAVE THE ENV VARIABLES
		parent::__construct();

		$this->file = $file;
	}

	public function run(Environment $environment)
	{
		// here I try to grab DI container from $environment
		// if it doesn't exist there, I create a new one and store it there
		// the reason is that the Environment still lives in the Worker so if the Worker later gets another Task to process then the container will be there already

		if (! $environment->exists('container')) {
			$environment->set('container', $this->createContainer());
		}

		$container = $environment->get('container');

		// then I pull a service from DI container and use it
	}
}

To answer your question:

can we share the DI container across processes

No. Each process has its own memory. There is no way to access a container from another process. "Sharing" it would basically mean serializing it to get a copy of it in the new process, but please avoid this. The container is not meant to be serialized.

That said, you typically shouldn't need too many services from the container to process the Task so memory shouldn't be a problem. I actually suspect that your memory problems are caused by you accidentally trying to serialize the container.
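To make the "copy, not shared" point concrete, here is a tiny plain-PHP sketch. The `Counter` class is hypothetical, and the `serialize()`/`unserialize()` round-trip only stands in for sending an object to another process. The receiving side gets an independent copy, so changes made there never reach the original.

```php
<?php
// Hypothetical object standing in for any state you might want to "share".
final class Counter
{
    public int $hits = 0;
}

$parentSide = new Counter();

// A serialize/unserialize round-trip mimics what crossing a process boundary does.
$childSide = unserialize(serialize($parentSide));
$childSide->hits++;

// The two objects are now unrelated: the increment stays on the copy.
printf("parent: %d, child: %d\n", $parentSide->hits, $childSide->hits);
```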

Hi @enumag, my current implementation is the following.

$env = [
    'APP_ENV' => $this->container->get('kernel')->getEnvironment(),
    'APP_DEBUG' => $this->container->get('kernel')->isDebug(),
];

$taskProcessor = new TaskProcessor($env);

$engine = $this;
$jobs = 0;
$poolSize = 10;

wait(each(
    Iterator\fromIterable($this->fetchScheduled($amountOfTasks)),
    new LocalSemaphore($poolSize),
    static function (Task $task) use ($engine, &$jobs, $taskProcessor) {
        $jobs++;
        if (State::FETCHED !== $task->getState()) {
            return;
        }

        try {
            $task->setState(State::UNDERWAYS);
            $engine->store($task);
            yield Worker\enqueueCallable([
                $taskProcessor,
                'process'
            ], $task);
        
        } catch (TaskFailureError | Exception $exception) {
            $task->setError($exception);
            $task->setState(State::HASERRORS);
        }
        $engine->store($task);
    }
));

<?php

namespace TaskBundle\Task;

use AppKernel;

class TaskProcessor
{
    /** @var array */
    private $env;

    public function __construct(array $env)
    {
        $this->env = $env;
    }

    /**
     * @param Task $task
     */
    public function process(Task $task): void
    {

        $kernel = new AppKernel($this->env['APP_ENV'], $this->env['APP_DEBUG']);
        $kernel->boot();
        $container = $kernel->getContainer();

        $taskService = $container->get($task->getServiceName());
        $taskService->execute($task);
    }
}

Please edit your post to highlight PHP syntax for easier reading.

You can do that by adding php right after the 3 backticks that open the code block.

Drop TaskProcessor, implement your own Task class as I suggested above and use enqueue($task) instead of enqueueCallable.
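A minimal sketch of what that could look like. To keep it runnable without the library, it declares reduced stand-ins for the `Task` and `Environment` interfaces (only the methods used here; in a real project you would drop them and import the ones from Amp\Parallel\Worker). `ArrayEnvironment`, `FakeContainer` and `ExecuteServiceTask` are hypothetical names, and `FakeContainer` replaces the booted Symfony kernel/container from the earlier posts.

```php
<?php
// Reduced stand-ins for the library's Environment and Task interfaces (assumption:
// only the methods this sketch needs are declared).
interface Environment
{
    public function exists(string $key): bool;
    public function get(string $key);
    public function set(string $key, $value): void;
}

interface Task
{
    public function run(Environment $environment);
}

// In-memory environment that lives as long as one (simulated) worker.
final class ArrayEnvironment implements Environment
{
    private array $data = [];

    public function exists(string $key): bool { return array_key_exists($key, $this->data); }
    public function get(string $key) { return $this->data[$key] ?? null; }
    public function set(string $key, $value): void { $this->data[$key] = $value; }
}

// Hypothetical replacement for booting the kernel; counts how often it is built.
final class FakeContainer
{
    public static int $boots = 0;

    public function __construct() { self::$boots++; }

    public function get(string $id): callable
    {
        return static fn (int $taskId): string => $id . ' handled task ' . $taskId;
    }
}

// The task carries only scalars, so it serializes without touching any service.
final class ExecuteServiceTask implements Task
{
    private string $serviceName;
    private int $taskId;

    public function __construct(string $serviceName, int $taskId)
    {
        $this->serviceName = $serviceName;
        $this->taskId = $taskId;
    }

    public function run(Environment $environment)
    {
        // Build the container once per worker and reuse it for later tasks.
        if (!$environment->exists('container')) {
            $environment->set('container', new FakeContainer());
        }

        $service = $environment->get('container')->get($this->serviceName);
        return $service($this->taskId);
    }
}

// Simulate one worker receiving three tasks in a row.
$environment = new ArrayEnvironment();
$results = [];
foreach ([1, 2, 3] as $id) {
    $task = unserialize(serialize(new ExecuteServiceTask('app.report', $id)));
    $results[] = $task->run($environment);
}
```

With the library, the loop body would presumably become `yield Worker\enqueue(new ExecuteServiceTask($task->getServiceName(), $id))`. Note that whatever `run()` returns is sent back to the parent, so it has to be serializable as well.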