fd4s/fs2-kafka

Rebalance of partitions with cooperative sticky assignor results in dead/unread partitions

rney-lookout opened this issue · 3 comments

This test was done with the 3.0.0-M5 build of the fs2-kafka client. The Kafka topic has 8 partitions, and there are 2 consumer processes, each assigned 4 partitions.
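
For context, the cooperative sticky assignor is enabled through the partition.assignment.strategy consumer property. A minimal sketch of consumer settings that would enable it is below; the bootstrap servers, group id, and String deserializers are placeholders rather than values from this report:

    import cats.effect.IO
    import fs2.kafka.{AutoOffsetReset, ConsumerSettings}
    import org.apache.kafka.clients.consumer.{ConsumerConfig, CooperativeStickyAssignor}

    // Placeholder settings: the partition.assignment.strategy property is the only
    // part that matters for this issue.
    val cooperativeSettings: ConsumerSettings[IO, String, String] =
      ConsumerSettings[IO, String, String]
        .withBootstrapServers("localhost:9092")
        .withGroupId("example-group")
        .withAutoOffsetReset(AutoOffsetReset.Earliest)
        .withProperty(
          ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          classOf[CooperativeStickyAssignor].getName
        )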

After a successful startup, an error terminated the stream for one of the processes, resulting in a partition rebalance. The initial assignment after the rebalance was 2 partitions, as noted in the Kafka client log:

Updating assignment with
Assigned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Current owned partitions: []
Added partitions (assigned - owned): [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Revoked partitions (owned - assigned): []
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-80787","level":"INFO"}

Logging within the stream confirms the assignment:

"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:2
"message":"New partition assigned to consumer for Topic: andromeda.notifications.aoa.v3:5"

About a minute later, another assignment message arrives from the Kafka client with the remaining 2 partitions:

Updating assignment with
Assigned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-6, andromeda.notifications.aoa.v3-2, andromeda.notifications.aoa.v3-3]
Current owned partitions: [andromeda.notifications.aoa.v3-5, andromeda.notifications.aoa.v3-2]
Added partitions (assigned - owned): [andromeda.notifications.aoa.v3-6, andromeda.notifications.aoa.v3-3]
Revoked partitions (owned - assigned): []
"logger_name":"org.apache.kafka.clients.consumer.internals.ConsumerCoordinator","thread_name":"fs2-kafka-consumer-80787","level":"INFO"}

However, the consumer application printed no notifications for the newly assigned partitions.

The code in the consumer application is:

        KafkaConsumer
          .stream(kafkaConsumerSettings.consumerSettings(topicConfig.consumerGroupId))
          .subscribeTo(topicConfig.topic)
          .flatMap(_.partitionsMapStream)
          .filter(_.nonEmpty)
          .flatMap { partitionStream =>
            fs2.Stream
              .emits(partitionStream.toVector.map { case (streamPartition, partitionedStream) =>
                val batchInterval = FiniteDuration.apply(topicConfig.batchInterval, TimeUnit.SECONDS)
                fs2.Stream.eval(
                  AppLogger.log(
                    s"New partition assigned to consumer for Topic: ${streamPartition.topic()}:${streamPartition.partition()}",
                    TraceContext(
                      partition = streamPartition.partition(),
                      work = "Partition Rebalance - New Partition Assigned",
                      status = TraceTags.statusSuccess,
                      source = streamPartition.topic()
                    )
                  )
                ) ++
                  partitionedStream
                    .groupWithin(streamConfig.maxConcurrent, batchInterval)
                    .evalMap { records =>
                      val (messages, offsets: Chunk[CommittableOffset[F]]) = records.foldMap { ev =>
                        ev.record.pure[Chunk] -> ev.offset.pure[Chunk]
                      }
                      val commit: F[Unit] = CommittableOffsetBatch.fromFoldable(offsets).commit
                      val batchContext =
                        BatchContext(
                          streamPartition.topic(),
                          streamPartition.partition(),
                          offsets.head.map(_.offsets.head._2.offset()).getOrElse(-1)
                        )

                      processStreams(messages, batchContext) >> commit
                    }
              })
              .parJoinUnbounded
          }

It looks like your code expects all partition streams in the map to terminate on a rebalance and be replaced with new streams in the new partitions map, whereas what actually happens with the cooperative sticky assignor is that only streams for revoked partitions are closed, and the new map contains only streams for newly assigned partitions. Therefore you need to handle partition maps from the outer stream in parallel rather than sequentially. I think the fix is to move the .parJoinUnbounded to the very end of your snippet so that it joins the outer stream rather than the inner one. Could you try that and let me know if it fixes the problem?
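
For illustration, a minimal sketch of that restructuring is below; settings, topic and processPartition are placeholders standing in for the consumer settings, topic name and per-partition pipeline (logging, groupWithin, processing, commit) from the snippet above, and only the position of .parJoinUnbounded changes:

    // Sketch only: the per-partition work is collapsed into a hypothetical
    // processPartition(topicPartition, partitionStream): Stream[F, Unit].
    KafkaConsumer
      .stream(settings)
      .subscribeTo(topic)
      .flatMap(_.partitionsMapStream)
      .filter(_.nonEmpty)
      .flatMap { partitionsMap =>
        fs2.Stream.emits(
          partitionsMap.toVector.map { case (topicPartition, partitionStream) =>
            processPartition(topicPartition, partitionStream)
          }
        )
      }
      // join at the outer level: streams for newly assigned partitions are picked up
      // as new partition maps arrive, while already-owned partition streams keep running
      .parJoinUnbounded

The difference is that flatMap alone cannot pull the next partitions map until the inner stream it produced has terminated; with the inner parJoinUnbounded removed and the join moved to the outer level, Stream.emits terminates immediately, so new partition maps are consumed as they arrive and their streams are joined alongside the existing ones.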

I can confirm that moving the .parJoinUnbounded to the outer stream resolved the problem.

Reopening as we should document this properly.