nrk/predis-async

First published message in channel after reconnect is lost


Below is an example script. Run it and then publish messages to "queue:0:cmd":
publish queue:0:cmd test
publish queue:0:cmd quit
publish queue:0:cmd test
publish queue:0:cmd quit
You will see the following output from the script:
$ php ./test
Connected to Redis, now listening for incoming messages cmd...
Received message test from channel queue:0:cmd [type: message].
Received message quit from channel queue:0:cmd [type: message].
Connected to Redis, now listening for incoming messages cmd...
Received message quit from channel queue:0:cmd [type: message].

As you can see, the second "test" message is lost. At the same time, "redis-cli" reports that every message was delivered to a subscriber:

127.0.0.1:6379> publish queue:0:cmd test
(integer) 1
127.0.0.1:6379> publish queue:0:cmd quit
(integer) 1
127.0.0.1:6379> publish queue:0:cmd test
(integer) 1
127.0.0.1:6379> publish queue:0:cmd quit
(integer) 1

The "libevent" library is used for the event loop.

<?php

require __DIR__ . '/vendor/autoload.php';


$loop = \React\EventLoop\Factory::create();

$client = new Predis\Async\Client('tcp://127.0.0.1:6379', $loop);

$client->connect(function ($client) {
    echo "Connected to Redis, now listening for incoming messages cmd...\n";

    $client->pubSubLoop('queue:0:cmd', function ($event, $pubsub) {
        $message = "Received message `%s` from channel `%s` [type: %s].\n";

        $feedback = sprintf($message,
            $event->payload,
            $event->channel,
            $event->kind
        );

        echo $feedback;

        if ($event->payload === 'quit') {
            $pubsub->quit();
        }
    });
});


$loop->run();
