apache/pulsar-client-go

[BUG] The capacity of dataChan is too big

Gleiphir2769 opened this issue · 2 comments

Hi community. I noticed that the capacity of dataChan is too big. We create dataChan with capacity MaxPendingMessages, but the actual number of pending messages in the queue can exceed MaxPendingMessages by a lot.

This is because we use publishSemaphore, rather than the capacity of dataChan, to limit pending messages. The actual limit is 2x MaxPendingMessages.

dataChan: make(chan *sendRequest, maxPendingMessages),

> This is because we use publishSemaphore, rather than the capacity of dataChan, to limit pending messages. The actual limit is 2x MaxPendingMessages.

What does this mean? Both publishSemaphore and dataChan have n=MaxPendingMessages permits to use.

dataChan:         make(chan *sendRequest, maxPendingMessages),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),

It is because we acquire publishSemaphore only after consuming from dataChan. In some cases we may end up keeping twice MaxPendingMessages messages in memory, and that breaks the producer memory limit.

	p.updateMetaData(sr)
	p.dataChan <- sr
}

func (p *partitionProducer) runEventsLoop() {
	for {
		select {
		case data := <-p.dataChan:
			p.internalSend(data)
		case i := <-p.cmdChan:

	if !p.canAddToQueue(sr) {
		return
	}
	// try to reserve memory for uncompressedPayload
	if !p.canReserveMem(sr, sr.uncompressedSize) {
		return
	}

But the concept here is a bit vague. MaxPendingMessages actually limits messages that have been sent to the broker but have not yet been acked. It has nothing to do with the producer memory limit directly, but it affects the accuracy of the memory limit because of the execution order. So I think it's better to reduce the size of dataChan.

What do you think?