Stream continues to be open even though nothing can be fetched
anotherdevs opened this issue · 2 comments
Hi
I use this code to fetch jobs:
// Autoload classes
require_once realpath(__DIR__) . '/vendor/autoload.php';
$client = new Amp\Beanstalk\BeanstalkClient('tcp://127.0.0.1:11300');
$client->watch('testtube');
$client->ignore('default');
Amp\Loop::run(function() use ($client)
{
Amp\Loop::repeat($msInterval = 1, function() use ($client)
{
$client->reserve(0)->onResolve(function($error, $result) use ($client)
{
$id = (int) $result['0'];
if ($id <= 0)
return;
echo "Handle job: " . $id . "\n";
$client->delete($id)->onResolve(function($error, $result) use ($client, $id)
{
echo "Delete job: " . $id . "\n";
});
});
});
});
I use stunnel4 (guide: https://www.digitalocean.com/community/tutorials/how-to-encrypt-traffic-to-redis-with-stunnel-on-ubuntu-16-04) to create a tunnel between machine1 (running Beanstalk) and machine2 (consumer).
When using stunnel4 it will keep the port: 11300 on 127.0.0.1 open all the time, even though the remote port has closed due to either restart or the process stops.
However amphp/beanstalk doesn't timeout the connection as it sees the connection as still alive because the local port doesn't close - so the stream doesn't close.
If I add $stats = yield $client->getSystemStats();
right after repeat, it will close the connection and fail with: PHP Fatal error: Uncaught Amp\ByteStream\StreamException: The stream was closed by the peer
.
Is it however not that great to continuously send a command to request data, because it will waste ressources, and may slow down reserving jobs.
TLDR: amphp/beanstalk doesn't recognize stream closed when reserving.
You aren't examining $error
in the callbacks in your code. I highly recommend using coroutines instead of callbacks, because errors will automatically be thrown instead of having to check the $error
parameter.
And you probably don't want to call $client->reserve(0)
every 1 ms.
I'll close this issue, as there hasn't been any response. If you have further questions, either respond to this issue or report a new one.