apache/pulsar-client-go

[Bug][Producer] Inaccurate producer memory limit issue in chunking and schema

gunli opened this issue · 1 comment

gunli commented

Expected behavior

  1. Get the right uncompressedPayloadSize from msg.Payload or msg.Schema;
  2. Release the memory of the current failed chunk:
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
    p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs))
    return
}

Actual behavior

  1. If we use a schema, msg.Payload will be nil, so we request 0 bytes of memory at the beginning of partitionProducer.internalSend() (pulsar/producer_partition.go:480/492), as shown below (a possible fix is sketched after the snippet):
        uncompressedPayload := msg.Payload//HOW ABOUT WE USE SCHEMA ???
	uncompressedPayloadSize := int64(len(uncompressedPayload))

	var schemaPayload []byte
	var err error
	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
		return
	}

	// The block chan must be closed when returned with exception
	defer request.stopBlock()
	if !p.canAddToQueue(request, uncompressedPayloadSize) {
		return
	}
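
One possible direction (a rough sketch only, not the actual change; it assumes the schema selection between msg.Schema and p.options.Schema has been moved above this point) is to resolve the payload first and only then reserve memory, so the reserved amount matches the bytes that are actually buffered and later released:

// Sketch: resolve the payload (from Payload or from the schema-encoded Value) first,
// then reserve memory with the real size.
uncompressedPayload := msg.Payload
if uncompressedPayload == nil && msg.Value != nil && schema != nil {
	var err error
	if uncompressedPayload, err = schema.Encode(msg.Value); err != nil {
		runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
		p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
		return
	}
}
uncompressedPayloadSize := int64(len(uncompressedPayload))

// The block chan must be closed when returned with exception
defer request.stopBlock()
if !p.canAddToQueue(request, uncompressedPayloadSize) {
	return
}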
  2. In partitionProducer.internalSend() (pulsar/producer_partition.go:663), when chunking is enabled, we send the chunks one by one, and when canAddToQueue fails for a chunk we release the memory that has not been sent yet. Looking at the code below, it seems we forget to release the memory of the current failed chunk, because lhs and rhs have already been updated to lhs = chunkID * payloadChunkSize and rhs = lhs + payloadChunkSize. If we want to release the memory of the current failed chunk and the later ones, I think we should call p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)). The Java client does client.getMemoryLimitController().releaseMemory(uncompressedSize - readStartIndex);
|-----------------------uncompressedPayloadSize-----------------------| 
|----------------compressedPayload----------------|
|--prev chunks--|--cur chunk--|---------------------------------------| 
|--prev chunks--|-------------|-----(uncompressedPayloadSize-rhs)-----|
|--prev chunks--|-------------(uncompressedPayloadSize-lhs)-----------|
// the permit of first chunk has acquired
if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
    p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))//SHOULD BE lhs ???
    return
}

And, if msg.Payload is nil, uncompressedPayloadSize will be 0, so we would release a negative amount of memory and the memLimit would be left in a wrong state (see the illustration below).
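
To see why a negative release is harmful, here is a toy counter-based limiter (an illustration only, not the client's actual MemoryLimitController):

// Toy model of a memory limiter, just to illustrate the negative-release problem.
type toyLimiter struct {
	limit   int64
	current int64
}

func (l *toyLimiter) tryReserve(size int64) bool {
	if l.current+size > l.limit {
		return false
	}
	l.current += size
	return true
}

func (l *toyLimiter) release(size int64) {
	// If size is negative (e.g. 0 - int64(rhs) when the payload came from a schema),
	// current drops below the real usage, so later reservations succeed even though
	// the limit is actually exhausted.
	l.current -= size
}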

  3. When a chunk fails, we release the memory that has not been sent, and the original message will eventually fail. But in partitionProducer.failTimeoutMessages() (pulsar/producer_partition.go:978) we release memory equal to the size of msg.Payload, which seems to release too much, because the memory of the failed chunks has already been released (see the sketch after the snippet):
size := len(sr.msg.Payload)//HOW ABOUT WE USE SCHEMA ???
p.releaseSemaphoreAndMem(int64(size))
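
One hedged way to avoid releasing once per chunk (a sketch, not the actual fix) is to release only for the request that carries the first chunk, and to account for the fact that the amount may already be partially released and may come from a schema-encoded value:

// Sketch for failTimeoutMessages: release a chunked message's memory only once.
for _, i := range pi.sendRequests {
	sr := i.(*sendRequest)
	if sr.msg != nil {
		if sr.totalChunks <= 1 || sr.chunkID == 0 {
			// len(sr.msg.Payload) is still wrong for schema-encoded payloads (point 1),
			// and any bytes already released when a later chunk failed to enqueue
			// (point 2) would have to be subtracted as well.
			size := len(sr.msg.Payload)
			p.releaseSemaphoreAndMem(int64(size))
			p.metrics.BytesPending.Sub(float64(size))
		}
		p.metrics.MessagesPending.Dec()
		p.metrics.PublishErrorsTimeout.Inc()
	}
}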
  4. In partitionProducer.ReceivedSendReceipt() we release memory with p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload))). If a message is chunked, there will be many sendRequests pointing to that message, so it seems we release more than we requested (see the sketch after the snippet):
for idx, i := range pi.sendRequests {
	sr := i.(*sendRequest)
	if sr.msg != nil {
		atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
		p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
		p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
		p.metrics.MessagesPublished.Inc()
		p.metrics.MessagesPending.Dec()
		payloadSize := float64(len(sr.msg.Payload))
		p.metrics.BytesPublished.Add(payloadSize)
		p.metrics.BytesPending.Sub(payloadSize)
	}
}
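
The same once-per-message idea could apply here (a sketch only): tie the release to the last chunk's receipt, mirroring the existing callback condition sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1:

// Sketch for ReceivedSendReceipt: release only when the last chunk is acknowledged.
if sr.msg != nil {
	atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
	if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
		// Again, the amount should be what was actually reserved (see point 1),
		// not necessarily len(sr.msg.Payload).
		p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
	}
	p.metrics.MessagesPublished.Inc()
	p.metrics.MessagesPending.Dec()
}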

Steps to reproduce

How can we reproduce the issue
Code review

func (p *partitionProducer) internalSend(request *sendRequest) {
	p.log.Debug("Received send request: ", *request.msg)

	msg := request.msg

	// read payload from message
	uncompressedPayload := msg.Payload
	uncompressedPayloadSize := int64(len(uncompressedPayload))

	var schemaPayload []byte
	var err error
	if msg.Value != nil && msg.Payload != nil {
		p.log.Error("Can not set Value and Payload both")
		runCallback(request.callback, nil, request.msg, errors.New("can not set Value and Payload both"))
		return
	}

	// The block chan must be closed when returned with exception
	defer request.stopBlock()
	if !p.canAddToQueue(request, uncompressedPayloadSize) {
		return
	}

	if p.options.DisableMultiSchema {
		if msg.Schema != nil && p.options.Schema != nil &&
			msg.Schema.GetSchemaInfo().hash() != p.options.Schema.GetSchemaInfo().hash() {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			runCallback(request.callback, nil, request.msg, fmt.Errorf("msg schema can not match with producer schema"))
			p.log.WithError(err).Errorf("The producer %s of the topic %s is disabled the `MultiSchema`", p.producerName, p.topic)
			return
		}
	}
	var schema Schema
	var schemaVersion []byte
	if msg.Schema != nil {
		schema = msg.Schema
	} else if p.options.Schema != nil {
		schema = p.options.Schema
	}
	if msg.Value != nil {
		// payload and schema are mutually exclusive
		// try to get payload from schema value only if payload is not set
		if uncompressedPayload == nil && schema != nil {
			schemaPayload, err = schema.Encode(msg.Value)
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				runCallback(request.callback, nil, request.msg, newError(SchemaFailure, err.Error()))
				p.log.WithError(err).Errorf("Schema encode message failed %s", msg.Value)
				return
			}
		}
	}
	if uncompressedPayload == nil {
		uncompressedPayload = schemaPayload
	}

	if schema != nil {
		schemaVersion = p.schemaCache.Get(schema.GetSchemaInfo())
		if schemaVersion == nil {
			schemaVersion, err = p.getOrCreateSchema(schema.GetSchemaInfo())
			if err != nil {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				p.log.WithError(err).Error("get schema version fail")
				runCallback(request.callback, nil, request.msg, fmt.Errorf("get schema version fail, err: %w", err))
				return
			}
			p.schemaCache.Put(schema.GetSchemaInfo(), schemaVersion)
		}
	}

	uncompressedSize := len(uncompressedPayload)

	deliverAt := msg.DeliverAt
	if msg.DeliverAfter.Nanoseconds() > 0 {
		deliverAt = time.Now().Add(msg.DeliverAfter)
	}

	mm := p.genMetadata(msg, uncompressedSize, deliverAt)

	// set default ReplicationClusters when DisableReplication
	if msg.DisableReplication {
		msg.ReplicationClusters = []string{"__local__"}
	}

	sendAsBatch := !p.options.DisableBatching &&
		msg.ReplicationClusters == nil &&
		deliverAt.UnixNano() < 0

	// Once the batching is enabled, it can close blockCh early to make block finish
	if sendAsBatch {
		request.stopBlock()
	} else {
		// update sequence id for metadata, make the size of msgMetadata more accurate
		// batch sending will update sequence ID in the BatchBuilder
		p.updateMetadataSeqID(mm, msg)
	}

	maxMessageSize := int(p._getConn().GetMaxMessageSize())

	// compress payload if not batching
	var compressedPayload []byte
	var compressedSize int
	var checkSize int
	if !sendAsBatch {
		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
		compressedSize = len(compressedPayload)
		checkSize = compressedSize

		// set the compress type in msgMetaData
		compressionType := pb.CompressionType(p.options.CompressionType)
		if compressionType != pb.CompressionType_NONE {
			mm.Compression = &compressionType
		}
	} else {
		// final check for batching message is in serializeMessage
		// this is a double check
		checkSize = uncompressedSize
	}

	// if msg is too large and chunking is disabled
	if checkSize > maxMessageSize && !p.options.EnableChunking {
		p.releaseSemaphoreAndMem(uncompressedPayloadSize)
		runCallback(request.callback, nil, request.msg, errMessageTooLarge)
		p.log.WithError(errMessageTooLarge).
			WithField("size", checkSize).
			WithField("properties", msg.Properties).
			Errorf("MaxMessageSize %d", maxMessageSize)
		p.metrics.PublishErrorsMsgTooLarge.Inc()
		return
	}

	var totalChunks int
	// max chunk payload size
	var payloadChunkSize int
	if sendAsBatch || !p.options.EnableChunking {
		totalChunks = 1
		payloadChunkSize = int(p._getConn().GetMaxMessageSize())
	} else {
		payloadChunkSize = int(p._getConn().GetMaxMessageSize()) - proto.Size(mm)
		if payloadChunkSize <= 0 {
			p.releaseSemaphoreAndMem(uncompressedPayloadSize)
			runCallback(request.callback, nil, msg, errMetaTooLarge)
			p.log.WithError(errMetaTooLarge).
				WithField("metadata size", proto.Size(mm)).
				WithField("properties", msg.Properties).
				Errorf("MaxMessageSize %d", int(p._getConn().GetMaxMessageSize()))
			p.metrics.PublishErrorsMsgTooLarge.Inc()
			return
		}
		// set ChunkMaxMessageSize
		if p.options.ChunkMaxMessageSize != 0 {
			payloadChunkSize = int(math.Min(float64(payloadChunkSize), float64(p.options.ChunkMaxMessageSize)))
		}
		totalChunks = int(math.Max(1, math.Ceil(float64(compressedSize)/float64(payloadChunkSize))))
	}

	// set total chunks to send request
	request.totalChunks = totalChunks

	if !sendAsBatch {
		if totalChunks > 1 {
			var lhs, rhs int
			uuid := fmt.Sprintf("%s-%s", p.producerName, strconv.FormatUint(*mm.SequenceId, 10))
			mm.Uuid = proto.String(uuid)
			mm.NumChunksFromMsg = proto.Int32(int32(totalChunks))
			mm.TotalChunkMsgSize = proto.Int32(int32(compressedSize))
			cr := newChunkRecorder()
			for chunkID := 0; chunkID < totalChunks; chunkID++ {
				lhs = chunkID * payloadChunkSize
				if rhs = lhs + payloadChunkSize; rhs > compressedSize {
					rhs = compressedSize
				}
				// update chunk id
				mm.ChunkId = proto.Int32(int32(chunkID))
				nsr := &sendRequest{
					ctx:              request.ctx,
					msg:              request.msg,
					callback:         request.callback,
					callbackOnce:     request.callbackOnce,
					publishTime:      request.publishTime,
					blockCh:          request.blockCh,
					closeBlockChOnce: request.closeBlockChOnce,
					totalChunks:      totalChunks,
					chunkID:          chunkID,
					uuid:             uuid,
					chunkRecorder:    cr,
					transaction:      request.transaction,
				}
				// the permit of first chunk has acquired
				if chunkID != 0 && !p.canAddToQueue(nsr, 0) {
					p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(rhs))//BUG, SHOULD BE lhs
					return
				}
				p.internalSingleSend(mm, compressedPayload[lhs:rhs], nsr, uint32(maxMessageSize))
			}
			// close the blockCh when all the chunks acquired permits
			request.stopBlock()
		} else {
			// close the blockCh when totalChunks is 1 (it has acquired permits)
			request.stopBlock()
			p.internalSingleSend(mm, compressedPayload, request, uint32(maxMessageSize))
		}
	} else {
		smm := p.genSingleMessageMetadataInBatch(msg, uncompressedSize)
		multiSchemaEnabled := !p.options.DisableMultiSchema
		added := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
			multiSchemaEnabled)
		if !added {
			// The current batch is full. flush it and retry

			p.internalFlushCurrentBatch()

			// after flushing try again to add the current payload
			if ok := addRequestToBatch(smm, p, uncompressedPayload, request, msg, deliverAt, schemaVersion,
				multiSchemaEnabled); !ok {
				p.releaseSemaphoreAndMem(uncompressedPayloadSize)
				runCallback(request.callback, nil, request.msg, errFailAddToBatch)
				p.log.WithField("size", uncompressedSize).
					WithField("properties", msg.Properties).
					Error("unable to add message to batch")
				return
			}
		}
		if request.flushImmediately {

			p.internalFlushCurrentBatch()

		}
	}
}

func (p *partitionProducer) failTimeoutMessages() {
	diff := func(sentAt time.Time) time.Duration {
		return p.options.SendTimeout - time.Since(sentAt)
	}

	t := time.NewTimer(p.options.SendTimeout)
	defer t.Stop()

	for range t.C {
		state := p.getProducerState()
		if state == producerClosing || state == producerClosed {
			return
		}

		item := p.pendingQueue.Peek()
		if item == nil {
			// pending queue is empty
			t.Reset(p.options.SendTimeout)
			continue
		}
		oldestItem := item.(*pendingItem)
		if nextWaiting := diff(oldestItem.sentAt); nextWaiting > 0 {
			// none of these pending messages have timed out, wait and retry
			t.Reset(nextWaiting)
			continue
		}

		// since pending queue is not thread safe because of there is no global iteration lock
		// to control poll from pending queue, current goroutine and connection receipt handler
		// iterate pending queue at the same time, this maybe a performance trade-off
		// see https://github.com/apache/pulsar-client-go/pull/301
		curViewItems := p.pendingQueue.ReadableSlice()
		viewSize := len(curViewItems)
		if viewSize <= 0 {
			// double check
			t.Reset(p.options.SendTimeout)
			continue
		}
		p.log.Infof("Failing %d messages on timeout %s", viewSize, p.options.SendTimeout)
		lastViewItem := curViewItems[viewSize-1].(*pendingItem)

		// iterate at most viewSize items
		for i := 0; i < viewSize; i++ {
			tickerNeedWaiting := time.Duration(0)
			item := p.pendingQueue.CompareAndPoll(
				func(m interface{}) bool {
					if m == nil {
						return false
					}

					pi := m.(*pendingItem)
					pi.Lock()
					defer pi.Unlock()
					if nextWaiting := diff(pi.sentAt); nextWaiting > 0 {
						// current and subsequent items not timeout yet, stop iterating
						tickerNeedWaiting = nextWaiting
						return false
					}
					return true
				})

			if item == nil {
				t.Reset(p.options.SendTimeout)
				break
			}

			if tickerNeedWaiting > 0 {
				t.Reset(tickerNeedWaiting)
				break
			}

			pi := item.(*pendingItem)
			pi.Lock()

			for _, i := range pi.sendRequests {
				sr := i.(*sendRequest)
				if sr.msg != nil {
					size := len(sr.msg.Payload)
					p.releaseSemaphoreAndMem(int64(size))
					p.metrics.MessagesPending.Dec()
					p.metrics.BytesPending.Sub(float64(size))
					p.metrics.PublishErrorsTimeout.Inc()
					p.log.WithError(errSendTimeout).
						WithField("size", size).
						WithField("properties", sr.msg.Properties)
				}

				if sr.callback != nil {
					sr.callbackOnce.Do(func() {
						runCallback(sr.callback, nil, sr.msg, errSendTimeout)
					})
				}
				if sr.transaction != nil {
					sr.transaction.endSendOrAckOp(nil)
				}
			}

			// flag the sending has completed with error, flush make no effect
			pi.Complete()
			pi.Unlock()

			// finally reached the last view item, current iteration ends
			if pi == lastViewItem {
				t.Reset(p.options.SendTimeout)
				break
			}
		}
	}
}

func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
	pi, ok := p.pendingQueue.Peek().(*pendingItem)

	if !ok {
		// if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
		p.log.Warnf("Got ack %v for timed out msg", response.GetMessageId())
		return
	}

	if pi.sequenceID < response.GetSequenceId() {
		// Force connection closing so that messages can be re-transmitted in a new connection
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		p._getConn().Close()
		return
	} else if pi.sequenceID > response.GetSequenceId() {
		// Ignoring the ack since it's referring to a message that has already timed out.
		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
			response.GetSequenceId(), pi.sequenceID)
		return
	} else {
		// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback
		p.pendingQueue.Poll()

		now := time.Now().UnixNano()

		// lock the pending item while sending the requests
		pi.Lock()
		defer pi.Unlock()
		p.metrics.PublishRPCLatency.Observe(float64(now-pi.sentAt.UnixNano()) / 1.0e9)
		batchSize := int32(0)
		for _, i := range pi.sendRequests {
			sr := i.(*sendRequest)
			if sr.msg != nil {
				batchSize = batchSize + 1
			} else { // Flush request
				break
			}
		}
		for idx, i := range pi.sendRequests {
			sr := i.(*sendRequest)
			if sr.msg != nil {
				atomic.StoreInt64(&p.lastSequenceID, int64(pi.sequenceID))
				p.releaseSemaphoreAndMem(int64(len(sr.msg.Payload)))
				p.metrics.PublishLatency.Observe(float64(now-sr.publishTime.UnixNano()) / 1.0e9)
				p.metrics.MessagesPublished.Inc()
				p.metrics.MessagesPending.Dec()
				payloadSize := float64(len(sr.msg.Payload))
				p.metrics.BytesPublished.Add(payloadSize)
				p.metrics.BytesPending.Sub(payloadSize)
			}

			if sr.callback != nil || len(p.options.Interceptors) > 0 {
				msgID := newMessageID(
					int64(response.MessageId.GetLedgerId()),
					int64(response.MessageId.GetEntryId()),
					int32(idx),
					p.partitionIdx,
					batchSize,
				)

				if sr.totalChunks > 1 {
					if sr.chunkID == 0 {
						sr.chunkRecorder.setFirstChunkID(
							&messageID{
								int64(response.MessageId.GetLedgerId()),
								int64(response.MessageId.GetEntryId()),
								-1,
								p.partitionIdx,
								0,
							})
					} else if sr.chunkID == sr.totalChunks-1 {
						sr.chunkRecorder.setLastChunkID(
							&messageID{
								int64(response.MessageId.GetLedgerId()),
								int64(response.MessageId.GetEntryId()),
								-1,
								p.partitionIdx,
								0,
							})
						// use chunkMsgID to set msgID
						msgID = &sr.chunkRecorder.chunkedMsgID
					}
				}

				if sr.totalChunks <= 1 || sr.chunkID == sr.totalChunks-1 {
					runCallback(sr.callback, msgID, sr.msg, nil)
					p.options.Interceptors.OnSendAcknowledgement(p, sr.msg, msgID)
				}
			}
			if sr.transaction != nil {
				sr.transaction.endSendOrAckOp(nil)
			}
		}

		// Mark this pending item as done
		pi.Complete()
	}
}

System configuration

Pulsar version: x.y
@RobertIndie @shibd

Hi @gunli, thanks for your careful review. After a quick check, I think the memory limit does have some problems.

If we use a schema, msg.Payload will be nil, so we request 0 bytes of memory at the beginning

In #955, uncompressedPayloadSize is calculated in advance, so it is not accurate.

// read payload from message
uncompressedPayload := msg.Payload
uncompressedPayloadSize := int64(len(uncompressedPayload))

The exact size should be uncompressedSize.

uncompressedSize := len(uncompressedPayload)
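
For example (an illustration only), with a schema and no Payload the two values diverge:

// Illustration: when only msg.Value is set, the size reserved up front is 0,
// while the bytes actually sent are the schema-encoded value.
uncompressedPayloadSize := int64(len(msg.Payload)) // 0, so 0 bytes are reserved
schemaPayload, _ := schema.Encode(msg.Value)       // the bytes that really get buffered and sent
uncompressedSize := len(schemaPayload)             // the amount that should be accounted for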

If we want to release the memory of the current failed chunk and the later ones, I think we should call p.releaseSemaphoreAndMem(uncompressedPayloadSize - int64(lhs)).

I think so. But do we need to release memory at all when canAddToQueue fails? I think just returning would be fine. There should be more test code to verify the producer memory limit with chunking.

Anyway, we need to add more test code for the producer memory limit to cover chunking and schema (a rough sketch of such a test is below). If you have time, we can fix these problems together.
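
A rough sketch of such a test (assuming it lives in the pulsar package next to the existing producer tests, that a standalone broker runs at pulsar://localhost:6650, and that the client's internal memory limit controller exposes a CurrentUsage() accessor; newTopicName is assumed to be the existing test helper):

func TestProducerMemLimitWithChunking(t *testing.T) {
	c, err := NewClient(ClientOptions{
		URL:              "pulsar://localhost:6650",
		MemoryLimitBytes: 10 * 1024,
	})
	assert.NoError(t, err)
	defer c.Close()

	producer, err := c.CreateProducer(ProducerOptions{
		Topic:               newTopicName(),
		DisableBatching:     true,
		EnableChunking:      true,
		ChunkMaxMessageSize: 1024,
	})
	assert.NoError(t, err)
	defer producer.Close()

	// Send a message large enough to be split into several chunks.
	_, err = producer.Send(context.Background(), &ProducerMessage{
		Payload: make([]byte, 5*1024),
	})
	assert.NoError(t, err)

	// Once the receipt is handled, all reserved memory should have been returned.
	assert.Equal(t, int64(0), c.(*client).memLimit.CurrentUsage()) // assumed accessor
}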