apache/pulsar-client-go

failTimeoutMessages() cannot delete outdated messages

geniusjoe opened this issue · 5 comments

Expected behavior

The failTimeoutMessages() timer in pulsar/producer_partition.go should periodically delete outdated messages in p.pendingQueue
which are older than p.options.SendTimeout.

Actual behavior

Messages in p.pendingQueue which are older than p.options.SendTimeout cannot be deleted. If one message cannot send successfully all the time, it will remain in the pendingQueue forever and lead to reconnecting fail infinitely.

Helpful information

I think this bug may be related to bugfix #551.
Bugfix #551 is aimed to solve a race condition between grabCnx() and failTimeoutMessages() functions. Producer may encounter this race condition when failTimeoutMessages() first delete outdated messages, then grabCnx() reconnect success and resend these pending deleted messages.
#551 solution is to refresh all p.pendingQueue messages sendAt field when grabCnx() reconnect happened, so that failTimeoutMessages() will not take effect to these messages. Code reference below:

for i := 0; i < viewSize; i++ {
	item := p.pendingQueue.Poll()
	if item == nil {
		continue
	}
	pi := item.(*pendingItem)
	// when resending pending batches, we update the sendAt timestamp and put to the back of queue
	// to avoid pending item been removed by failTimeoutMessages and cause race condition
	pi.Lock()
	pi.sentAt = time.Now()
	pi.Unlock()
	p.pendingQueue.Put(pi)
	p._getConn().WriteData(pi.buffer)
	if pi == lastViewItem {
		break
	}
}	

When compared with Java client, Java code may not encounter this race condition. Java failPendingMessages() will first check if current channel connection is close, if current connection is not close, then it will trigger cnx.ctx().channel().eventLoop() to avoid race condition. Code reference below:

// If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any
// race condition since we also write the message on the socket from this thread
cnx.ctx().channel().eventLoop().execute(() -> {
    synchronized (ProducerImpl.this) {
        failPendingMessages(null, ex);
    }
});

I think we can add Lock() and Unlock() as member method in p.pendingQueue and remove lock operation in all of implementation methods such as Put() or Poll(). We should regard "iterate every member and delete some of them" or "iterate every member and change some member value" as an atomic operation. We should call p.pendingQueue.Lock() or p.pendingQueue.Unlock() whenever we take some action to the p.pendingQueue.

Steps to reproduce

  1. Update broker handleSend() function, always return a transient error when encountered some specific message.
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
    ...
    this.ctx().channel().eventLoop().execute(() -> {
	this.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.UnknownError, "UnknownError"));
	this.completedSendOperation(producer.isNonPersistentTopic(), 0);
    });
}
  1. Write a demo to send this specific message and this demo will reconnect infinitely.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL:            "<url>:6650",
	Authentication: pulsar.NewAuthenticationToken("<token>"),
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()
producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic:       "<tenant>/<namespace>/<topic>",
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()
ctx := context.Background()
if msgId, err := producer.Send(
	ctx,
	&pulsar.ProducerMessage{
		Payload:      []byte("specificMessage"),
	},
); err != nil {
	log.Fatal(err)
} else {
	log.Println("Published message: ", msgId)
}

Log will something like below and I use an UnknownError as a transient error

INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37696" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed                         cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Connection was closed                         cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer      producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=115.401948ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer      producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=103.170835ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker                          remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established                    local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready                           local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer                            cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Connected producer                            cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches                   producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed                         cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
WARN[0000] Connection was closed                         cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] runEventsLoop will reconnect in producer      producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=112.652459ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer      producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker                        assignedBrokerURL= delayReconnectTime=104.869345ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker                          remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established                    local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready                           local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer                            cnx="<local_addr>:37712 -> <pulsar_addr>:10003" epoch=4 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches                   producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker                cnx="<local_addr>:37712 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError]  local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
...

System configuration

Client version: 0.12.1
Broker version: 2.9

There seems to be a concurrency issue here. I will work on this.

@geniusjoe Could you review #1247?

@geniusjoe Could you review #1247?

@nodece Sure. As I mentioned in #1247 review, this bugfix definitely can remove outdated messages, but can this pr solve #551 concurrency issue?

@geniusjoe Could you review #1247?

@nodece Sure. As I mentioned in #1247 review, this bugfix definitely can remove outdated messages, but can this pr solve #551 concurrency issue?

Don't worry about failPendingMessages, it is thread-safe.

@geniusjoe Could you review #1247?

@nodece Sure. As I mentioned in #1247 review, this bugfix definitely can remove outdated messages, but can this pr solve #551 concurrency issue?

Don't worry about failPendingMessages, it is thread-safe.

Okay. I think this pr is what I need.