[Bug] receiver_queue_size is not honoured with partitioned topic #21593
Opened this issue · 0 comments
Versions
Pulsar version 2.10.4
OS: Ubuntu(version=22.04)
Client(Python): pulsar-client(Version=3.3.0)
Minimal reproduce step
Create a partitioned topic with one partition
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1
Publish some messages
Create one consumer as follows:
consumer = client.subscribe( topic='persistent://public/default/test-consumer', subscription_name='test', consumer_type=ConsumerType.Shared, initial_position=InitialPosition.Earliest, receiver_queue_size=1, max_total_receiver_queue_size_across_partitions=1, consumer_name=None, negative_ack_redelivery_delay_ms=60000, unacked_messages_timeout_ms=3600000)
Receive and ack one message
consumer.receive(timeout_millis=30000)
consumer.acknowledge(message=message_id)
Check stats (it shows 2 unacked messages even though they have not been received by the consumer yet; also, if we create another consumer and try to consume messages, nothing is delivered to it)
What did you expect to see?
It should not show 2 unacked messages; instead, it should show 1 message, since the receiver queue size is 1.
What did you see instead?
I can see 2 unacked messages in the stats, as follows:
Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesInCounter" : 179,
"msgInCounter" : 3,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 179,
"backlogSize" : 120,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"test" : {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 2,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 2,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 2,
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.023862530971099603,
"chunkedMessageRate" : 0.0,
"availablePermits" : 0,
"unackedMessages" : 2,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"availablePermits" : 1,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"subscriptionProperties" : { },
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"metadata" : {
"partitions" : 1
},
"partitions" : { }
}