After MQ message consumption fails, the number of copies of the message in the retry queue keeps growing
sologgfun opened this issue · 4 comments
BUG REPORT
- Please describe the issue you observed:
- What did you do (The steps to reproduce)?
c, err := rocketmq.NewPushConsumer(
	consumer.WithNameServer(strings.Split(MqNameSrvAddr, ";")),
	consumer.WithGroupName(NewGroupName(id)),
	consumer.WithVIPChannel(false),
	consumer.WithConsumerModel(consumer.Clustering),
	consumer.WithConsumeFromWhere(consumer.ConsumeFromFirstOffset),
	consumer.WithRetry(3))
if err != nil {
	LogMopQueue("failed to new push consumer", err, "")
	return err
}
err = c.Subscribe(Topic, consumer.MessageSelector{Type: consumer.TAG, Expression: Tag}, MessageListener)
if err != nil {
	LogMopQueue("failed to subscribe to mop", err, "")
}
- What did you expect to see?
When an MQ message fails to be consumed, the MessageListener returns an error code and the message is requeued into the retry queue. Once the number of retry attempts exceeds a configured limit, retrying stops.
- What did you see instead?
One specific message consistently fails to be consumed, and the number of copies of this message in the retry queue grows exponentially. After several days, the client was receiving hundreds of identical messages per second.
- Please tell us about your environment:
- What is your OS?
- What is your client version?
- What is your RocketMQ version?
github.com/apache/rocketmq-client-go/v2 v2.1.1
- Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc):
FEATURE REQUEST
- Please describe the feature you are requesting.
I would like to know why the same message keeps increasing in the retry queue after a consumption failure. Based on my understanding, I would expect the message to appear in the retry queue only once after a consumption failure, rather than causing the client to receive hundreds of identical messages per second.
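One possibility is that several consumers share the same clientID, which breaks rebalancing, so the retry queue is consumed by multiple instances at the same time. For example, suppose three instances with the same clientID are all consuming the retry queue: one message that fails to be consumed becomes three, each of those three is consumed by every client and fails again, giving nine, and so on. The count then grows exponentially. @sologgfun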
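@cserwen Thanks for your reply. I checked: for a consumer started inside a pod, the IP is the pod IP, and the clientID is composed as ip@instanceid. Since pod IPs are unique within the same cluster, there are no duplicate clientIDs.
I'd also like to ask what the consumer's RetryTimes is for. I could not find where the consumer uses it in the source code; I only found retryTimes being used in the producer. Thanks again!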
It has no effect and can be removed. @sologgfun
I think you simply used the wrong option.
rocketmq-client-go/consumer/push_consumer.go
Lines 1399 to 1405 in da20ee7
WithMaxReconsumeTimes is the correct option for setting the retry count.
rocketmq-client-go/consumer/option.go
Lines 270 to 276 in da20ee7