fd4s/fs2-kafka

Prefetched messages from revoked partition not dropped on Rebalance

jakobmerrild opened this issue · 3 comments

It seems that in the following scenario a consumer can keep handling messages for a partition it is no longer assigned to:

  1. A single consumer is assigned to all partitions (0-2).
  2. Due to high load, an auto-scaling system introduces a second consumer.
  3. A rebalance happens, revoking partition 2 from the first consumer and assigning it to the new consumer.
  4. The first consumer continues to handle messages from partition 2, presumably due to prefetching (see the workaround sketch below).
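
To make the symptom concrete: the only way I see to avoid handling records from a revoked partition today is to filter them against the live assignment by hand. Here's a rough sketch of that workaround, assuming assignmentStream is available on the consumer (I haven't checked the exact signature against the version we're on), and note it still races with the rebalance itself:

    import cats.effect.{IO, Ref}
    import fs2.Stream
    import fs2.kafka.{CommittableConsumerRecord, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition

    // Sketch of a manual workaround (not verified): track the live assignment
    // and drop records from partitions this consumer no longer holds.
    def assignedOnly(
      consumer: KafkaConsumer[IO, String, String]
    ): Stream[IO, CommittableConsumerRecord[IO, String, String]] =
      Stream.eval(Ref.of[IO, Set[TopicPartition]](Set.empty)).flatMap { assigned =>
        consumer.assignmentStream
          .evalMap(tps => assigned.set(tps.toSet))
          .drain
          .merge(
            consumer.stream.evalFilter { committable =>
              // Only pass records whose partition is still in our assignment.
              assigned.get.map(_.contains(
                new TopicPartition(committable.record.topic, committable.record.partition)
              ))
            }
          )
      }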

It's a little difficult for me to write an exact test of the desired behavior, but here's a minimal example based on your test code. It uses printlns 😊 (modified from an existing test)

     it("should handle rebalance") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
        val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
        val producedTotal = produced1.size.toLong + produced2.size.toLong

        def startConsumer(
                           consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
                           stopSignal: SignallingRef[IO, Boolean],
                           i: Int
                         ): IO[FiberIO[Unit]] =
            {
              KafkaConsumer
                .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
                .subscribeTo(topic)
                .flatMap(_.stream)
                .evalTap(consumedQueue.offer)
                .evalTap(committable => IO(println(s"$i handled ${committable.record.key} from part ${committable.record.partition}")))
                .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
                .through(commitBatchWithin(100, 1.second)) // Commit so any new consumers don't start from the beginning of their partition
                .interruptWhen(stopSignal)
                .compile
                .drain
            }
            .start

        (for {
          stopSignal <- SignallingRef[IO, Boolean](false)
          queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
          ref <- Ref.of[IO, Map[String, Int]](Map.empty)
          _ <- IO(publishToKafka(topic, produced1))
          _ <- startConsumer(queue, stopSignal, 0) // Start the first consumer
          _ <- IO.sleep(5.seconds) // simulate scale out event after a while
          _ <- IO(println("Scaled out..."))
          _ <- startConsumer(queue, stopSignal, 1) // Start the second consumer
          _ <- IO(publishToKafka(topic, produced2)) // Produce some more stuff
          _ <- Stream
            .fromQueueUnterminated(queue)
            .evalMap { committable =>
              // Construct map of how many times each message was consumed
              ref.modify { counts =>
                val key = committable.record.key
                val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                (newCounts, newCounts)
              }
            }
            .takeWhile(_.size < producedTotal)
            .compile
            .drain
            .guarantee(stopSignal.set(true))
          keys <- ref.get
        } yield {
          assert {
            keys.size.toLong == producedTotal && {
              keys == (0 until producedTotal.toInt).map { n =>
                s"key-$n" -> 1 // Each message should have been handled once if first consumer doesn't keep polling old partition
              }.toMap
            }
          }
        }).unsafeRunSync()
      }
    }

Truncated output:

0 handled key-0 from part 1
...
0 handled key-2 from part 2
Scaled out...
0 handled key-3 from part 2
...
1 handled key-39 from part 2 <-- partition 2 assigned to new consumer
0 handled key-50 from part 2
1 handled key-41 from part 2
0 handled key-58 from part 2
1 handled key-46 from part 2
0 handled key-63 from part 2
1 handled key-48 from part 2
0 handled key-66 from part 2
1 handled key-49 from part 2
0 handled key-67 from part 2
1 handled key-50 from part 2
0 handled key-70 from part 2
1 handled key-58 from part 2
0 handled key-71 from part 2
1 handled key-63 from part 2
0 handled key-73 from part 2
1 handled key-66 from part 2
0 handled key-76 from part 2
1 handled key-67 from part 2
0 handled key-77 from part 2
1 handled key-70 from part 2
0 handled key-78 from part 2
1 handled key-71 from part 2
0 handled key-79 from part 2
1 handled key-73 from part 2
0 handled key-85 from part 2
1 handled key-76 from part 2
0 handled key-92 from part 2
1 handled key-77 from part 2
0 handled key-95 from part 2
1 handled key-78 from part 2
0 handled key-98 from part 2
1 handled key-79 from part 2
0 handled key-1 from part 0
1 handled key-85 from part 2
0 handled key-4 from part 0
1 handled key-92 from part 2
0 handled key-9 from part 0
1 handled key-95 from part 2
0 handled key-15 from part 0
1 handled key-98 from part 2
0 handled key-17 from part 0
1 handled key-101 from part 2
0 handled key-19 from part 0
1 handled key-112 from part 2
0 handled key-21 from part 0
1 handled key-113 from part 2
0 handled key-26 from part 0
1 handled key-116 from part 2
0 handled key-28 from part 0
1 handled key-118 from part 2
0 handled key-37 from part 0
1 handled key-121 from part 2
0 handled key-42 from part 0
1 handled key-125 from part 2
0 handled key-43 from part 0
1 handled key-128 from part 2
0 handled key-53 from part 0
1 handled key-129 from part 2
0 handled key-56 from part 0
1 handled key-133 from part 2
0 handled key-59 from part 0
1 handled key-134 from part 2
0 handled key-60 from part 0
1 handled key-140 from part 2
0 handled key-62 from part 0
1 handled key-141 from part 2
0 handled key-65 from part 0
1 handled key-142 from part 2
0 handled key-68 from part 0
1 handled key-148 from part 2
0 handled key-69 from part 0
1 handled key-150 from part 2
0 handled key-81 from part 0
1 handled key-159 from part 2
0 handled key-83 from part 0
1 handled key-162 from part 2
0 handled key-88 from part 0
1 handled key-166 from part 2
0 handled key-89 from part 0
1 handled key-175 from part 2
0 handled key-90 from part 0
1 handled key-177 from part 2
0 handled key-96 from part 0
1 handled key-178 from part 2
0 handled key-100 from part 1
1 handled key-182 from part 2
0 handled key-102 from part 1
1 handled key-184 from part 2
0 handled key-104 from part 1
1 handled key-186 from part 2
0 handled key-105 from part 1
1 handled key-188 from part 2
0 handled key-106 from part 1
1 handled key-189 from part 2
0 handled key-107 from part 1
1 handled key-190 from part 2
0 handled key-108 from part 1
1 handled key-192 from part 2
0 handled key-110 from part 1
1 handled key-199 from part 2
... partition 2 is empty at this point
0 handled key-115 from part 1
...
0 handled key-103 from part 0
...

Desired behavior:
Any prefetched messages from a revoked partition should be dropped unless they have already been pulled downstream.
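
Roughly what I mean, as pure pseudocode rather than the actual fs2-kafka internals: when partitions are revoked, any records still buffered for them should be discarded before they can reach the stream. Something like:

    import fs2.kafka.CommittableConsumerRecord
    import org.apache.kafka.common.TopicPartition

    // Sketch only (not the real internals): filter buffered-but-not-yet-emitted
    // records so nothing from a revoked partition is emitted after the rebalance.
    def dropRevoked[F[_], K, V](
      buffered: Vector[CommittableConsumerRecord[F, K, V]],
      revoked: Set[TopicPartition]
    ): Vector[CommittableConsumerRecord[F, K, V]] =
      buffered.filterNot { committable =>
        revoked.contains(
          new TopicPartition(committable.record.topic, committable.record.partition)
        )
      }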

Here's a proper test of the desired behavior (I think):

it("should handle rebalance") {
      withTopic { topic =>
        createCustomTopic(topic, partitions = 3)
        val produced1 = (0 until 100).map(n => s"key-$n" -> s"value->$n")
        val produced2 = (100 until 200).map(n => s"key-$n" -> s"value->$n")
        val producedTotal = produced1.size.toLong + produced2.size.toLong
        val commitBatchSize = 10

        def startConsumer(
                           consumedQueue: Queue[IO, CommittableConsumerRecord[IO, String, String]],
                           stopSignal: SignallingRef[IO, Boolean]
                         ): IO[FiberIO[Unit]] =
            {
              KafkaConsumer
                .stream(consumerSettings[IO].withMaxPrefetchBatches(0))
                .subscribeTo(topic)
                .flatMap(_.stream)
                .evalTap(consumedQueue.offer)
                .evalMap(committable => IO.sleep(100.millis).as(committable.offset)) // Simulate work being done
                .through(commitBatchWithin(commitBatchSize, 1.hour)) // Use unreasonably high time to ensure count determines commits
                .interruptWhen(stopSignal)
                .compile
                .drain
            }
            .start

        (for {
          stopSignal <- SignallingRef[IO, Boolean](false)
          queue <- Queue.unbounded[IO, CommittableConsumerRecord[IO, String, String]]
          ref <- Ref.of[IO, Map[String, Int]](Map.empty)
          _ <- IO(publishToKafka(topic, produced1))
          _ <- startConsumer(queue, stopSignal) // Start the first consumer
          _ <- IO.sleep(5.seconds) // simulate scale out event after a while
          _ <- startConsumer(queue, stopSignal) // Start the second consumer
          _ <- IO(publishToKafka(topic, produced2)) // Produce some more stuff
          _ <- Stream
            .fromQueueUnterminated(queue)
            .evalMap { committable =>
              // Construct map of how many times each message was consumed
              ref.modify { counts =>
                val key = committable.record.key
                val newCounts = counts.updated(key, counts.getOrElse(key, 0) + 1)
                (newCounts, newCounts)
              }
            }
            .takeWhile(_.size < producedTotal)
            .compile
            .drain
            .guarantee(stopSignal.set(true))
          keys <- ref.get
        } yield {
          assert {
            keys.size.toLong == producedTotal && {
              // The number of messages handled more than once should stay below the commit batch size
              keys.values.count(_ != 1) < commitBatchSize
            }
          }
        }).unsafeRunSync()
      }
    }

Note that I've been using .stream to consume, as that's what we use in our code base. It might be more prudent/easier to define the desired behavior in terms of partitionedMapStream.
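
For comparison, here's a rough sketch of the partition-wise style using partitionedStream (handle is just a placeholder; I haven't checked whether defining the revocation semantics actually is easier this way):

    import cats.effect.IO
    import fs2.kafka.{CommittableConsumerRecord, KafkaConsumer}

    // Each inner stream corresponds to one assigned partition, so revocation could
    // in principle be expressed as "the inner stream for that partition ends".
    def consumePartitioned(
      consumer: KafkaConsumer[IO, String, String],
      handle: CommittableConsumerRecord[IO, String, String] => IO[Unit]
    ): IO[Unit] =
      consumer.partitionedStream
        .map(_.evalMap(handle))
        .parJoinUnbounded
        .compile
        .drain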

This issue is likely related to #127.