生成者貌似无法关闭批量模式
Closed this issue · 7 comments
wanlinzan commented
这个用在腾讯云的tdmq pulsar中,发送多个消息,只有一个消息是延时消息的时候,所有的消息都会被认为是延时消息
ikilobyte commented
@wanlinzan 提供一下示例代码,这边调试一下
wanlinzan commented
非常感谢作者及时回复!
这边整理代码的时候发现消息是堆积在队列中无法被消费,根据腾讯云文档中的说明,造成堆积的原因可能是未关闭批量模式。
文档地址:https://cloud.tencent.com/document/product/1179/49814
示例代码如下:
use Exception;
use Pulsar\Consumer;
use Pulsar\ConsumerOptions;
use Pulsar\Message;
use Pulsar\MessageOptions;
use Pulsar\SubscriptionType;
use Pulsar\Authentication\Jwt;
use Pulsar\Compression\Compression;
use Pulsar\Producer;
use Pulsar\ProducerOptions;
class Tdmq
{
public $config;
public $topic_producers = [];
public $topic_consumers = [];
/**
* Tdmq constructor.
*/
public function __construct()
{
$this->config = [
'tdmq_url' => 'http://xxxxxxxxxxxxxxxx:8080',
'tdmq_token' => 'xxxxxxxxxxxxxxxxxxxxxxxxxx',
'tdmq_topic_prefix' => 'persistent://xxxxx/xxxxxxx/',
'tdmq_subscription_name' => 'default-handler'
];
}
public function __destruct()
{
foreach ($this->topic_producers as $topic_producer) {
$topic_producer->close();
}
foreach ($this->topic_consumers as $topic_consumer) {
$topic_consumer->close();
}
}
/**
* 发送消息
* @param $topic_name
* @param array $message
* @param int $delay_time
* @return array
* @throws Exception
*/
public function send(array $message, $delay_time = 0, $topic_name = 'common'): array
{
if (!isset($this->topic_producers[$topic_name])) {
$options = new ProducerOptions();
$options->setAuthentication(new Jwt($this->config['tdmq_token']));
$options->setConnectTimeout(3);
$options->setTopic($this->config['tdmq_topic_prefix'] . $topic_name);
$options->setCompression(Compression::ZLIB);
$this->topic_producers[$topic_name] = new Producer($this->config['tdmq_url'], $options);
$this->topic_producers[$topic_name]->connect();
}
if (isset($message[0]) && is_array($message[0])) {
$messages = $message;
} else {
$messages = [$message];
}
$results = [];
foreach ($messages as $message) {
$messageID = $this->topic_producers[$topic_name]->send(json_encode($message, 256), [
MessageOptions::DELAY_SECONDS => $delay_time
]);
$results[] = $messageID;
}
return $results;
}
/**
* 接收消息
* @param $topic_name
* @param int $size
* @return array
* @throws Exception
*/
public function receive($size = 1, $topic_name = 'common'): array
{
if (!isset($this->topic_consumers[$topic_name])) {
$this->_initConsumer($topic_name);
}
$messages = [];
while (true) {
try {
$messages[] = $this->topic_consumers[$topic_name]->receive(false);
if (count($messages) >= $size) {
break;
}
} catch (Exception $exception) {
break;
}
}
return $messages;
}
/**
* 确认消息
* @param $topic_name
* @param Message $message
* @throws Exception
*/
public function ack(Message $message, $topic_name = 'common')
{
if (!isset($this->topic_consumers[$topic_name])) {
$this->_initConsumer($topic_name);
}
$this->topic_consumers[$topic_name]->ack($message);
}
/**
* @param $topic_name
* @throws Exception
*/
private function _initConsumer($topic_name)
{
$options = new ConsumerOptions();
$options->setAuthentication(new Jwt($this->config['tdmq_token']));
$options->setConnectTimeout(3);
$options->setTopic($this->config['tdmq_topic_prefix'] . $topic_name);
$options->setSubscriptionType(SubscriptionType::Shared);
$options->setSubscription($this->config['tdmq_subscription_name']);
$options->setNackRedeliveryDelay(1); // 这个参数可以让当前消费者未断开连接的情况下,更早的回到队列中让其他消费者消费
$options->setReceiveQueueSize(100);
$consumer = new Consumer($this->config['tdmq_url'], $options);
$consumer->connect();
$this->topic_consumers[$topic_name] = $consumer;
}
}
$tdmq = new Tdmq();
$tdmq->send([
'content' => 'tagging_optimal',
]);
$tdmq->send([
'content' => 'delete_test_answer_data',
],500);
wanlinzan commented
这边尝试把 ->receive(false) 改为->receive(true)就可以获取消息了,但是获取消息需要4到5秒钟,是不是可以认为是腾讯云消息队列拉取消息慢
ikilobyte commented
我试了一下,没有复现你说的这种情况,pulsar是需要先创建消费者的,我在你的代码下面多加了一下代码,你可以试一下能否成功,另外pulsar所有的通信都是基于TCP的,实时性很好,除非消费端业务逻辑有积压,不然不会那么久才收到消息的
// ... 放在最下面
$tdmq = new Tdmq();
if ($argv[1] === 'consumer') {
while (true) {
foreach ($tdmq->receive(1) as $key => $message)
{
/**
* @var $message Message
*/
echo $message->getPayload() . "\n";
$tdmq->ack($message);
}
}
}
for ($i = 0;$i < 10;$i++) {
$result = $tdmq->send([
'content' => 'tagging_optimal',
],$i % 2 == 0 ? 30 : 0);
var_dump($result);
}
终端一,开启消费者,先执行这个
php xxx.php consumer
终端二,推送消息
php xxx.php producer
wanlinzan commented
测试了一下,让消费者保持的时候没问题,感谢。
ikilobyte commented
好的,有问题继续反馈,这个就先关闭了
wanlinzan commented
兄弟方便加个vx吗,有点关于sdk的问题想付费咨询一下,我的vx:show_622
感谢