fd4s/fs2-kafka

Does KafkaConsumer.stopConsuming actually allow all in-flight messages to be processed?

ex-ratt opened this issue · 7 comments

The ScalaDoc of KafkaConsumer.stopConsuming says that "All currently running streams will continue to run until all in-flight messages will be processed" and "streams will be completed when all fetched messages will be processed". According to that, the answer to the title question is "yes". However, I stumbled upon a situation where it sometimes does not work as I would expect.

My goal is to have a finite stream that reads all currently existing messages from a Kafka topic (which is not being produced to at that moment). I can't rely on the offsets of the messages themselves, since there may be some number of commit and abort markers at the end of the partition that are never delivered to me as records, so the last actual (committed) message may have an offset that is a bit behind the end offset of the topic. I thought that requesting the current positions and stopping consumption would solve the problem, something like this:

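import cats.Applicative
import cats.syntax.all._
import fs2.Stream
import org.apache.kafka.common.TopicPartition

// Assumed definition of the helper used below (not shown in the original report):
// the current position of each assigned partition, via KafkaConsumer#position.
val assignmentPositions: F[Map[TopicPartition, Long]] =
  consumer.assignment.flatMap(_.toList.traverse(tp => consumer.position(tp).tupleLeft(tp))).map(_.toMap)

Stream.eval(consumer.assignment.flatMap(consumer.endOffsets)).flatMap { endOffsets =>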
  println(s"end: $endOffsets") // TODO temporary
  consumer.stream
    .chunks
    .evalTap { chunk =>
      val offsets = chunk.last.map(_.offset.offsets.view.mapValues(_.offset()).toMap).getOrElse(Map.empty) // TODO temporary
      assignmentPositions
        .flatTap(positions => println(s"at:  $offsets  (positions: $positions)").pure) // TODO temporary
        .map(_.map2(endOffsets)(_ >= _).forall(_._2))
        .ifM(consumer.stopConsuming.flatTap(_ => println("stopped").pure), Applicative[F].unit) // TODO println is temporary
    }
    .unchunks
}

This version would not work with an empty topic, but it lets me reproduce the issue more reliably than a concurrent stream that checks the assignment positions. Please excuse the temporary printlns that obscure the code a bit. Sometimes, the output looks like this (as expected):

end: Map(topic-0 -> 718)
at:  Map(topic-0 -> 83)  (positions: Map(topic-0 -> 84))
at:  Map(topic-0 -> 608)  (positions: Map(topic-0 -> 718))
stopped
at:  Map(topic-0 -> 717)  (positions: Map(topic-0 -> 718))
stopped

But at other times, the last two lines of the output (and the last chunk of the stream) are missing. Maybe Kafka reports updated positions before it sends the records over? Or maybe the KafkaConsumerActor actually received the messages, but the stream is stopped prematurely (possibly due to a race condition)? I can't see anything wrong in KafkaConsumer, but then again the code is more complex than I can handle, so maybe I'm overlooking something.

Are my assumptions about how it should work wrong? Or could there be an issue with how stopConsuming works? Maybe there is a more reliable way to have a finite stream of all available messages?
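The end-of-topic check in the snippet compares each partition's position (the offset of the next record the consumer will fetch) with its end offset, rather than comparing the offset of the last record received, because commit/abort markers occupy offsets but are never returned as records. Below is a minimal pure-Scala sketch of that predicate. The names `EndCheck` and `reachedEnd` are hypothetical, not part of fs2-kafka, and partitions are modelled as plain strings to avoid a Kafka dependency. Unlike cats' `map2` on `Map`, which only compares keys present in both maps, it treats a partition missing from `positions` as not yet caught up.

```scala
object EndCheck {
  // Hypothetical helper: true when every partition listed in endOffsets has a
  // position at or beyond its end offset. A partition that is missing from
  // positions counts as not yet caught up.
  def reachedEnd[P](positions: Map[P, Long], endOffsets: Map[P, Long]): Boolean =
    endOffsets.forall { case (partition, end) =>
      positions.get(partition).exists(_ >= end)
    }

  def main(args: Array[String]): Unit = {
    val end = Map("topic-0" -> 718L)
    // Values taken from the log output above.
    println(reachedEnd(Map("topic-0" -> 718L), end))  // true
    println(reachedEnd(Map("topic-0" -> 84L), end))   // false
    println(reachedEnd(Map.empty[String, Long], end)) // false
  }
}
```

Note that a position of 718 only says the underlying consumer has fetched up to that point. It does not by itself say that the downstream stream has already emitted those records.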

It's not really clear to me what you want to accomplish, but it looks like your issue is not specific to FS2 Kafka but rather to how Kafka works. Let me explain 👇🏽

My goal is to have a finite stream that reads all currently existing messages from a Kafka topic (that is not produced to in that moment)

From this, I understand that either the topic has not been created or no messages have been produced to it (it's not clear to me which). Either way, without messages, you can't work with offsets.

In any case, I don't think you need stopConsuming. I'm assuming you have some marker message in the topic indicating the last message to consume. Because you end up with a plain FS2 Stream, you can then use takeThrough or takeWhile, and your finite stream will stop gracefully after processing all the messages up to that marker.
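A minimal sketch of the takeWhile/takeThrough suggestion follows. It uses a pure fs2 stream in place of a real consumer stream so it runs without a broker, and it assumes fs2-core 3.x on the classpath. `Rec`, `notMarker`, and the `end-marker` key are hypothetical. The only difference between the two operators is whether the marker itself is emitted.

```scala
import fs2.{Pure, Stream}

object MarkerExample {
  // Hypothetical stand-in for a consumed Kafka record.
  final case class Rec(key: String, value: String)

  // True for every record except the (hypothetical) end marker.
  def notMarker(r: Rec): Boolean = r.key != "end-marker"

  val records: Stream[Pure, Rec] =
    Stream(Rec("a", "1"), Rec("b", "2"), Rec("end-marker", ""), Rec("c", "3"))

  // takeWhile stops before the marker; takeThrough also emits the marker itself.
  val upToMarker: List[Rec] = records.takeWhile(notMarker).toList
  val throughMarker: List[Rec] = records.takeThrough(notMarker).toList

  def main(args: Array[String]): Unit = {
    println(upToMarker.map(_.key))    // List(a, b)
    println(throughMarker.map(_.key)) // List(a, b, end-marker)
  }
}
```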

Hope this helps clarify how to proceed, and sorry for the late response 😅

Hi Alan,

thank you for your reply.

The topic does exist and it may have messages. I just want all the messages of that topic, from beginning to end. The last message in that topic carries no indication that it is the last message. This is why I created this complicated mess to figure out whether the consumer has read everything (by querying the current positions) and then stop consumption.

Some background: The use case is a topic that stores the state of a computation. Whenever the result of the computation is produced to Kafka, so is the state (within the same transaction). When the application is restarted, it needs to get the last state in order to continue where it left off, so it wants the last message of the (compacted) state topic. But the state may consist of several partial states (each with a different key) that need to be combined, so all of those messages are read. During the restoration of the state, no new state is written to the topic, but after restoring the state and continuing the computation, new state messages are written.
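Reading the compacted state topic from beginning to end and keeping only the latest value per key mirrors what log compaction guarantees eventually. Here is a minimal pure-Scala sketch of that restore step. `StateRecord` and `latestByKey` are hypothetical names, a `None` value models a null-value (tombstone) record that removes the key, and the application-specific combination of the per-key states is left out.

```scala
object RestoreState {
  // Hypothetical shape of a consumed state record; value = None models a tombstone.
  final case class StateRecord(offset: Long, key: String, value: Option[String])

  // Keep the latest value per key (last write wins, in offset order);
  // a tombstone removes the key from the restored state.
  def latestByKey(records: Seq[StateRecord]): Map[String, String] =
    records.sortBy(_.offset).foldLeft(Map.empty[String, String]) { (acc, r) =>
      r.value match {
        case Some(v) => acc.updated(r.key, v)
        case None    => acc - r.key
      }
    }

  def main(args: Array[String]): Unit = {
    val recs = Seq(
      StateRecord(0L, "a", Some("a1")),
      StateRecord(1L, "b", Some("b1")),
      StateRecord(2L, "a", Some("a2")),
      StateRecord(3L, "b", None)
    )
    println(latestByKey(recs)) // Map(a -> a2)
  }
}
```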

Because I don't know when the application is about to shut down (this may be caused by an exception), I can't reliably write a marker message. But maybe there is a simpler solution to this problem, other than relying on stopConsuming?

PS: To "solve" the problem, I added a delay before calling stopConsuming, so that in-flight messages have some time to be processed.

Ahhh! Huh... 🫠 Tricky (but interesting) scenario.

So, the number of different states (different keys) is not known beforehand, right?

Right, there are cases where we don't know them. In most cases, however, we know these keys, but they may differ from the keys actually present in the topic, because the configuration can change between restarts of the application.

One last question: when the KafkaConsumer is created for restoring the state of the computations, is it the only instance interacting with the topic (no messages from other instances are being written to it)?

If you can guarantee that your instance is the only one doing anything with that topic, and assuming you are in full control of the topic (your application is the only one writing/reading messages), you can do the following:

  1. At the start of the application, produce a message with a null value and a unique key that can't clash with other keys (like restore-marker or similar).
  2. From the result of producing that message, get its offset.
  3. Now you have your end message, which will not be erased by log compaction before you read it (it has a unique key, no other instance will write a newer message for that key, and null-value records are only cleaned up after delete.retention.ms has passed).
  4. On the consumer side, it's just a matter of using the option value deserializer and consumer.records.takeWhile(offset =!= restore-marker-offset).unNone.[...]
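The consumer side of the four steps can be simulated without a broker, as a minimal sketch. It assumes a single partition (the output earlier in the thread only shows topic-0), a marker offset already known from the produce result, values decoded as `Option` (as an option value deserializer would yield), and fs2-core 3.x. `Rec` and `restore` are hypothetical names. In the real application, the stream would be the effectful `consumer.records` stream and the offset would come from each record.

```scala
import fs2.{Pure, Stream}

object RestoreUntilMarker {
  // Hypothetical stand-in for a consumed record: offset, key and an Option value
  // (None for null values, as with an option value deserializer).
  final case class Rec(offset: Long, key: String, value: Option[String])

  // Read everything before the marker offset and drop null values.
  def restore(records: Stream[Pure, Rec], markerOffset: Long): List[String] =
    records
      .takeWhile(_.offset != markerOffset) // stop at the marker without emitting it
      .map(_.value)
      .unNone                              // skip records whose value was null
      .toList

  def main(args: Array[String]): Unit = {
    val records = Stream(
      Rec(0L, "a", Some("a1")),
      Rec(1L, "b", None),                // an earlier tombstone in the state topic
      Rec(2L, "c", Some("c1")),
      Rec(3L, "restore-marker", None),   // produced at startup; its offset is known
      Rec(4L, "a", Some("a2"))           // written later by the resumed computation
    )
    println(restore(records, markerOffset = 3L)) // List(a1, c1)
  }
}
```

Because the marker's offset is fixed at startup, the stop condition no longer depends on timing, unlike the delay-based workaround.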

But I'm assuming a couple of things that may not hold 😅

In any case, this solution is 100% deterministic and will work well. At the very least, it's better than adding a delay, which can fail for multiple reasons outside your control (network latency, CPU resources of the instance if you're running in Kubernetes, ...).

This is an excellent idea. I hope I can try it in the coming weeks. Thanks for the help :)