Prefetched messages from revoked partition not dropped on Rebalance
jakobmerrild opened this issue · 3 comments
It seems that in the following scenario a consumer can end up handling messages for a partition it is no longer assigned to:
- Single consumer is assigned to all partitions (0-2)
- Due to high load an auto scaling system introduces a second consumer
- Rebalance happens; revoking partition 2 from the first consumer and assigning it to the new consumer
- The first consumer continues to handle messages from partition 2 (presumably due to prefetching)
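The failure mode in the steps above can be modelled with plain Scala collections (no Kafka involved; `Record` and `Consumer` here are purely illustrative, not fs2-kafka types): a consumer that keeps draining its prefetch buffer after a rebalance will emit records from a partition it no longer owns.

```scala
// Illustrative model only: a record tagged with its source partition,
// and a consumer holding its current assignment plus a prefetch buffer.
final case class Record(partition: Int, key: String)

final case class Consumer(assigned: Set[Int], prefetched: List[Record]) {
  // Observed behavior: the buffer is drained as-is, including records
  // from partitions that were revoked after they were prefetched.
  def drainBuffer: List[Record] = prefetched
}

// Consumer 0 prefetched from partitions 0-2, then partition 2 was revoked.
val consumer0 = Consumer(
  assigned = Set(0, 1),
  prefetched = List(Record(0, "key-1"), Record(2, "key-3"), Record(2, "key-5"))
)

// Records from the revoked partition 2 are still handled downstream.
val stillHandled =
  consumer0.drainBuffer.filter(r => !consumer0.assigned.contains(r.partition))
```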
It's a little difficult for me to write an exact test of the desired behavior, but here's a minimal example based on your test code (modified from an existing test). It uses printlns 😊
it("should handle rebalance") {
  withTopic { topic =>
    createCustomTopic(topic, partitions = 3)
    val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
    val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
    val producedTotal = produced1.size.toLong + produced2.size.toLong

    def startConsumer(
      consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
      stopSignal: SignallingRef[IO, Boolean],
      i: Int
    ): IO[FiberIO[Unit]] =
      KafkaConsumer
        .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
        .subscribeTo(topic)
        .flatMap(_.stream)
        .evalTap(consumedQueue.offer)
        .evalTap(committable => IO(println(s"$i handled ${committable.record.key} from part ${committable.record.partition}")))
        .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
        .through(commitBatchWithin(100, 1.second)) // Commit so any new consumers don't start from the beginning of their partition
        .interruptWhen(stopSignal)
        .compile
        .drain
        .start

    (for {
      stopSignal <- SignallingRef[IO, Boolean](false)
      queue      <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
      ref        <- Ref.of[IO, Map[String, Int]](Map.empty)
      _          <- IO(publishToKafka(topic, produced1))
      _          <- startConsumer(queue, stopSignal, 0) // Start the first consumer
      _          <- IO.sleep(5.seconds) // Simulate scale-out event after a while
      _          <- IO(println("Scaled out..."))
      _          <- startConsumer(queue, stopSignal, 1) // Start the second consumer
      _          <- IO(publishToKafka(topic, produced2)) // Produce some more records
      _          <- Stream
                      .fromQueueUnterminated(queue)
                      .evalMap { committable =>
                        // Count how many times each message was consumed
                        ref.modify { counts =>
                          val key = committable.record.key
                          val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                          (newCounts, newCounts)
                        }
                      }
                      .takeWhile(_.size < producedTotal)
                      .compile
                      .drain
                      .guarantee(stopSignal.set(true))
      keys <- ref.get
    } yield {
      assert {
        keys.size.toLong == producedTotal && {
          keys == (0 until producedTotal.toInt).map { n =>
            s"key-$n" -> 1 // Each message should be handled once if the first consumer stops polling the revoked partition
          }.toMap
        }
      }
    }).unsafeRunSync()
  }
}
Truncated output:
0 handled key-0 from part 1
...
0 handled key-2 from part 2
Scaled out...
0 handled key-3 from part 2
...
1 handled key-39 from part 2 <-- partition 2 assigned to new consumer
0 handled key-50 from part 2
1 handled key-41 from part 2
0 handled key-58 from part 2
1 handled key-46 from part 2
0 handled key-63 from part 2
1 handled key-48 from part 2
0 handled key-66 from part 2
1 handled key-49 from part 2
0 handled key-67 from part 2
1 handled key-50 from part 2
0 handled key-70 from part 2
1 handled key-58 from part 2
0 handled key-71 from part 2
1 handled key-63 from part 2
0 handled key-73 from part 2
1 handled key-66 from part 2
0 handled key-76 from part 2
1 handled key-67 from part 2
0 handled key-77 from part 2
1 handled key-70 from part 2
0 handled key-78 from part 2
1 handled key-71 from part 2
0 handled key-79 from part 2
1 handled key-73 from part 2
0 handled key-85 from part 2
1 handled key-76 from part 2
0 handled key-92 from part 2
1 handled key-77 from part 2
0 handled key-95 from part 2
1 handled key-78 from part 2
0 handled key-98 from part 2
1 handled key-79 from part 2
0 handled key-1 from part 0
1 handled key-85 from part 2
0 handled key-4 from part 0
1 handled key-92 from part 2
0 handled key-9 from part 0
1 handled key-95 from part 2
0 handled key-15 from part 0
1 handled key-98 from part 2
0 handled key-17 from part 0
1 handled key-101 from part 2
0 handled key-19 from part 0
1 handled key-112 from part 2
0 handled key-21 from part 0
1 handled key-113 from part 2
0 handled key-26 from part 0
1 handled key-116 from part 2
0 handled key-28 from part 0
1 handled key-118 from part 2
0 handled key-37 from part 0
1 handled key-121 from part 2
0 handled key-42 from part 0
1 handled key-125 from part 2
0 handled key-43 from part 0
1 handled key-128 from part 2
0 handled key-53 from part 0
1 handled key-129 from part 2
0 handled key-56 from part 0
1 handled key-133 from part 2
0 handled key-59 from part 0
1 handled key-134 from part 2
0 handled key-60 from part 0
1 handled key-140 from part 2
0 handled key-62 from part 0
1 handled key-141 from part 2
0 handled key-65 from part 0
1 handled key-142 from part 2
0 handled key-68 from part 0
1 handled key-148 from part 2
0 handled key-69 from part 0
1 handled key-150 from part 2
0 handled key-81 from part 0
1 handled key-159 from part 2
0 handled key-83 from part 0
1 handled key-162 from part 2
0 handled key-88 from part 0
1 handled key-166 from part 2
0 handled key-89 from part 0
1 handled key-175 from part 2
0 handled key-90 from part 0
1 handled key-177 from part 2
0 handled key-96 from part 0
1 handled key-178 from part 2
0 handled key-100 from part 1
1 handled key-182 from part 2
0 handled key-102 from part 1
1 handled key-184 from part 2
0 handled key-104 from part 1
1 handled key-186 from part 2
0 handled key-105 from part 1
1 handled key-188 from part 2
0 handled key-106 from part 1
1 handled key-189 from part 2
0 handled key-107 from part 1
1 handled key-190 from part 2
0 handled key-108 from part 1
1 handled key-192 from part 2
0 handled key-110 from part 1
1 handled key-199 from part 2
... partition 2 is empty at this point
0 handled key-115 from part 1
...
0 handled key-103 from part 0
...
Desired behavior:
Any messages from a revoked partition should be dropped, unless they have already been pulled downstream.
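A minimal sketch of that rule in plain Scala, with illustrative names (`Buffered`, `onRevoke` are not fs2-kafka internals): on rebalance, the not-yet-emitted prefetch buffer is filtered by the new assignment, while records already pulled downstream cannot be retracted and are left alone.

```scala
// Illustrative: a prefetched record tagged with its source partition.
final case class Buffered(partition: Int, key: String)

// On rebalance, drop buffered records whose partition was revoked;
// records already emitted downstream stay as-is and may be re-delivered
// to the new owner (bounded by the commit batch size).
def onRevoke(newAssignment: Set[Int], buffer: List[Buffered]): List[Buffered] =
  buffer.filter(r => newAssignment.contains(r.partition))

val buffer = List(Buffered(0, "key-1"), Buffered(2, "key-3"), Buffered(2, "key-5"))

// Partition 2 revoked: only partition 0's record survives in the buffer.
val remaining = onRevoke(Set(0, 1), buffer)
```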
Here's a proper test of the desired behavior (I think):
it("should handle rebalance") {
  withTopic { topic =>
    createCustomTopic(topic, partitions = 3)
    val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
    val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
    val producedTotal = produced1.size.toLong + produced2.size.toLong
    val commitBatchSize = 10

    def startConsumer(
      consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
      stopSignal: SignallingRef[IO, Boolean]
    ): IO[FiberIO[Unit]] =
      KafkaConsumer
        .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
        .subscribeTo(topic)
        .flatMap(_.stream)
        .evalTap(consumedQueue.offer)
        .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
        .through(commitBatchWithin(commitBatchSize, 1.hour)) // Unreasonably high interval so the count alone determines commits
        .interruptWhen(stopSignal)
        .compile
        .drain
        .start

    (for {
      stopSignal <- SignallingRef[IO, Boolean](false)
      queue      <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
      ref        <- Ref.of[IO, Map[String, Int]](Map.empty)
      _          <- IO(publishToKafka(topic, produced1))
      _          <- startConsumer(queue, stopSignal) // Start the first consumer
      _          <- IO.sleep(5.seconds) // Simulate scale-out event after a while
      _          <- startConsumer(queue, stopSignal) // Start the second consumer
      _          <- IO(publishToKafka(topic, produced2)) // Produce some more records
      _          <- Stream
                      .fromQueueUnterminated(queue)
                      .evalMap { committable =>
                        // Count how many times each message was consumed
                        ref.modify { counts =>
                          val key = committable.record.key
                          val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                          (newCounts, newCounts)
                        }
                      }
                      .takeWhile(_.size < producedTotal)
                      .compile
                      .drain
                      .guarantee(stopSignal.set(true))
      keys <- ref.get
    } yield {
      assert {
        keys.size.toLong == producedTotal && {
          // The number of messages handled more than once shouldn't exceed the commit threshold
          keys.values.count(_ != 1) < commitBatchSize
        }
      }
    }).unsafeRunSync()
  }
}
Note that I've been using .stream to consume, as that's what we use in our code base. It might be more prudent/easier to define the desired behavior with partitionedMapStream.
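A plain-Scala sketch of why a partition-keyed stream makes the desired behavior easier to state (illustrative only; `Epoch` and its fields are not fs2-kafka types): each rebalance starts a new assignment "epoch", records are only emitted for partitions in the current epoch's assignment, and a revoked partition simply stops producing.

```scala
// Illustrative: one "epoch" per rebalance, mapping each assigned
// partition to the records consumed during that epoch.
final case class Epoch(assignment: Map[Int, List[String]])

// Records are emitted only for partitions in the current assignment;
// a revoked partition is simply absent from later epochs.
def consumed(epochs: List[Epoch]): List[(Int, String)] =
  epochs.flatMap { epoch =>
    epoch.assignment.toList.flatMap { case (p, records) => records.map(p -> _) }
  }

// Epoch 1: partitions 0 and 2 assigned. Epoch 2 (after 2 is revoked): 0 and 1.
val epochs = List(
  Epoch(Map(0 -> List("key-0"), 2 -> List("key-2"))),
  Epoch(Map(0 -> List("key-3"), 1 -> List("key-4")))
)
```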
This issue seems related to #127.