apache/pulsar-client-python

[Bug] receiver_queue_size is not honoured with partitioned topic #21593

Opened this issue · 0 comments

Versions
Pulsar version 2.10.4
OS: Ubuntu(version=22.04)
Client(Python): pulsar-client(Version=3.3.0)

Minimal reproduce step
Create a partitioned topic with one partition
bin/pulsar-admin topics create-partitioned-topic persistent://public/default/test-consumer -p 1

Publish some messages

Create one consumer as follows:
consumer = client.subscribe( topic='persistent://public/default/test-consumer', subscription_name='test', consumer_type=ConsumerType.Shared, initial_position=InitialPosition.Earliest, receiver_queue_size=1, max_total_receiver_queue_size_across_partitions=1, consumer_name=None, negative_ack_redelivery_delay_ms=60000, unacked_messages_timeout_ms=3600000)

Receive and ack one message
consumer.receive(timeout_millis=30000)
consumer.acknowledge(message=message_id)

Check the stats (they show 2 unacked messages even though those messages have not been received by the application yet; also, if we create another consumer and try to consume messages, none are delivered to it)

What did you expect to see?
The stats should not show 2 unacked messages; instead they should show 1, since receiver_queue_size is 1.

What did you see instead?
I can see 2 unacked messages in the stats, as follows:
Command: bin/pulsar-admin topics partitioned-stats persistent://public/default/test-consumer

{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesInCounter" : 179,
"msgInCounter" : 3,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 179,
"backlogSize" : 120,
"publishRateLimitedTimes" : 0,
"earliestMsgPublishTimeInBacklogs" : 0,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ ],
"waitingPublishers" : 0,
"subscriptions" : {
"test" : {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 2,
"backlogSize" : 0,
"earliestMsgPublishTimeInBacklog" : 0,
"msgBacklogNoDelayed" : 2,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 0,
"unackedMessages" : 2,
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 0,
"lastConsumedTimestamp" : 0,
"lastAckedTimestamp" : 0,
"lastMarkDeleteAdvancedTimestamp" : 0,
"consumers" : [ {
"msgRateOut" : 0.07158759338136238,
"msgThroughputOut" : 4.271393071754622,
"bytesOutCounter" : 179,
"msgOutCounter" : 3,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.023862530971099603,
"chunkedMessageRate" : 0.0,
"availablePermits" : 0,
"unackedMessages" : 2,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
}, {
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"msgRateRedeliver" : 0.0,
"messageAckRate" : 0.0,
"chunkedMessageRate" : 0.0,
"availablePermits" : 1,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 0,
"lastConsumedTimestamp" : 0
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"subscriptionProperties" : { },
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"nonContiguousDeletedMessagesRanges" : 0,
"nonContiguousDeletedMessagesRangesSerializedSize" : 0,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
},
"metadata" : {
"partitions" : 1
},
"partitions" : { }
}