failTimeoutMessages() cannot delete outdated messages
geniusjoe opened this issue · 5 comments
Expected behavior
The failTimeoutMessages() timer in pulsar/producer_partition.go should periodically delete messages in p.pendingQueue that are older than p.options.SendTimeout.
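For context, here is a minimal, self-contained sketch of the kind of sweep described above (hypothetical names and types, not the actual producer_partition.go code): the queue is ordered by send time, so everything at the head that is older than the send timeout should be removed and failed.
package main

import (
	"fmt"
	"time"
)

// pendingItem is a hypothetical stand-in for the client's pending item type;
// it models only the sentAt field used in the snippet quoted later in this issue.
type pendingItem struct {
	sentAt time.Time
}

// sweepTimedOut drops every item at the head of the queue whose sentAt is older
// than sendTimeout and returns the remaining items. The queue is assumed to be
// ordered by sentAt, so the sweep stops at the first item that has not timed out.
// The real producer would also fail the send callbacks of each removed item.
func sweepTimedOut(pending []*pendingItem, sendTimeout time.Duration) []*pendingItem {
	now := time.Now()
	i := 0
	for ; i < len(pending); i++ {
		if now.Sub(pending[i].sentAt) < sendTimeout {
			break // the rest of the queue is still fresh
		}
	}
	return pending[i:]
}

func main() {
	pending := []*pendingItem{
		{sentAt: time.Now().Add(-40 * time.Second)}, // timed out
		{sentAt: time.Now()},                        // still fresh
	}
	pending = sweepTimedOut(pending, 30*time.Second)
	fmt.Println("remaining:", len(pending)) // remaining: 1
}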
Actual behavior
Messages in p.pendingQueue that are older than p.options.SendTimeout are never deleted. If a message can never be sent successfully, it remains in the pendingQueue forever, and the producer keeps reconnecting and failing indefinitely.
Helpful information
I think this bug may be related to bugfix #551. #551 aims to fix a race condition between the grabCnx() and failTimeoutMessages() functions: the producer could hit this race when failTimeoutMessages() deletes outdated messages first, and grabCnx() then reconnects successfully and resends those already-deleted pending messages. The fix in #551 refreshes the sentAt field of every item in p.pendingQueue when grabCnx() reconnects, so that failTimeoutMessages() no longer affects these messages. Code reference below:
for i := 0; i < viewSize; i++ {
	item := p.pendingQueue.Poll()
	if item == nil {
		continue
	}
	pi := item.(*pendingItem)
	// when resending pending batches, we update the sendAt timestamp and put to the back of queue
	// to avoid pending item been removed by failTimeoutMessages and cause race condition
	pi.Lock()
	pi.sentAt = time.Now()
	pi.Unlock()
	p.pendingQueue.Put(pi)
	p._getConn().WriteData(pi.buffer)
	if pi == lastViewItem {
		break
	}
}
Compared with the Go client, the Java client may not encounter this race condition. The Java failPendingMessages() first checks whether the current channel connection is closed; if it is not, it runs the failure logic on cnx.ctx().channel().eventLoop() to avoid the race. Code reference below:
// If we have a connection, we schedule the callback and recycle on the event loop thread to avoid any
// race condition since we also write the message on the socket from this thread
cnx.ctx().channel().eventLoop().execute(() -> {
    synchronized (ProducerImpl.this) {
        failPendingMessages(null, ex);
    }
});
I think we can add Lock() and Unlock() as methods on p.pendingQueue and remove the locking from the individual implementation methods such as Put() and Poll(). "Iterate every member and delete some of them" and "iterate every member and change some member values" should each be treated as an atomic operation, so we would call p.pendingQueue.Lock() and p.pendingQueue.Unlock() around every such action on p.pendingQueue.
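A minimal sketch of that idea, assuming a hypothetical lockedQueue type (this is not the existing internal BlockingQueue of pulsar-client-go): the queue exposes Lock()/Unlock(), Put()/Poll() no longer lock internally, and a whole iterate-and-mutate pass such as the timeout sweep runs under one lock so it cannot interleave with a resend loop.
package main

import (
	"fmt"
	"sync"
	"time"
)

// pendingItem models only the sentAt field used by the timeout sweep.
type pendingItem struct {
	sentAt time.Time
}

// lockedQueue is a hypothetical pending queue whose callers hold the lock
// around whole iterate-and-mutate sequences.
type lockedQueue struct {
	mu    sync.Mutex
	items []interface{}
}

// Lock and Unlock are exposed so callers can make multi-step operations atomic.
func (q *lockedQueue) Lock()   { q.mu.Lock() }
func (q *lockedQueue) Unlock() { q.mu.Unlock() }

// Put and Poll no longer take the lock themselves; the caller must hold it.
func (q *lockedQueue) Put(item interface{}) { q.items = append(q.items, item) }

func (q *lockedQueue) Poll() interface{} {
	if len(q.items) == 0 {
		return nil
	}
	item := q.items[0]
	q.items = q.items[1:]
	return item
}

// sweepTimedOut removes every timed-out item in one pass while holding the
// queue lock, so a concurrent resend pass cannot interleave with it.
func sweepTimedOut(q *lockedQueue, sendTimeout time.Duration) {
	q.Lock()
	defer q.Unlock()
	kept := q.items[:0]
	for _, it := range q.items {
		pi := it.(*pendingItem)
		if time.Since(pi.sentAt) < sendTimeout {
			kept = append(kept, it)
		}
		// a timed-out item would have its send callbacks failed here
	}
	q.items = kept
}

func main() {
	q := &lockedQueue{}
	q.Lock()
	q.Put(&pendingItem{sentAt: time.Now().Add(-time.Minute)}) // already timed out
	q.Put(&pendingItem{sentAt: time.Now()})                   // still fresh
	q.Unlock()

	sweepTimedOut(q, 30*time.Second)

	q.Lock()
	fmt.Println("remaining:", len(q.items)) // remaining: 1
	q.Unlock()
}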
Steps to reproduce
- Update the broker's handleSend() function so that it always returns a transient error when it encounters a specific message.
@Override
protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
    ...
    this.ctx().channel().eventLoop().execute(() -> {
        this.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.UnknownError, "UnknownError"));
        this.completedSendOperation(producer.isNonPersistentTopic(), 0);
    });
}
- Write a demo that sends this specific message; the demo will keep reconnecting indefinitely.
client, err := pulsar.NewClient(pulsar.ClientOptions{
	URL:            "<url>:6650",
	Authentication: pulsar.NewAuthenticationToken("<token>"),
})
if err != nil {
	log.Fatal(err)
}
defer client.Close()

producer, err := client.CreateProducer(pulsar.ProducerOptions{
	Topic: "<tenant>/<namespace>/<topic>",
})
if err != nil {
	log.Fatal(err)
}
defer producer.Close()

ctx := context.Background()
if msgId, err := producer.Send(
	ctx,
	&pulsar.ProducerMessage{
		Payload: []byte("specificMessage"),
	},
); err != nil {
	log.Fatal(err)
} else {
	log.Println("Published message: ", msgId)
}
The log will look something like below (I use UnknownError as the transient error):
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37696" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Connection was closed cnx="<local_addr>:37696 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=115.401948ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=103.170835ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Connected producer cnx="<local_addr>:37708 -> <pulsar_addr>:10003" epoch=3 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37708" remote_addr="pulsar://<pulsar_addr>:10003"
WARN[0000] Connection was closed cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
WARN[0000] Connection was closed cnx="<local_addr>:37708 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] runEventsLoop will reconnect in producer producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=112.652459ms producerID=2 producer_name=<tenant>-2-58692 topic=<tenant>/<namespace>/<topic>-partition-0
INFO[0000] runEventsLoop will reconnect in producer producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnecting to broker assignedBrokerURL= delayReconnectTime=104.869345ms producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Connecting to broker remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] TCP connection established local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connection is ready local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
INFO[0000] Connected producer cnx="<local_addr>:37712 -> <pulsar_addr>:10003" epoch=4 producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Resending 1 pending batches producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
INFO[0000] Reconnected producer to broker cnx="<local_addr>:37712 -> <pulsar_addr>:10003" producerID=1 producer_name=<tenant>-2-58691 topic=<tenant>/<namespace>/<topic>-partition-1
WARN[0000] Received send error from server: [UnknownError] : [UnknownError] local_addr="<local_addr>:37712" remote_addr="pulsar://<pulsar_addr>:10003"
...
System configuration
Client version: 0.12.1
Broker version: 2.9
There seems to be a concurrency issue here. I will work on this.
@geniusjoe Could you review #1247?
@nodece Sure. As I mentioned in the #1247 review, this bugfix can definitely remove outdated messages, but can this PR also solve the concurrency issue from #551?