Leader change during re-balance may leave a partition unconsumed
waterlx opened this issue · 4 comments
I have simplified it into the following scenario:
3 brokers, 1 topic with 4 partitions, and 2 consumer instances consuming that topic. Brokers, partitions and consumers are all numbered from 0.
When c0 (consumer instance 0) calls utils.go#dividePartitionsBetweenConsumers(), the leaders are:
- p0 on b0
- p1 on b1
- p2 on b2
- p3 on b0
After sorting (by leader, then by partition id), partitions looks like this:
{p0 b0 xxxx} // {partition leader address}
{p3 b0 xxxx}
{p1 b1 xxxx}
{p2 b2 xxxx}
So c0 gets p0 and p3 as its myPartitions (the partitions to claim).
Then the leader of p0 somehow changes to b2. The leaders are now:
- p0 on b2
- p1 on b1
- p2 on b2
- p3 on b0
Then another consumer instance, c1, calls utils.go#dividePartitionsBetweenConsumers().
After sorting (by leader, then by partition id), partitions looks like this:
{p3 b0 xxxx}
{p1 b1 xxxx}
{p0 b2 xxxx}
{p2 b2 xxxx}
So c1 gets p0 and p2 as its myPartitions (the partitions to claim).
As a result, c0 tries to claim p0 and p3 while c1 tries to claim p0 and p2:
- c0 and c1 both fight for p0, and c0 wins (because it tries to claim it first).
- But no one tries to claim p1.
In utils.go, we sort the partitionLeaders by leader first:
func (pls partitionLeaders) Less(i, j int) bool {
	// Order by leader broker first, then by partition id.
	return pls[i].leader < pls[j].leader || (pls[i].leader == pls[j].leader && pls[i].id < pls[j].id)
}
When a leader changes between two calls of dividePartitionsBetweenConsumers(), the sorted order changes, so the two calls divide different lists. I believe this is the root cause.
I am not sure whether Less() could be changed to drop the leader comparison and compare only partition.id.
Sorting on leader has the following benefit, according to https://kafka.apache.org/documentation/#impl_consumerrebalance:
we try to assign partitions to consumers in such a way that reduces the number of broker nodes each consumer has to connect to.
Dropping the leader comparison would break that. But if we keep comparing leaders, we cannot avoid the condition described in this issue when a leader changes.
On second thought, should we trigger a re-balance when there is a leader change? Could that solve this issue?
If I understand correctly, the Kafka Java client does not consider leaders and sorts partitions in numeric order. See RangeAssignor in https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/consumer/PartitionAssignor.scala