fd4s/fs2-kafka

Possibility of blocking/greedy rebalance handlers

Closed this issue · 9 comments

#117 was a nice improvement over the previous behavior in the sense that commit commands aren't lost anymore, but there is another issue that can't be controlled in any way right now: for consumers that want exactly-once(ish) semantics with batching (e.g. groupWithin), it's possible to consume the same record more than once due to rebalances.
Consider a 2-consumer group with members A and B, where only A is active right now. Say there is a record R, which gets consumed by A at the beginning of a 5-second window, and then a rebalance happens within that window. Now, if R belongs to a partition that is reassigned to B, and A hasn't committed R yet, A's commit will be deferred until after the rebalance completes. That means A will commit R, and B will consume R and then commit it too: double consumption, with no way to control it.
One solution I can see is an option to wait in the revocation handler until processing of all currently consumed records has completed, i.e. don't revoke until we have completely handled everything we've read up to "now", and then proceed normally.
WDYT?
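
The A/B/R scenario above can be traced with a toy model. This is only a sketch: it involves no Kafka at all, and `DuplicateTrace`, `consume`, and `commit` are hypothetical names made up for illustration. It assumes a single partition holding one record R at offset 0, and that a consumer starts reading from the last committed offset.

```scala
// Toy model of the scenario: one partition, one record R at offset 0.
// Nothing here talks to Kafka; all names are hypothetical.
object DuplicateTrace {
  def run(): Vector[String] = {
    val recordOffset = 0L                   // offset of R
    var committed = 0L                      // next offset to read, as stored for the group
    var processedBy = Vector.empty[String]  // members whose business logic saw R

    // A member only sees R if the committed offset has not moved past it.
    def consume(member: String): Unit =
      if (committed <= recordOffset) processedBy = processedBy :+ member

    // Committing R stores the offset of the next record to read.
    def commit(): Unit = committed = recordOffset + 1

    consume("A") // A reads R at the start of its 5-second window
    // Rebalance: the partition moves to B before A has committed.
    consume("B") // B resumes from the committed offset, which still points at R
    commit()     // A's deferred commit goes through after the rebalance
    commit()     // B commits R as well
    processedBy
  }
}
```

Calling `DuplicateTrace.run()` returns both members, i.e. R was processed twice. If A's commit were moved before the rebalance (before `consume("B")`), only A would see R.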

For example, this would be a failing test:

package fs2.kafka

import scala.concurrent.duration._
import cats.effect.IO
import cats.implicits._
import fs2.{Chunk, Pipe, Stream}

final class RebalanceSpec extends BaseKafkaSpec {
  type Consumer = KafkaConsumer[IO, String, String]
  type ConsumerStream = Stream[IO, CommittableMessage[IO, String, String]]

  describe("rebalance process") {
    it("should consume only once") {
      withKafka { (config, topic) =>
        createCustomTopic(topic, partitions = 6)
        Stream
          .unfoldEval[IO, Int, Int](0)(i => IO.pure(Some(i -> (i + 1))))
          .take(1024 * 16)
          .chunkN(32)
          .zipLeft(Stream.repeatEval(timer.sleep(50.millis)))
          .map(xs => publishToKafka(topic, xs.map(i => s"$i" -> s"$i").toList))
          .compile
          .drain
          .map(x => println(s"producer completed $x"))
          .unsafeRunAsyncAndForget()

        val commit: Pipe[IO, Chunk[CommittableOffset[IO]], Unit] =
          commitBatchChunk[IO]

        val cstream: Stream[IO, Int] = consumerStream[IO]
          .using(consumerSettings(config).withGroupId("g1"))
          .evalTap(_.subscribeTo(topic))
          .evalTap(consumer => IO(consumer.toString should startWith("KafkaConsumer$")).void)
          .flatMap(_.stream)
          .groupWithin(1000, 5.second)
          .flatMap(
            c =>
              commit(Stream(c.map(_.committableOffset)))
                .map(_ => println(s"committed chunk of ${c.size}"))
                .flatMap(_ => Stream.chunk(c))
          )
          .map(_.record.value.toInt)
          .interruptAfter(120.seconds)

        val s1: Stream[IO, Int] = cstream
        // will trigger rebalance
        val s2: Stream[IO, Int] = cstream.delayBy(7.seconds)

        val (c1, c2) = (s1.compile.toVector, s2.compile.toVector).parTupled.unsafeRunSync()

        assert(c1.intersect(c2).isEmpty)
      }
    }
  }
}

If there were a way to do blocking commits on rebalance, it would pass.

Thanks for opening this issue @tkroman!

We can definitely add blocking rebalance listeners and support for synchronous commits, but the tricky part is knowing when you've completed 'processing of all currently consumed records' in the user-defined Stream. Do you have an idea of how you would like that to look?

I spent all of yesterday trying to come up with an out-of-the-box solution (from the fs2-kafka POV), and I think under the current constraints there isn't one. I think providing full-fledged support for rebalance listeners is the right choice here. Here's what I found out:

  • users would want arbitrary (maybe low-level, i.e. native kafka consumer) access within rebalance listeners (for example, in this case I definitely want to commit out of order, i.e. not through the fs2-kafka request queue). Plus, in any case, committing during a rebalance is impossible right now due to the pending commits workaround for commit exceptions from #117.
  • since the rebalance callback is only called on the same thread as, and from within, poll calls, it's impossible to use withConsumer, since it's not "reentrant" in this sense
  • user-defined rebalance listeners should not interfere with (or be affected by) the current implementation of revoked
  • if there is a way to provide arbitrary rebalance listeners, fs2-kafka won't have to bother with shipping ad-hoc fixes/workarounds.

To answer your question - I have an implementation that is guarding against this issue, but it's based on a "native" kafka-consumer w/o any abstractions on top, so the logic there is something like this (pseudocode):

// `inflight` is a shared state
// it is a set of consumer records
// messages can be added to `inflight` when they are polled
// and removed either manually or after commit happened
inflight = Set.empty

newRecords = poll()
inflight ++= newRecords

// `process` is business logic.
//  it returns a set of committable messages
// it also can access `inflight` to remove certain "discarded" messages
commits = process(newRecords)
commit(commits)
inflight --= commits

and rebalance listener is defined as

def onRevoked(tps) = {
  // block for as long as there are inflight messages
  while (!inflight.isEmpty) busyLoop()
}
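
The in-flight bookkeeping above can also be written without a busy loop. The following is a minimal sketch using only the JDK's locks. `RecordId`, `InflightTracker`, and `awaitDrained` are hypothetical names, not part of fs2-kafka or kafka-clients, and the timeout is an assumption added here so that a revocation callback cannot block forever.

```scala
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical identifier for a consumed record (illustration only).
final case class RecordId(topic: String, partition: Int, offset: Long)

// Hypothetical helper: tracks records that were polled but not yet committed or discarded.
final class InflightTracker {
  private val lock = new ReentrantLock()
  private val drained = lock.newCondition()
  private val inflight = mutable.Set.empty[RecordId]

  // Call after poll(): the records are now "in flight".
  def add(records: Iterable[RecordId]): Unit = {
    lock.lock()
    try { inflight ++= records; () }
    finally lock.unlock()
  }

  // Call after a commit, or when records are discarded by the business logic.
  def remove(records: Iterable[RecordId]): Unit = {
    lock.lock()
    try {
      inflight --= records
      if (inflight.isEmpty) drained.signalAll() // wake up any waiting revocation handler
    } finally lock.unlock()
  }

  // Blocks until nothing is in flight or the timeout elapses.
  // Returns true if the set drained in time.
  def awaitDrained(timeoutMillis: Long): Boolean = {
    lock.lock()
    try {
      var remaining = TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
      while (inflight.nonEmpty && remaining > 0L)
        remaining = drained.awaitNanos(remaining)
      inflight.isEmpty
    } finally lock.unlock()
  }
}
```

An `onRevoked` implementation would then call something like `tracker.awaitDrained(timeout)` in place of the `while` loop. This only helps if `remove` is called from a thread other than the one blocked in the callback; otherwise the wait can never be satisfied.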

I can roughly see how this would translate to a streaming analogy, but I'm too new to fs2 to confidently start implementing a production-grade solution, so any help or further discussion would be greatly appreciated.

It sounds like we want a rebalance listener which provides access to the Java Kafka consumer (although wrapped so that operations are suspended in F[_]), such that operations execute synchronously in the callback.

Yeah, that + optionally standardizing on some sort of inner state? I still can't formulate it precisely but from my example above - the process part is the one that defines if the business logic was executed on the message or not. So if during rebalance we "stop" the flow in such a way that we know the set of records that reached the process part and we definitely don't let any other records flow to process during rebalance, then we can provide exactly-once guarantees.
This is very hand-wavy, I absolutely understand that, but unfortunately I don't have better words to describe this.

With the changes in #129 we should be able to just use consumer.position(partitions) to know what's been emitted in the stream. You then just need to know the latest offset which has been processed, which can be done with a Ref[F, Map[TopicPartition, Long]].
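
As a rough sketch of that bookkeeping, using plain JDK types rather than `Ref` so that it stands alone: here `TopicPartition` is a hypothetical stand-in for Kafka's class of the same name, `ProcessedOffsets` and `OffsetMath` are made-up helpers, and an `AtomicReference` plays the role of the `Ref[F, Map[TopicPartition, Long]]`. It relies on the Kafka convention that a position (and a committed offset) is the offset of the next record to read.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Hypothetical helper: latest processed offset per partition.
final class ProcessedOffsets {
  private val state =
    new AtomicReference[Map[TopicPartition, Long]](Map.empty)

  // Record that processing of `offset` has finished; keeps the highest offset seen.
  def markProcessed(tp: TopicPartition, offset: Long): Unit = {
    state.updateAndGet(m => m.updated(tp, math.max(offset, m.getOrElse(tp, -1L))))
    ()
  }

  def snapshot: Map[TopicPartition, Long] = state.get()
}

object OffsetMath {
  // A partition is caught up when the last processed offset is directly
  // below the consumer position, i.e. everything emitted has been processed.
  // Partitions with no entry count as not caught up; a real implementation
  // would need to seed the map when partitions are assigned.
  def caughtUp(
    positions: Map[TopicPartition, Long],
    processed: Map[TopicPartition, Long]
  ): Boolean =
    positions.forall { case (tp, pos) => processed.get(tp).exists(_ + 1 >= pos) }

  // Offsets to commit: the next offset to read after the last processed one.
  def toCommit(processed: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
    processed.map { case (tp, offset) => tp -> (offset + 1) }
}
```

A revocation handler could compare the positions of the revoked partitions against `snapshot`, wait until `caughtUp` holds, and then commit `toCommit(snapshot)` synchronously.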

Edit: you might also be interested in #128 for true exactly-once delivery, rather than just minimizing the risk of duplicates.

FWIW, the same issue that I raised in akka's kafka library repository (akka/alpakka-kafka#539 (comment)) was fixed in akka/alpakka-kafka#949 (by way of wrapping more kafka classes and exposing them in the API).

@vlovgr @tkroman I am interested in this ticket. Our team is facing the same issue; we need to use Alpakka -> reactive stream -> fs2 stream as a workaround. Could you give me some hints on where to start?

@vlovgr We are actually observing the same problem with transactions. Could you share what you think would prevent this if we were using the transactional loop?

@tkroman Have you managed to come up with an ad hoc workaround since then?