hyperf开启协程生产消息ReactorEpoll告警
exewen opened this issue · 11 comments
直接使用 $producer->send();
会报错吗?不要在Parallel里面,hyperf启动就已经是一个协程环境了
多个进程肯定是消费者*max了
多进程消费是常见情况,为什么用消息id计数,而不用实际重新投递次数判断呢?消费者个数正常需要根据业务动态调整的,如果每个消费者都独立计数,就无法把控消息的重试次数,就不符合预期了
pulsar server没有提供这种功能,这些都是由客户端实现的
一个消费者会给每个分区都起一个连接,目前发现消息积压时,消费者receive分区消息并不均匀,偏好消费某一固定分区消费,请问有什么配置或者办法修改分区切换策略吗
关于每个分区都建立一个tcp连接,分区是pulsar server分配的,可能在服务器A或服务器B也有可能都在服务器A,建立多少个连接不是客户端可控的
-- 消费者receive分区消息并不均匀
请问生产者是用的本库吗?如果是分区topic,目前的做法是随机选择一个生产者推送到对应的分区topic
我看了一下代码,假设stream_select返回了2个socket
- 如果第一个收到了
PING
等协议(业务逻辑用不上),此时$response
为null,会继续处理第二个socket - 如果第一个收到是
MESSAGE
协议,此时$response
不为null,会退出for循环,consumer->receive
会有返回值
业务逻辑处理完毕后,继续执行consumer->receive
,stream_select会返回可读的socket,如果$reads里面又有第一个socket,那可能会导致上一阶段的其它socket读不到数据,但这种情况不太多
pulsar server 会根据ReceiveQueueSize
推送数据,假设配置的是1000,pulsar会一次推送1000条消息到客户端,当收到这1000条消息的时候,会缓存到内存中,下次继续接收消息时会优先从内存中获取。
缓存中获取
// get message from local queue
if (!$this->messageQueue->isEmpty()) {
return $this->messageQueue->dequeue();
}
不建议改ReceiveQueueSize,使用默认值即可,当然你也可以改成1,目前消费者是单进程处理吗?
- 单进程也有这个问题,只要是多分区就会,因为每个分区都会有一个socket,正常如果消息没有积压没有关系,第一个分区的socket读不到消息会读下一个
- 当消息积压时只要第一个socket能读到消息,那就会一直从这个socket读message,其他分区就一直等待,这时候其他分区的消息要等第一个socket所绑定的分区消息消费完毕
- 没有消息积压的时候这样调度无所谓,有消息积压的时候很多消息就会滞后处理。如果每次读取时把socket打乱顺序,新的顺序时上个读不到消息仍然不影响后面的读取,如果每个socket都可以读取到消息,那随机读取一个socket,可能分区之间的消费会公平些
我重新理了一下,是要打乱一下顺序,等会发一个版本,你更新一下包
@presidentma v1.1.9