misterion/ko-process

Basic publish - consume from child process

Closed this issue ยท 25 comments

First of all @misterion, thank you for this awesome library, I really like it.

I am playing with videlalvaro/php-amqplib.
What I wanted to do is have parallel publisher and consumer running in child processes:

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('q1');
$processManager = new ProcessManager();

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Publishing from '.$process->getPid().PHP_EOL;

    $channel->basic_publish(new AMQPMessage('Hello from '.$process->getPid()), null, 'q1');
    $channel->close();
});

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Consuming from '.$process->getPid().PHP_EOL;

    $channel->basic_consume('q1', '', false, false, false, false, function ($message) {
        echo 'Consuming message: '.$message->body.PHP_EOL;
    });

    while (count($channel->callbacks)) {
        $channel->wait();
    }
});

$processManager->wait();

Output:

Publishing from 9830
Consuming from 9831

Messages are published, but never consumed. Do you have any idea what can be the reason?

Full example is available on https://github.com/umpirsky/Extraload/blob/feature/queue-playground/test.php, can be easily checkout and run.

@umpirsky thanks for good words. I think the problem is forking in php. I mean the forking process in php has some limitations and restrictions. One of them - resource sharing between forked process but in your code problem is $channel->close(); in first forked process. Then you open connection with $connection->channel(); you create socket in master process. Then you fork process you share this socket and its state between 3 separete process - master, child 1 and child 2. Then you send you message in first forked process you close channel but actualy change state of socket in all processes. So the child 2 now in undefined behaviour state. The right strategy in this case - has one connection per consumer process opened in it context:

$processManager = new ProcessManager();

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Publishing from '.$process->getPid().PHP_EOL;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('q1');
    $channel->basic_publish(new AMQPMessage('Hello from '.$process->getPid()), null, 'q1');
    $channel->close();
});

$processManager->fork(function (Process $process) use ($channel) {
    echo 'Consuming from '.$process->getPid().PHP_EOL;

    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    $channel->queue_declare('q1');
    $channel->basic_consume('q1', '', false, false, false, false, function ($message) {
        echo 'Consuming message: '.$message->body.PHP_EOL;
    });

    while (count($channel->callbacks)) {
        $channel->wait();
    }
});

$processManager->wait();

More one thing about this - sending message with AMQP is very lightweight operation. The most costly part of it - open connection to AMQP server. So it looks like overhead to fork process just for message pushing. Also it may be interestion for you look at https://github.com/misterion/ko-worker project (based on ko-process). Ko-worker project created to simplify work with workers based on AMQP.

@misterion Thanks, that worked like a charm! ๐Ÿ‘

That means that connection must always be created inside forked process, which kinda makes it imposible to inject connection or channel like I was planning to.

ko-worker looks interesting too, thanks for sharig.

Keep doing great work! ๐Ÿ‘

Yes, if using fork in php you should open resources in fork context. In your case you can inject parameters to create connection in forked process. Or use some kind of ioc container in which connection is not actualy esteblished.

@misterion Exactly my idea.

I am playing with Ko\AmqpBroker now. Because he can do exactly that, provide fresh producer/consumer based on config.

I am trying again to run producer and consumer in separate child processes:

$broker = new Ko\AmqpBroker($config);
$processManager = new ProcessManager();

$processManager->fork(function(Process $process) use ($broker) {
    $producer = $broker->getProducer('extractor');
    $message = 'Hello world ' . time();
    echo 'Sending message `' . $message . '`' . PHP_EOL ;
    $producer->publish($message);

});

$processManager->fork(function(Process $process) use ($broker) {
    $consumer = $broker->getConsumer('extractor');
    $message = 'Hello world ' . time();
    echo 'Receiving message `' . $message . '`' . PHP_EOL ;
    $consumer->consume(function (AMQPEnvelope $envelope) {
    echo 'Receive `' . $envelope->getBody() . PHP_EOL;

    return true;
    }, AMQP_AUTOACK);
});

$processManager->wait();

I get:

Sending message `Hello world 1454345755`
Receiving message `Hello world 1454345755`

Looks like it is never consumed. The strange thing is that the queue is empty.

Am I missing something again?

Full example on:
https://github.com/umpirsky/Extraload/blob/feature/queue/examples/test.php

Could you post your config?

Please change your durability to true (https://www.rabbitmq.com/tutorials/tutorial-two-php.html) or start the consumer first. In RabbitMQ is queue is not durable all messages would be lost if here is no any consumers on it.

@misterion Thanks. That is true in deed, nice catch.

But, after changing both umpirsky/Extraload@b9fecca still same result. Must be something else as well.

Looks like message never reach the queue, I see consumer is active, but looks like there are no messages to consume.

I narrow down my script to:

$broker->getProducer('extractor')->publish('Hello');

But looks like this message never gets to queue.

selection_088

@misterion Any opinion on this?

Sorry @umpirsky, i looze your previous message so just see it. Did you try with durable queue?

In your sample all looks fine. Did you try to execute them without forking? P.s. i can try to play with it myself in monday - now have no avcess to pc with linux on board.

@misterion I did before, it didn't work.

Now I tried it again, after:

$broker->getProducer('extractor')->publish('Hello');

No messages in queue, which is strange.

And consume ends up with Segmentation fault (core dumped).

If you can try it next week that would be cool.

I know I can always achieve this with ko-process and videlalvaro/php-amqplib, but I wanted to give ko-worker a try, since it does all this for me.

Thanks.

I look at your code - the problem is consumer routing-keys defined as *. You mean this as any message from extractor_exchange but actualy (you are using direct type of excange) this mean only full match routing keys are supported with no masks (as in topic). Look at https://www.rabbitmq.com/getstarted.html for more details. To fast fix your sample just change routing-keys: '*' to routing-keys: '' in config.yaml. Or change exchange_options type to topic and send non empty routing key with publish method.

Thanks @misterion! ๐Ÿ‘

umpirsky/Extraload@441b5a0 fixed it in deed.

I also tried to change exchange_options type to topic and send non empty routing key with publish method, but I got AMQPExchangeException: 'Server channel error: 406, message: PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'extractor_exchange' in vhost '/': received 'topic' but current is 'direct''.

Probably something else need to be changed as well.

However, thank you very much for you help. Really appreciate it.

In RabbitMQ you can`t redeclare exchange type without recreating it. So if you change type from 'direct' to 'topic' you should delete exchange first.

@misterion Oh, I see. Thanks.

@misterion I just came across very strange use case, which makes ko-worker/ko-process combo do not work for me.

My test script https://github.com/umpirsky/Extraload/blob/feature/queue/examples/test.php works and output is:

Receiving message `Hello world 1455114747`
Sending message `Hello world 1455114747`
Receive `Hello world 1455114747

So far so good.

But adding any use statements, like:

diff --git a/examples/test.php b/examples/test.php
index b2a9be5..42eceb5 100644
--- a/examples/test.php
+++ b/examples/test.php
@@ -3,6 +3,7 @@

 require __DIR__.'/../vendor/autoload.php';

+use Symfony\Component\Yaml\Yaml;
 use Ko\ProcessManager;
 use Ko\Process;

makes it stop consuming messages and the output is:

Receiving message `Hello world 1455114886`
Sending message `Hello world 1455114886`

I spent a lot of time narrowing it down to this, but why it happens is a total mistery for me.

If you have any clue, please hint me. It's very strange.

Thanks for your assistance so far.

Looks very strange. I`l try to run it myself tomorrow and write the results. The last used code in your feature/queue branch? Could you also provide you php and AMQP extension versions?

@misterion Yes, the linked one is good, by applying diff from my previous comment it stops working.

$ php -v
PHP 5.6.11-1ubuntu3.1 (cli) 
Copyright (c) 1997-2015 The PHP Group
Zend Engine v2.6.0, Copyright (c) 1998-2015 Zend Technologies
    with Zend OPcache v7.0.6-dev, Copyright (c) 1999-2015, by Zend Technologies
    with Xdebug v2.3.3, Copyright (c) 2002-2015, by Derick Rethans

AMQP version: 1.6.1

Thanks!

Hi @umpirsky. Just test your code on php 5.6.10 (my dev stand), 5.6.11 (local dev stand at my work) both with AMQP 1.6.1 and 1.7.0. Have same output (with diff time) for all 4 cases:

Receiving message `Hello world 1455201401`
Sending message `Hello world 1455201401`
Receive `Hello world 1455201401

Here is full source i test with:

#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use Symfony\Component\Yaml\Yaml;
use Ko\ProcessManager;
use Ko\Process;

$config = Yaml::parse(file_get_contents(__DIR__.'/config.yml'));
$broker = new Ko\AmqpBroker($config);
$processManager = new ProcessManager();

$processManager->fork(function(Process $process) use ($broker) {
    $consumer = $broker->getConsumer('extractor');
    $message = 'Hello world ' . time();
    echo 'Receiving message `' . $message . '`' . PHP_EOL ;

    $consumer->consume(function (AMQPEnvelope $envelope) {
        echo 'Receive `' . $envelope->getBody() . PHP_EOL;
        return true;
    }, AMQP_AUTOACK);
});
$processManager->fork(function(Process $process) use ($broker) {
    $producer = $broker->getProducer('extractor');
    $message = 'Hello world ' . time();
    echo 'Sending message `' . $message . '`' . PHP_EOL ;
    $producer->publish($message);
});

$processManager->wait();

And my config file:

connections:
  default:
      host: 'localhost'
      port: 5672
      login: 'guest'
      password: 'guest'
producers:
  extractor:
    connection: default
    exchange_options: {name: 'extractor_exchange', type: direct, durable: 1, passive: 0}
consumers:
  extractor:
    connection: default
    queue_options:
      name: 'extractor_queue'
      durable: 1
      autodelete: 0
      binding: {name: 'extractor_exchange', routing-keys: ''}

I also asked my devops friend about this. He suggest to try to reboot RabbitMQ and your dev stand to "purity of the experiment". Could you try to reboot you dev server?

@misterion Thanks for trying it out.

And seems like your devops friend was right (say hello from me).

So, I just run it again, and if I wait for some time (which I usually don't do because I am not patient), I got:

ubuntu_105

Then I noticed durable: 1 in your config and I did:

diff --git a/examples/config.yml b/examples/config.yml
index 33ee89b..45ed4b5 100644
--- a/examples/config.yml
+++ b/examples/config.yml
@@ -7,7 +7,7 @@ connections:
 producers:
   extractor:
     connection: default
-    exchange_options: {name: 'extractor_exchange', type: direct, durable: 0, passive: 0}
+    exchange_options: {name: 'extractor_exchange', type: direct, durable: 1, passive: 0}

Then some errors like durable is false, expected true...

Then I tried to restart RabbitMQ, got some erlang segmentation faults. Then I reboot my machine.

And guess what? It finally works.

So, I assume that there are some issues with my current setup, but if experiment is pure, it should work.

Thanks again for your help, you will get credits in https://github.com/umpirsky/Extraload readme. ;)

Thanks for using my labrary ;) It`s realy nice to know that something else need it :)

@misterion Oh I need it. :)