ikilobyte/pulsar-client-php

Cannot connect to the service: Uncaught Pulsar\Exception\IOException: socket is closed

Closed this issue · 9 comments

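I tried the official example code and it does not work. I then installed Swoole and ran it that way, and it still does not work.
The client cannot connect.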

PHP Fatal error:  Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
Stack trace:
#0 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(209): Pulsar\IO\StreamIO->read(8)
#1 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(188): Pulsar\IO\StreamIO->handleRead()
#2 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php(97): Pulsar\IO\StreamIO->wait()
#3 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Lookup/TcpLookupService.php(63): Pulsar\IO\StreamIO->handshake(Object(App\Pulsar\TuyaAuth))
#4 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(191): Pulsar\Lookup\TcpLookupService->__construct(Object(Pulsar\ConsumerOptions))
#5 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(122): Pulsar\Client->makeLookupService()
#6 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Client.php(111): Pulsar\Client->fetchPartitionTopicMetadata()
#7 /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/Consumer.php(62): Pulsar\Client->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#8 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(58): Pulsar\Consumer->__construct('pulsar://mqe.tu...', Object(Pulsar\ConsumerOptions))
#9 /Users/dng/work2/SparrowRoom/app/Console/Commands/TuyaPulsar.php(101): App\Console\Commands\TuyaPulsar->runExc()
#10 [internal function]: App\Console\Commands\TuyaPulsar->App\Console\Commands\{closure}()
#11 {main}
  thrown in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php on line 153

   Symfony\Component\ErrorHandler\Error\FatalError

  Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153

  at vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153
    149▕
    150▕         // socket is closed
    151▕         if (\feof($this->socket) || !\is_resource($this->socket) || $bytes === false) {
    152▕             $this->close();
  ➜ 153▕             throw new IOException('socket is closed');
    154▕         }
    155▕
    156▕         return $bytes;
    157▕     }


   Whoops\Exception\ErrorException

  Uncaught Pulsar\Exception\IOException: socket is closed in /Users/dng/work2/SparrowRoom/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php:153

      +1 vendor frames
  2   [internal]:0
      Whoops\Run::handleShutdown()
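
For context, the guard shown at StreamIO.php line 151 fires when the peer has closed the connection before the bytes the client expects have arrived. The following is a minimal sketch of that condition only, using a local socket pair instead of a Pulsar broker. It assumes a Unix-like system, and the variable names are illustrative rather than taken from the library.

```php
<?php
// Sketch, not library code: reproduce the "socket is closed" condition locally.
// Assumes a Unix-like system where STREAM_PF_UNIX socket pairs are available.

[$client, $server] = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

// The "server" end hangs up without sending anything, as a peer might do
// when it does not accept the handshake it received.
fclose($server);

// The client now tries to read the 8 bytes it expects, like read(8) in the trace.
$bytes = fread($client, 8);

// Same three checks as the guard quoted in the error output.
$closed = feof($client) || !is_resource($client) || $bytes === false;

var_dump($closed);

fclose($client);
```

If `$closed` is true here, the client would throw the same `IOException` regardless of why the remote end hung up, so the message alone does not identify the cause.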

The code I am running:

<?php

namespace App\Console\Commands;

use App\Pulsar\TuyaAuth;
use Illuminate\Console\Command;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\SubscriptionType;
use Swoole\Runtime;
use Swoole\Coroutine;
use function Swoole\Coroutine\run;

class TuyaPulsar extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'tuya:pulsar';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Tuya Pulsar';

    public function runExc(){
        $accessID='xxxxx';
        $accessKey='xxxxx';

        $options = new ConsumerOptions();

        // If permission authentication is available
        // Only JWT authentication is currently supported
        $auth = new TuyaAuth($accessID,$accessKey);
        $options->setAuthentication($auth);

        $options->setConnectTimeout(3);
        $options->setTopic(sprintf("persistent://%s/out/event", $accessID));
        $options->setSubscription(sprintf('%s-sub',$accessID));
        $options->setSubscriptionType(SubscriptionType::Failover);

        // Initial cursor position when subscribing to a topic for the first time.
        // The default is InitialPosition::Latest()
        // $options->setSubscriptionInitialPosition(InitialPosition::Earliest());


        // Configure how many seconds Nack's messages are redelivered, the default is 1 minute
        // $options->setNackRedeliveryDelay(20);

        // Maximum number of reconnections
        $options->setReconnectPolicy(true, 3, 100);

        $pulsarAddrCN = "pulsar+ssl://mqe.tuyacn.com:7285";
        $consumer = new Consumer($pulsarAddrCN, $options);
        // or use pulsar proxy address
        //$consumer = new Consumer('http://localhost:8080', $options);

        $consumer->connect();

        try {
            while (true) {
                $message = $consumer->receive();

                // get properties
                var_export($message->getProperties());

                echo sprintf(
                    'Got message 【%s】messageID[%s] topic[%s] publishTime[%s] redeliveryCount[%d]',
                    $message->getPayload(),
                    $message->getMessageId(),
                    $message->getTopic(),
                    $message->getPublishTime(),
                    $message->getRedeliveryCount()
                ) . "\n";

                // ...

                // Remember to confirm that the message is complete after processing
                $consumer->ack($message);

                // When processing fails, you can also execute the Nack
                // The message will be re-delivered after the specified time
                // $consumer->nack($message);
            }
        } finally {
            // The loop never exits normally, so close the consumer here
            // to release the connection if receive() or ack() throws.
            $consumer->close();
        }
    }

    /**
     * Execute the console command.
     *
     * @return int
     */
    public function handle()
    {
        Runtime::enableCoroutine(true, SWOOLE_HOOK_ALL);
        run(function () {
            $this->runExc();
        });

        return Command::SUCCESS;
    }
}

TuyaAuth.php

<?php
namespace App\Pulsar;

use Pulsar\Authentication\Authentication;

class TuyaAuth implements Authentication{

    /**
     * @var string
     */
    protected $accessID;
    /**
     * @var string
     */
    protected $accessKey;

    /**
     * @param string $accessID
     * @param string $accessKey
     */
    public function __construct(string $accessID,string $accessKey)
    {
        $this->accessID = $accessID;
        $this->accessKey = $accessKey;
    }

    /**
     * @inheritDoc
     */
    public function authMethodName(): string
    {
        return 'auth1';
    }


    static protected function md5Hex(string $s):string{
        return md5($s);
    }
    /**
     * @inheritDoc
     */
    public function authData()
    {
        $key = md5($this->accessID . md5($this->accessKey));
        $key = substr($key,8,24-8);
        $data = [
            'username' => $this->accessID,
            'password' => $key,
        ];
        return json_encode($data);
    }
}
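
The password derivation in `authData()` above can be checked in isolation. The sketch below mirrors that one computation as a standalone function. The function name `tuyaPassword` and the demo credentials are hypothetical, and whether this matches what the Tuya endpoint expects is not confirmed anywhere in this thread.

```php
<?php
// Hypothetical helper mirroring the computation in TuyaAuth::authData().
function tuyaPassword(string $accessID, string $accessKey): string
{
    // 32-character hex digest of accessID concatenated with md5(accessKey)
    $digest = md5($accessID . md5($accessKey));

    // Keep 16 characters starting at offset 8, same as substr($key, 8, 24 - 8)
    return substr($digest, 8, 16);
}

// Placeholder credentials, for illustration only.
$payload = json_encode([
    'username' => 'demo-id',
    'password' => tuyaPassword('demo-id', 'demo-key'),
]);

echo $payload, "\n";
```

Printing the payload this way makes it easy to compare against the value produced by another client for the same credentials.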

The pulsar+ssl protocol is not supported yet.

@dalingng Try pulsar://mqe.tuyacn.com:7285 instead.
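
Since an unsupported scheme surfaces only as a generic "socket is closed" error, a small pre-flight check can make the failure clearer. This is a sketch, not part of the library. The list of supported schemes is an assumption drawn only from this thread and from the example's proxy comment (`pulsar://` and `http://`), and the function names are hypothetical.

```php
<?php
// Assumed from this thread only; not read from the library's source.
const SUPPORTED_SCHEMES = ['pulsar', 'http'];

// Return the lower-cased scheme of a service URL, or '' if there is none.
function schemeOf(string $url): string
{
    $pos = strpos($url, '://');
    return $pos === false ? '' : strtolower(substr($url, 0, $pos));
}

// True when the URL uses one of the schemes assumed to be supported.
function isSupportedUrl(string $url): bool
{
    return in_array(schemeOf($url), SUPPORTED_SCHEMES, true);
}

foreach (['pulsar+ssl://mqe.tuyacn.com:7285', 'pulsar://mqe.tuyacn.com:7285'] as $url) {
    printf("%s => %s\n", $url, isSupportedUrl($url) ? 'supported' : 'not supported');
}
```

Calling `isSupportedUrl()` before `new Consumer(...)` would let the command fail early with a message that names the scheme.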

@ikilobyte Is this fixed? I also get the "socket is closed" error when I use pulsar+ssl. When I switched to https://mystreamnativehosteddomain:8080 instead, I got the error below:

Pulsar\Exception\ConnectException: Connection timed out in file /var/www/html/vendor/ikilobyte/pulsar-client-php/src/IO/StreamIO.php on line 55
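
A connection timeout can come from the network as well as from the client, so it may help to test plain TCP reachability separately. The sketch below uses PHP's `stream_socket_client` for that. The helper name `canConnect` is hypothetical, and the check says nothing about TLS or the Pulsar protocol itself.

```php
<?php
// Hypothetical helper: true if a plain TCP connection to $host:$port can be
// opened within $timeout seconds. It does not speak TLS or Pulsar.
function canConnect(string $host, int $port, float $timeout = 3.0): bool
{
    $socket = @stream_socket_client("tcp://{$host}:{$port}", $errno, $errstr, $timeout);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

// Demo against a throwaway local listener, so no external host is contacted.
// Port 0 asks the operating system to pick a free port.
$server = stream_socket_server('tcp://127.0.0.1:0', $errno, $errstr);
$port = (int) substr(strrchr(stream_socket_get_name($server, false), ':'), 1);

var_dump(canConnect('127.0.0.1', $port));
```

If this check succeeds against the real host and port while the client still times out, the problem is more likely in the protocol or scheme than in basic connectivity.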

@karthikeyan-manivasagam The SSL protocol is not currently supported

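OK, thanks. I have since switched to receiving the messages with Go and converting them to AMQP, so I had not been checking the messages here. Sorry about that.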

Is this issue fixed?

@karthikeyan-manivasagam It hasn't been fixed yet.