ikilobyte/pulsar-client-php

hyperf开启协程生产消息ReactorEpoll告警

exewen opened this issue · 11 comments

exewen commented

运行模式也是SWOOLE_HOOK_ALL
[2023-04-14 16:37:11 @5006.0] WARNING ReactorEpoll::add(): failed to add events[fd=8#0, type=7, events=512], Error: File exists[17]
[2023-04-14 16:37:11 @5006.0] WARNING network::socket_free_defer(): close(8) failed, Error: Bad file descriptor[9]
image
image
image

直接使用 $producer->send(); 会报错吗?不要在Parallel里面,hyperf启动就已经是一个协程环境了

多个进程肯定是消费者*max了

多进程消费是常见情况,为什么用消息id计数,而不用实际重新投递次数判断呢?消费者个数正常需要根据业务动态调整的,如果每个消费者都独立计数,就无法把控消息的重试次数,就不符合预期了

pulsar server没有提供这种功能,这些都是由客户端实现的

一个消费者会给每个分区都起一个连接,目前发现消息积压时,消费者receive分区消息并不均匀,偏好消费某一固定分区消费,请问有什么配置或者办法修改分区切换策略吗

关于每个分区都建立一个tcp连接,分区是pulsar server分配的,可能在服务器A或服务器B也有可能都在服务器A,建立多少个连接不是客户端可控的

-- 消费者receive分区消息并不均匀
请问生产者是用的本库吗?如果是分区topic,目前的做法是随机选择一个生产者推送到对应的分区topic

生产没问题是比较均匀的,消费拉取时速度分区不均匀,会一直拉某一个分区,不会切换。
我看了一下调度,一旦某个连接有数据那就会一直读下去,不会切换,但是其实是有多个分区多个连接的,这样调度直到前面的连接拿不到数据,下一个连接才能拿到数据,是否用shuffle()打乱一下顺序相对比较好点
image

我看了一下代码,假设stream_select返回了2个socket

  • 如果第一个收到了PING等协议(业务逻辑用不上),此时$response为null,会继续处理第二个socket
  • 如果第一个收到是MESSAGE协议,此时$response不为null,会退出for循环,consumer->receive会有返回值

业务逻辑处理完毕后,继续执行consumer->receive,stream_select会返回可读的socket,如果$reads里面又有第一个socket,那可能会导致上一阶段的其它socket读不到数据,但这种情况不太多

pulsar server 会根据ReceiveQueueSize推送数据,假设配置的是1000,pulsar会一次推送1000条消息到客户端,当收到这1000条消息的时候,会缓存到内存中,下次继续接收消息时会优先从内存中获取。

缓存中获取

// get message from local queue
if (!$this->messageQueue->isEmpty()) {
    return $this->messageQueue->dequeue();
}

不建议改ReceiveQueueSize,使用默认值即可,当然你也可以改成1,目前消费者是单进程处理吗?

  • 单进程也有这个问题,只要是多分区就会,因为每个分区都会有一个socket,正常如果消息没有积压没有关系,第一个分区的socket读不到消息会读下一个
  • 当消息积压时只要第一个socket能读到消息,那就会一直从这个socket读message,其他分区就一直等待,这时候其他分区的消息要等第一个socket所绑定的分区消息消费完毕
  • 没有消息积压的时候这样调度无所谓,有消息积压的时候很多消息就会滞后处理。如果每次读取时把socket打乱顺序,新的顺序时上个读不到消息仍然不影响后面的读取,如果每个socket都可以读取到消息,那随机读取一个socket,可能分区之间的消费会公平些

我重新理了一下,是要打乱一下顺序,等会发一个版本,你更新一下包