无法连接服务出现了 Uncaught Pulsar\Exception\IOException: socket is closed
Closed this issue · 9 comments
dalingng commented
使用了官方的示例代码不能用,自己又安装了swoole来运行也不能用
无法连接
PHP Fatal error: Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
Stack trace:
#0 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(209): Pulsar\IO\StreamIO->read(8)
#1 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(188): Pulsar\IO\StreamIO->handleRead()
#2 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(97): Pulsar\IO\StreamIO->wait()
#3 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Lookup/TcpLookupService.php(63): Pulsar\IO\StreamIO->handshake(Object(App\Pulsar\TuyaAuth))
#4 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(191): Pulsar\Lookup\TcpLookupService->__construct(Object(Pulsar\ConsumerOptions))
#5 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(122): Pulsar\Client->makeLookupService()
#6 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(111): Pulsar\Client->fetchPartitionTopicMetadata()
#7 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Consumer.php(62): Pulsar\Client->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#8 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(58): Pulsar\Consumer->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#9 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(101): App\Console\Commands\TuyaPulsar->runExc()
#10 [internal function]: App\Console\Commands\TuyaPulsar->App\Console\Commands\{closure}()
#11 {main}
thrown in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php on line 153
Symfony\Component\ErrorHandler\Error\FatalError
Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
Stack trace:
#0 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(209): Pulsar\IO\StreamIO->read(8)
#1 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(188): Pulsar\IO\StreamIO->handleRead()
#2 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(97): Pulsar\IO\StreamIO->wait()
#3 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Lookup/TcpLookupService.php(63): Pulsar\IO\StreamIO->handshake(Object(App\Pulsar\TuyaAuth))
#4 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(191): Pulsar\Lookup\TcpLookupService->__construct(Object(Pulsar\ConsumerOptions))
#5 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(122): Pulsar\Client->makeLookupService()
#6 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(111): Pulsar\Client->fetchPartitionTopicMetadata()
#7 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Consumer.php(62): Pulsar\Client->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#8 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(58): Pulsar\Consumer->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#9 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(101): App\Console\Commands\TuyaPulsar->runExc()
#10 [internal function]: App\Console\Commands\TuyaPulsar->App\Console\Commands\{closure}()
#11 {main}
thrown
at vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
149▕
150▕ // socket is closed
151▕ if (\feof($this->socket) || !\is_resource($this->socket) || $bytes === false) {
152▕ $this->close();
➜ 153▕ throw new IOException('socket is closed');
154▕ }
155▕
156▕ return $bytes;
157▕ }
Whoops\Exception\ErrorException
Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
Stack trace:
#0 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(209): Pulsar\IO\StreamIO->read(8)
#1 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(188): Pulsar\IO\StreamIO->handleRead()
#2 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(97): Pulsar\IO\StreamIO->wait()
#3 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Lookup/TcpLookupService.php(63): Pulsar\IO\StreamIO->handshake(Object(App\Pulsar\TuyaAuth))
#4 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(191): Pulsar\Lookup\TcpLookupService->__construct(Object(Pulsar\ConsumerOptions))
#5 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(122): Pulsar\Client->makeLookupService()
#6 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(111): Pulsar\Client->fetchPartitionTopicMetadata()
#7 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Consumer.php(62): Pulsar\Client->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#8 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(58): Pulsar\Consumer->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#9 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(101): App\Console\Commands\TuyaPulsar->runExc()
#10 [internal function]: App\Console\Commands\TuyaPulsar->App\Console\Commands\{closure}()
#11 {main}
thrown
at vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
149▕
150▕ // socket is closed
151▕ if (\feof($this->socket) || !\is_resource($this->socket) || $bytes === false) {
152▕ $this->close();
➜ 153▕ throw new IOException('socket is closed');
154▕ }
155▕
156▕ return $bytes;
157▕ }
+1 vendor frames
2 [internal]:0
Whoops\Run::handleShutdown()
运行的代码
<?php
namespace App\Console\Commands;
use App\Pulsar\TuyaAuth;
use Illuminate\Console\Command;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
use Swoole\Runtime;
use Swoole\Coroutine;
use function Swoole\Coroutine\run;
class TuyaPulsar extends Command
{
/**
* The name and signature of the console command.
*
* @var string
*/
protected $signature = 'tuya:pulsar';
/**
* The console command description.
*
* @var string
*/
protected $description = '涂鸦pulsar';
public function runExc(){
$accessID='xxxxx';
$accessKey='xxxxx';
$options = new ConsumerOptions();
// If permission authentication is available
// Only JWT authentication is currently supported
$auth = new TuyaAuth($accessID,$accessKey);
$options->setAuthentication($auth);
$options->setConnectTimeout(3);
$options->setTopic(sprintf("persistent://%s/out/event", $accessID));
$options->setSubscription(sprintf('%s-sub',$accessID));
$options->setSubscriptionType(SubscriptionType::Failover);
// Initial position at which to set cursor when subscribing to a topic at first time.
// default use InitialPosition::Latest()
// $options->setSubscriptionInitialPosition(InitialPosition::Earliest());
// Configure how many seconds Nack's messages are redelivered, the default is 1 minute
// $options->setNackRedeliveryDelay(20);
// Maximum number of reconnections
$options->setReconnectPolicy(true, 3, 100);
$pulsarAddrCN = "pulsar+ssl://mqe.tuyacn.com:7285";
$consumer = new Consumer($pulsarAddrCN, $options);
// or use pulsar proxy address
//$consumer = new Consumer('http://localhost:8080', $options);
$consumer->connect();
while (true) {
$message = $consumer->receive();
// get properties
var_export($message->getProperties());
echo sprintf(
'Got message 【%s】messageID[%s] topic[%s] publishTime[%s] redeliveryCount[%d]',
$message->getPayload(),
$message->getMessageId(),
$message->getTopic(),
$message->getPublishTime(),
$message->getRedeliveryCount()
) . "\n";
// ...
// Remember to confirm that the message is complete after processing
$consumer->ack($message);
// When processing fails, you can also execute the Nack
// The message will be re-delivered after the specified time
// $consumer->nack($message);
}
$consumer->close();
}
/**
* Execute the console command.
*
* @return int
*/
public function handle()
{
Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
run(function () {
$this->runExc();
});
return Command::SUCCESS;
}
}
TuyaAuth.php
<?php
namespace App\Pulsar;
use Pulsar\Authentication\Authentication;
class TuyaAuth implements Authentication{
/**
* @var string
*/
protected $accessID;
/**
* @var string
*/
protected $accessKey;
/**
* @param string $token
*/
public function __construct(string $accessID,string $accessKey)
{
$this->accessID = $accessID;
$this->accessKey = $accessKey;
}
/**
* @inheritDoc
*/
public function authMethodName(): string
{
return 'auth1';
}
static protected function md5Hex(string $s):string{
return md5($s);
}
/**
* @inheritDoc
*/
public function authData()
{
$key = md5($this->accessID . md5($this->accessKey));
$key = substr($key,8,24-8);
$data = [
'username' => $this->accessID,
'password' => $key,
];
return json_encode($data);
}
}
ikilobyte commented
目前还不支持 pulsar+ssl
协议
karthikeyan-manivasagam commented
@ikilobyte I too get Socket is closed
error when I use pulsar+ssl
then I changed to https://mystreamnativehosteddomain:8080 then I got the below error
is this fixed ? , I too face same error, and when I connect using https, I get this error
Pulsar\Exception\ConnectException: Connection timed out in file /var/www/html/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php on line 55
ikilobyte commented
@karthikeyan-manivasagam The SSL protocol is not currently supported
karthikeyan-manivasagam commented
@ikilobyte , Thank you
dalingng commented
好的谢谢,我已经转成用go来接收转成amqp。所以没看消息不好意思
karthikeyan-manivasagam commented
is this issue fixed ?
ikilobyte commented
@karthikeyan-manivasagam It hasn't been fixed yet.
ikilobyte commented
@karthikeyan-manivasagam @dalingng v1.3.0
TLS Support