ikilobyte/pulsar-client-php

生成者貌似无法关闭批量模式

Closed this issue · 7 comments

这个用在腾讯云的tdmq pulsar中,发送多个消息,只有一个消息是延时消息的时候,所有的消息都会被认为是延时消息

@wanlinzan 提供一下示例代码,这边调试一下

非常感谢作者及时回复!

这边整理代码的时候发现消息是堆积在队列中无法被消费,根据腾讯云文档中的说明,造成堆积的原因可能是未关闭批量模式。
文档地址:https://cloud.tencent.com/document/product/1179/49814
image

示例代码如下:


use Exception;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\Message;
use Pulsar\MessageOptions;
use Pulsar\SubscriptionType;
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;

class Tdmq
{
    public $config;

    public $topic_producers = [];
    public $topic_consumers = [];


    /**
     * Tdmq constructor.
     */
    public function __construct()
    {
        $this->config = [
            'tdmq_url' => 'http://xxxxxxxxxxxxxxxx:8080',
            'tdmq_token' => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
            'tdmq_topic_prefix' => 'persistent://xxxxx/xxxxxxx/',
            'tdmq_subscription_name' => 'default-handler'
        ];
    }

    public function __destruct()
    {
        foreach ($this->topic_producers as $topic_producer) {
            $topic_producer->close();
        }
        foreach ($this->topic_consumers as $topic_consumer) {
            $topic_consumer->close();
        }
    }

    /**
     * 发送消息
     * @param $topic_name
     * @param array $message
     * @param int $delay_time
     * @return array
     * @throws Exception
     */
    public function send(array $message, $delay_time = 0, $topic_name = 'common'): array
    {
        if (!isset($this->topic_producers[$topic_name])) {

            $options = new ProducerOptions();

            $options->setAuthentication(new Jwt($this->config['tdmq_token']));

            $options->setConnectTimeout(3);
            $options->setTopic($this->config['tdmq_topic_prefix'] . $topic_name);
            $options->setCompression(Compression::ZLIB);

            $this->topic_producers[$topic_name] = new Producer($this->config['tdmq_url'], $options);
            $this->topic_producers[$topic_name]->connect();
        }

        if (isset($message[0]) && is_array($message[0])) {
            $messages = $message;
        } else {
            $messages = [$message];
        }

        $results = [];

        foreach ($messages as $message) {
            $messageID = $this->topic_producers[$topic_name]->send(json_encode($message, 256), [
                MessageOptions::DELAY_SECONDS => $delay_time
            ]);
            $results[] = $messageID;
        }

        return $results;
    }


    /**
     * 接收消息
     * @param $topic_name
     * @param int $size
     * @return array
     * @throws Exception
     */
    public function receive($size = 1, $topic_name = 'common'): array
    {
        if (!isset($this->topic_consumers[$topic_name])) {
            $this->_initConsumer($topic_name);
        }

        $messages = [];

        while (true) {

            try {
                $messages[] = $this->topic_consumers[$topic_name]->receive(false);

                if (count($messages) >= $size) {
                    break;
                }

            } catch (Exception $exception) {
                break;
            }
        }

        return $messages;
    }

    /**
     * 确认消息
     * @param $topic_name
     * @param Message $message
     * @throws Exception
     */
    public function ack(Message $message, $topic_name = 'common')
    {
        if (!isset($this->topic_consumers[$topic_name])) {
            $this->_initConsumer($topic_name);
        }

        $this->topic_consumers[$topic_name]->ack($message);
    }

    /**
     * @param $topic_name
     * @throws Exception
     */
    private function _initConsumer($topic_name)
    {
        $options = new ConsumerOptions();

        $options->setAuthentication(new Jwt($this->config['tdmq_token']));

        $options->setConnectTimeout(3);
        $options->setTopic($this->config['tdmq_topic_prefix'] . $topic_name);
        $options->setSubscriptionType(SubscriptionType::Shared);
        $options->setSubscription($this->config['tdmq_subscription_name']);
        $options->setNackRedeliveryDelay(1); // 这个参数可以让当前消费者未断开连接的情况下,更早的回到队列中让其他消费者消费
        $options->setReceiveQueueSize(100);


        $consumer = new Consumer($this->config['tdmq_url'], $options);
        $consumer->connect();
        $this->topic_consumers[$topic_name] = $consumer;
    }

}



$tdmq = new Tdmq();

$tdmq->send([
    'content' => 'tagging_optimal',
]);
$tdmq->send([
    'content' => 'delete_test_answer_data',
],500);

这边尝试把 ->receive(false) 改为->receive(true)就可以获取消息了,但是获取消息需要4到5秒钟,是不是可以认为是腾讯云消息队列拉取消息慢

@wanlinzan

我试了一下,没有复现你说的这种情况,pulsar是需要先创建消费者的,我在你的代码下面多加了一下代码,你可以试一下能否成功,另外pulsar所有的通信都是基于TCP的,实时性很好,除非消费端业务逻辑有积压,不然不会那么久才收到消息的

// ... 放在最下面

$tdmq = new Tdmq();

if ($argv[1] === 'consumer') {
    while (true) {

        foreach ($tdmq->receive(1) as $key => $message)
        {
            /**
             * @var $message Message
             */
            echo $message->getPayload() . "\n";

            $tdmq->ack($message);
        }
    }
}


for ($i = 0;$i < 10;$i++) {
    $result = $tdmq->send([
        'content' => 'tagging_optimal',
    ],$i % 2 == 0 ? 30 : 0);
    var_dump($result);
}

终端一,开启消费者,先执行这个

php xxx.php consumer

终端二,推送消息

php xxx.php producer

测试了一下,让消费者保持的时候没问题,感谢。

好的,有问题继续反馈,这个就先关闭了

兄弟方便加个vx吗,有点关于sdk的问题想付费咨询一下,我的vx:show_622
感谢