[Bug][Producer] Inaccurate producer memory limit issue in chunking and schema
gunli opened this issue · 1 comment
Expected behavior
- Get the right uncompressedPayloadSize from msg.Payload or msg.Schema;
- Release the memory of the current failed chunk:
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs))
return
}
Actual behavior
- If we use a schema, msg.Payload should be nil, so we request 0 bytes of memory at the beginning of partitionProducer.internalSend() (pulsar/producer_partition.go:480/492), as the snippet below shows (a minimal sketch of the accounting follows it):
uncompressedPayload := msg.Payload//HOW ABOUT WE USE SCHEMA ???
uncompressedPayloadSize := int64(len(uncompressedPayload))
var schemaPayload []byte
var err error
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
return
}
// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
return
}
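The accounting problem can be shown with a minimal, self-contained sketch. The memLimiter type and its reserve/release methods below are hypothetical and are not the client's memory limit controller; the only point is that reserving len(msg.Payload) before schema encoding reserves 0 bytes for a schema-based message:
package main

import (
	"errors"
	"fmt"
)

// memLimiter is a stand-in for a memory limit controller, for illustration only.
type memLimiter struct {
	used, limit int64
}

func (m *memLimiter) reserve(n int64) error {
	if m.used+n > m.limit {
		return errors.New("memory limit exceeded")
	}
	m.used += n
	return nil
}

func (m *memLimiter) release(n int64) { m.used -= n }

func main() {
	lim := &memLimiter{limit: 1024}

	var payload []byte           // msg.Payload is nil when a schema Value is used
	encoded := make([]byte, 600) // stand-in for the result of schema.Encode(msg.Value)

	// Buggy order: reserve based on msg.Payload before schema encoding.
	_ = lim.reserve(int64(len(payload)))               // reserves 0 bytes
	fmt.Println("reserved before encoding:", lim.used) // 0, although 600 bytes will be in flight

	// Accurate order: reserve the real uncompressed size after encoding.
	if err := lim.reserve(int64(len(encoded))); err != nil {
		fmt.Println("rejected:", err)
		return
	}
	fmt.Println("reserved after encoding:", lim.used) // 600
}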
- In partitionProducer.internalSend() (pulsar/producer_partition.go:663), when chunking is enabled, we send the chunks one by one, and when canAddToQueue fails for a chunk we release the memory that has not been sent yet. Looking at the code below, it seems to forget to release the memory of the current failed chunk, because lhs and rhs have already been updated to lhs = chunkID * payloadChunkSize and rhs = lhs + payloadChunkSize. If we want to release the memory of the current failed chunk and the later ones, I think we should call p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)). The Java client does client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
|-----------------------uncompressedPayloadSize-----------------------|
|----------------compressedPayload----------------|
|--prev chunks--|--cur chunk--|---------------------------------------|
|--prev chunks--|-------------|-----(uncompressedPayloadSize-rhs)-----|
|--prev chunks--|-------------(uncompressedPayloadSize-lhs)-----------|
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))//SHOULD BE lhs ???
return
}
And if msg.Payload is nil, uncompressedPayloadSize will be 0, so we will release a negative amount of memory and the memLimit will end up in a wrong state at some point. A worked example of the lhs/rhs arithmetic follows this bullet.
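To make the lhs/rhs difference concrete, here is a tiny standalone example with made-up sizes (compression is assumed to be disabled so that compressedSize equals uncompressedPayloadSize):
package main

import "fmt"

func main() {
	const (
		uncompressedPayloadSize = 100
		payloadChunkSize        = 20
		failedChunkID           = 2 // canAddToQueue fails for this chunk
	)

	lhs := failedChunkID * payloadChunkSize // 40: bytes already queued by chunks 0 and 1
	rhs := lhs + payloadChunkSize           // 60: end offset of the failed chunk

	fmt.Println("release with lhs:", uncompressedPayloadSize-lhs) // 60: frees chunks 2, 3 and 4
	fmt.Println("release with rhs:", uncompressedPayloadSize-rhs) // 40: leaks the failed chunk 2
}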
- When a chunk fails, we release the memory that hasn't been sent and the original message will eventually be failed, but in partitionProducer.failTimeoutMessages() (pulsar/producer_partition.go:978) we release memory equal to the size of msg.Payload. It seems we release too much here, because the memory of the failed chunks has already been released before. A sketch of symmetric per-request bookkeeping follows the snippet below.
size := len(sr.msg.Payload)//HOW ABOUT WE USE SCHEMA ???
p.releaseSemaphoreAndMem(int64(size))
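One possible way to avoid both the over-release here and the leak in internalSend is to make the bookkeeping symmetric: record on each sendRequest how much memory it actually reserved, and release exactly that amount on timeout or receipt. The sketch below uses made-up types to illustrate the idea only, not the producer's real structs:
package main

import "fmt"

// sendRequest and limiter are stand-ins; the only idea they carry is that each
// request remembers how much memory was reserved for it.
type sendRequest struct {
	reservedMem int64
}

type limiter struct{ used int64 }

func (l *limiter) reserve(n int64) { l.used += n }
func (l *limiter) release(n int64) { l.used -= n }

func main() {
	lim := &limiter{}

	// A 100-byte message split into 5 chunks of 20 bytes; each chunk's request
	// records its own reservation when it is queued.
	var pending []*sendRequest
	for i := 0; i < 5; i++ {
		sr := &sendRequest{reservedMem: 20}
		lim.reserve(sr.reservedMem)
		pending = append(pending, sr)
	}

	// On timeout (or receipt), release per request exactly what that request
	// reserved, instead of recomputing the amount from len(msg.Payload).
	for _, sr := range pending {
		lim.release(sr.reservedMem)
	}
	fmt.Println("used after timeout:", lim.used) // 0: no over- or under-release
}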
- In partitionProducer.ReceivedSendReceipt() we release memory with p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))). If a message is chunked, there will be many sendRequests pointing to that message, so it seems we will release more than we requested; see the arithmetic sketch after the snippet below.
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)
}
}
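The over-release is easy to quantify with made-up numbers; this standalone sketch only mirrors the arithmetic of the loop above, not the real code:
package main

import "fmt"

func main() {
	const (
		payloadSize = 100 // len(msg.Payload), reserved once for the whole message
		totalChunks = 5   // one sendRequest per chunk, all pointing at the same msg
	)

	released := 0
	for chunk := 0; chunk < totalChunks; chunk++ {
		released += payloadSize // what the receipt loop releases per chunk today
	}
	fmt.Printf("reserved %d, released %d, over-released %d\n",
		payloadSize, released, released-payloadSize) // reserved 100, released 500, over-released 400
}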
Steps to reproduce
How can we reproduce the issue
Code review
func (p *partitionProducer) internalSend(request *sendRequest) {
p.log.Debug("Received send request: ", *request.msg)
msg := request.msg
// read payload from message
uncompressedPayload := msg.Payload
uncompressedPayloadSize := int64(len(uncompressedPayload))
var schemaPayload []byte
var err error
if msg.Value != nil && msg.Payload != nil {
p.log.Error("Can not set Value and Payload both")
runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
return
}
// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
return
}
if p.options.DisableMultiSchema {
if msg.Schema != nil && p.options.Schema != nil &&
msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
return
}
}
var schema Schema
var schemaVersion []byte
if msg.Schema != nil {
schema = msg.Schema
} else if p.options.Schema != nil {
schema = p.options.Schema
}
if msg.Value != nil {
// payload and schema are mutually exclusive
// try to get payload from schema value only if payload is not set
if uncompressedPayload == nil && schema != nil {
schemaPayload, err = schema.Encode(msg.Value)
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
return
}
}
}
if uncompressedPayload == nil {
uncompressedPayload = schemaPayload
}
if schema != nil {
schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
if schemaVersion == nil {
schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
if err != nil {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
p.log.WithError(err).Error("get schema version fail")
runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
return
}
p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
}
}
uncompressedSize := len(uncompressedPayload)
deliverAt := msg.DeliverAt
if msg.DeliverAfter.Nanoseconds() > 0 {
deliverAt = time.Now().Add(msg.DeliverAfter)
}
mm := p.genMetadata(msg, uncompressedSize, deliverAt)
// set default ReplicationClusters when DisableReplication
if msg.DisableReplication {
msg.ReplicationClusters = []string{"__local__"}
}
sendAsBatch := !p.options.DisableBatching &&
msg.ReplicationClusters == nil &&
deliverAt.UnixNano() < 0
// Once the batching is enabled, it can close blockCh early to make block finish
if sendAsBatch {
request.stopBlock()
} else {
// update sequence id for metadata, make the size of msgMetadata more accurate
// batch sending will update sequence ID in the BatchBuilder
p.updateMetadataSeqID(mm, msg)
}
maxMessageSize := int(p._getConn().GetMaxMessageSize())
// compress payload if not batching
var compressedPayload []byte
var compressedSize int
var checkSize int
if !sendAsBatch {
compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
compressedSize = len(compressedPayload)
checkSize = compressedSize
// set the compress type in msgMetaData
compressionType := pb.CompressionType(p.options.CompressionType)
if compressionType != pb.CompressionType_NONE {
mm.Compression = &compressionType
}
} else {
// final check for batching message is in serializeMessage
// this is a double check
checkSize = uncompressedSize
}
// if msg is too large and chunking is disabled
if checkSize > maxMessageSize && !p.options.EnableChunking {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, errMessageTooLarge)
p.log.WithError(errMessageTooLarge).
WithField("size", checkSize).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", maxMessageSize)
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
var totalChunks int
// max chunk payload size
var payloadChunkSize int
if sendAsBatch || !p.options.EnableChunking {
totalChunks = 1
payloadChunkSize = int(p._getConn().GetMaxMessageSize())
} else {
payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
if payloadChunkSize <= 0 {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, msg, errMetaTooLarge)
p.log.WithError(errMetaTooLarge).
WithField("metadata size", proto.Size(mm)).
WithField("properties", msg.Properties).
Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
p.metrics.PublishErrorsMsgTooLarge.Inc()
return
}
// set ChunkMaxMessageSize
if p.options.ChunkMaxMessageSize != 0 {
payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
}
totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
}
// set total chunks to send request
request.totalChunks = totalChunks
if !sendAsBatch {
if totalChunks > 1 {
var lhs, rhs int
uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
mm.Uuid = proto.String(uuid)
mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
cr := newChunkRecorder()
for chunkID := 0; chunkID < totalChunks; chunkID++ {
lhs = chunkID * payloadChunkSize
if rhs = lhs + payloadChunkSize; rhs > compressedSize {
rhs = compressedSize
}
// update chunk id
mm.ChunkId = proto.Int32(int32(chunkID))
nsr := &sendRequest{
ctx: request.ctx,
msg: request.msg,
callback: request.callback,
callbackOnce: request.callbackOnce,
publishTime: request.publishTime,
blockCh: request.blockCh,
closeBlockChOnce: request.closeBlockChOnce,
totalChunks: totalChunks,
chunkID: chunkID,
uuid: uuid,
chunkRecorder: cr,
transaction: request.transaction,
}
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))//BUG, SHOULD BE lhs
return
}
p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
}
// close the blockCh when all the chunks acquired permits
request.stopBlock()
} else {
// close the blockCh when totalChunks is 1 (it has acquired permits)
request.stopBlock()
p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
}
} else {
smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
multiSchemaEnabled := !p.options.DisableMultiSchema
added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled)
if !added {
// The current batch is full. flush it and retry
p.internalFlushCurrentBatch()
// after flushing try again to add the current payload
if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
multiSchemaEnabled); !ok {
p.releaseSemaphoreAndMem(uncompressedPayloadSize)
runCallback(request.callback, nil, request.msg, errFailAddToBatch)
p.log.WithField("size", uncompressedSize).
WithField("properties", msg.Properties).
Error("unable to add message to batch")
return
}
}
if request.flushImmediately {
p.internalFlushCurrentBatch()
}
}
}
func (p *partitionProducer) failTimeoutMessages() {
diff := func(sentAt time.Time) time.Duration {
return p.options.SendTimeout - time.Since(sentAt)
}
t := time.NewTimer(p.options.SendTimeout)
defer t.Stop()
for range t.C {
state := p.getProducerState()
if state == producerClosing || state == producerClosed {
return
}
item := p.pendingQueue.Peek()
if item == nil {
// pending queue is empty
t.Reset(p.options.SendTimeout)
continue
}
oldestItem := item.(*pendingItem)
if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
// none of these pending messages have timed out, wait and retry
t.Reset(nextWaiting)
continue
}
// since pending queue is not thread safe because of there is no global iteration lock
// to control poll from pending queue, current goroutine and connection receipt handler
// iterate pending queue at the same time, this maybe a performance trade-off
// see https://github.com/apache/pulsar-client-go/pull/301
curViewItems := p.pendingQueue.ReadableSlice()
viewSize := len(curViewItems)
if viewSize <= 0 {
// double check
t.Reset(p.options.SendTimeout)
continue
}
p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
lastViewItem := curViewItems[viewSize-1].(*pendingItem)
// iterate at most viewSize items
for i := 0; i < viewSize; i++ {
tickerNeedWaiting := time.Duration(0)
item := p.pendingQueue.CompareAndPoll(
func(m interface{}) bool {
if m == nil {
return false
}
pi := m.(*pendingItem)
pi.Lock()
defer pi.Unlock()
if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
// current and subsequent items not timeout yet, stop iterating
tickerNeedWaiting = nextWaiting
return false
}
return true
})
if item == nil {
t.Reset(p.options.SendTimeout)
break
}
if tickerNeedWaiting > 0 {
t.Reset(tickerNeedWaiting)
break
}
pi := item.(*pendingItem)
pi.Lock()
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
size := len(sr.msg.Payload)
p.releaseSemaphoreAndMem(int64(size))
p.metrics.MessagesPending.Dec()
p.metrics.BytesPending.Sub(float64(size))
p.metrics.PublishErrorsTimeout.Inc()
p.log.WithError(errSendTimeout).
WithField("size", size).
WithField("properties", sr.msg.Properties)
}
if sr.callback != nil {
sr.callbackOnce.Do(func() {
runCallback(sr.callback, nil, sr.msg, errSendTimeout)
})
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}
// flag the sending has completed with error, flush make no effect
pi.Complete()
pi.Unlock()
// finally reached the last view item, current iteration ends
if pi == lastViewItem {
t.Reset(p.options.SendTimeout)
break
}
}
}
}
func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
pi, ok := p.pendingQueue.Peek().(*pendingItem)
if !ok {
// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
return
}
if pi.sequenceID < response.GetSequenceId() {
// Force connection closing so that messages can be re-transmitted in a new connection
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
p._getConn().Close()
return
} else if pi.sequenceID > response.GetSequenceId() {
// Ignoring the ack since it's referring to a message that has already timed out.
p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
return
} else {
// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
p.pendingQueue.Poll()
now := time.Now().UnixNano()
// lock the pending item while sending the requests
pi.Lock()
defer pi.Unlock()
p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
batchSize := int32(0)
for _, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
batchSize = batchSize + 1
} else { // Flush request
break
}
}
for idx, i := range pi.sendRequests {
sr := i.(*sendRequest)
if sr.msg != nil {
atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
p.metrics.MessagesPublished.Inc()
p.metrics.MessagesPending.Dec()
payloadSize := float64(len(sr.msg.Payload))
p.metrics.BytesPublished.Add(payloadSize)
p.metrics.BytesPending.Sub(payloadSize)
}
if sr.callback != nil || len(p.options.Interceptors) > 0 {
msgID := newMessageID(
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
int32(idx),
p.partitionIdx,
batchSize,
)
if sr.totalChunks > 1 {
if sr.chunkID == 0 {
sr.chunkRecorder.setFirstChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
-1,
p.partitionIdx,
0,
})
} else if sr.chunkID == sr.totalChunks-1 {
sr.chunkRecorder.setLastChunkID(
&messageID{
int64(response.MessageId.GetLedgerId()),
int64(response.MessageId.GetEntryId()),
-1,
p.partitionIdx,
0,
})
// use chunkMsgID to set msgID
msgID = &sr.chunkRecorder.chunkedMsgID
}
}
if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
runCallback(sr.callback, msgID, sr.msg, nil)
p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
}
}
if sr.transaction != nil {
sr.transaction.endSendOrAckOp(nil)
}
}
// Mark this pending item as done
pi.Complete()
}
}
System configuration
Pulsar version: x.y
@RobertIndie @shibd
Hi @gunli, thanks for your careful review. After a simple check, I think the memory limit does have some problems.
if we use schema, the msg.Payload should be nil and we request 0 memory at the beginning
In #955, uncompressedPayloadSize is calculated in advance, so it is not accurate.
(pulsar-client-go/pulsar/producer_partition.go, lines 478 to 481 in be35740)
The exact size should be uncompressedSize.
(pulsar-client-go/pulsar/producer_partition.go, line 543 in be35740)
if we want to release the memory of the current failed chunk and the later ones, I think we should p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)).
I think so. But do we need to release memory when canAddToQueue has failed? I think just returning will be fine. There should be more test code to verify the producer memory limit in chunking.
Anyway, we need to add more test code for the producer memory limit to cover chunking and schema; a rough outline of such a test is sketched below. If you have time, we can fix these problems together.