fd4s/fs2-kafka

New stream method with the full rebalance information

LMnet opened this issue · 3 comments

LMnet commented

After #844, partitionsMapStream is no longer a "Stream where each element contains a current assignment", as its Scaladoc previously stated.

If we want to retain that functionality, it looks like we need to add a new stream method, something like this:

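import fs2.Stream
import fs2.kafka.CommittableConsumerRecord
import org.apache.kafka.common.TopicPartition
import scala.collection.immutable.SortedSet

// Proposed addition to KafkaConsume[F, K, V]
def detailedDataStream: Stream[F, FullAssignmentInfo[F, K, V]]

case class FullAssignmentInfo[F[_], K, V](
  newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition]
)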

Naming is not final, just my thoughts.

I think this is a great idea. We could initially make it a private method and implement the other methods in terms of it, so we have time to iron out any issues before exposing it in the public API. Then, when we expose it, we can provide default implementations of the other methods in terms of it on KafkaConsume, making it much easier to mock KafkaConsume for testing.
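A minimal sketch of the "one primitive, derived defaults" idea. It is not fs2-kafka code: ConsumeSketch is a hypothetical, simplified KafkaConsume, List plays the role of fs2.Stream, the type parameter S stands in for Stream[F, CommittableConsumerRecord[F, K, V]], and TopicPartition is a stand-in for the Kafka class so the sketch runs on the standard library alone. It also assumes, as the first comment implies, that after #844 each element of partitionsMapStream holds only the newly assigned partitions.

```scala
import scala.collection.immutable.SortedSet

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object TopicPartition {
  implicit val ordering: Ordering[TopicPartition] =
    Ordering.by((tp: TopicPartition) => (tp.topic, tp.partition))
}

// `S` stands in for Stream[F, CommittableConsumerRecord[F, K, V]].
final case class FullAssignmentInfo[S](
  newlyAssignedPartitions: Map[TopicPartition, S],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition]
)

// Hypothetical, simplified KafkaConsume: List plays the role of fs2.Stream.
trait ConsumeSketch[S] {
  // The one primitive an implementation, or a test mock, has to provide.
  def detailedDataStream: List[FullAssignmentInfo[S]]

  // Default: one Map of newly assigned partition streams per rebalance.
  def partitionsMapStream: List[Map[TopicPartition, S]] =
    detailedDataStream.map(_.newlyAssignedPartitions)

  // Default: every partition stream, flattened across rebalances.
  def partitionedStream: List[S] =
    partitionsMapStream.flatMap(_.values)
}
```

With this shape a test double only needs to override detailedDataStream; every other method comes for free from the defaults.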

Having played with this a bit, I think the stream should instead emit an ADT with subtypes Assigned(newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]) and Revoked(revokedPartitions: SortedSet[TopicPartition]). That mirrors the information we get from the Java Kafka client, which reports assigned and revoked partitions through separate rebalance callbacks, and a stream of FullAssignmentInfo can then be generated in user code with a suitable fold.
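A minimal sketch of that fold, assuming the Assigned/Revoked ADT described above. To keep it runnable on the standard library alone, TopicPartition is a hypothetical stand-in for the Kafka class, the type parameter S stands in for Stream[F, CommittableConsumerRecord[F, K, V]], and a List of events replaces the fs2 stream; RebalanceEvent, owned, step and fromEvents are illustrative names, not fs2-kafka API.

```scala
import scala.collection.immutable.SortedSet

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object TopicPartition {
  implicit val ordering: Ordering[TopicPartition] =
    Ordering.by((tp: TopicPartition) => (tp.topic, tp.partition))
}

// The proposed ADT; `S` stands in for Stream[F, CommittableConsumerRecord[F, K, V]].
sealed trait RebalanceEvent[+S]
final case class Assigned[S](newlyAssignedPartitions: Map[TopicPartition, S]) extends RebalanceEvent[S]
final case class Revoked(revokedPartitions: SortedSet[TopicPartition]) extends RebalanceEvent[Nothing]

final case class FullAssignmentInfo[S](
  newlyAssignedPartitions: Map[TopicPartition, S],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition]
) {
  // Every partition the consumer owns after this event.
  def owned: SortedSet[TopicPartition] = retainedPartitions ++ newlyAssignedPartitions.keySet
}

object FullAssignmentInfo {
  def empty[S]: FullAssignmentInfo[S] =
    FullAssignmentInfo(Map.empty[TopicPartition, S], SortedSet.empty[TopicPartition], SortedSet.empty[TopicPartition])

  // One fold step: derive the next snapshot from the previous one and the latest event.
  def step[S](prev: FullAssignmentInfo[S], event: RebalanceEvent[S]): FullAssignmentInfo[S] =
    event match {
      case Assigned(newly) =>
        // Anything still owned that is not being (re)assigned was retained.
        FullAssignmentInfo(newly, SortedSet.empty[TopicPartition], prev.owned -- newly.keySet)
      case Revoked(revoked) =>
        FullAssignmentInfo(Map.empty[TopicPartition, S], revoked, prev.owned -- revoked)
    }

  // One snapshot per event; scanLeft emits the seed first, so it is dropped.
  def fromEvents[S](events: List[RebalanceEvent[S]]): List[FullAssignmentInfo[S]] =
    events.scanLeft(empty[S])((info, event) => step(info, event)).tail
}
```

With a real fs2 stream the same step function could be passed to Stream#scan, which, like scanLeft, emits the seed value first and so would also need a drop(1). Treating a partition that shows up in Assigned while already owned as newly assigned rather than retained is a choice made for this sketch.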

LMnet commented

But what about retained partitions? We could provide this information too.
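One hypothetical way to provide it: the library already learns about every revocation and assignment through the Java client's ConsumerRebalanceListener callbacks (onPartitionsRevoked and onPartitionsAssigned), so it could track ownership itself and attach the retained set to each assignment event. Ownership and AssignedWithRetained are illustrative names only, S stands in for the per-partition record stream, and TopicPartition is again a stand-in so the sketch runs without the Kafka client.

```scala
import scala.collection.immutable.SortedSet

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

object TopicPartition {
  implicit val ordering: Ordering[TopicPartition] =
    Ordering.by((tp: TopicPartition) => (tp.topic, tp.partition))
}

// Hypothetical event shape: like Assigned, plus the partitions kept across the rebalance.
final case class AssignedWithRetained[S](
  newlyAssignedPartitions: Map[TopicPartition, S],
  retainedPartitions: SortedSet[TopicPartition]
)

// Hypothetical immutable record of the partitions the consumer currently owns,
// updated from the revoke and assign callbacks.
final case class Ownership(owned: SortedSet[TopicPartition]) {
  def onRevoked(revoked: Set[TopicPartition]): Ownership = Ownership(owned -- revoked)

  // Whatever is still owned when new partitions arrive was retained through the rebalance.
  def onAssigned[S](newly: Map[TopicPartition, S]): (Ownership, AssignedWithRetained[S]) = {
    val retained = owned -- newly.keySet
    (Ownership(retained ++ newly.keySet), AssignedWithRetained(newly, retained))
  }
}
```

Under the eager rebalance protocol the Java client revokes every owned partition before delivering the new assignment, so in this sketch the retained set would only be non-empty with cooperative (incremental) rebalancing.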