New stream method with the full rebalance information
LMnet opened this issue · 3 comments
After #844, partitionsMapStream is no longer a "Stream where each element contains a current assignment", as the Scaladoc previously stated.
It looks like if we want to retain such functionality, we need to add a new stream method like this:
def detailedDataStream: Stream[F, FullAssignmentInfo[F, K, V]]
case class FullAssignmentInfo[F[_], K, V](
  newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]],
  revokedPartitions: SortedSet[TopicPartition],
  retainedPartitions: SortedSet[TopicPartition],
)
Naming is not final, just my thoughts.
I think this is a great idea. We could initially make it a private method and implement the other methods in terms of it, so we have time to iron out any issues before exposing it in the public API. Then, when we expose it, we can provide default implementations for the other methods in terms of it on KafkaConsume, making it much easier to mock KafkaConsume for testing.
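To illustrate why default implementations would make mocking easier, here is a rough sketch. It is not the real fs2-kafka API: ConsumeSketch, AssignmentInfo and the simplified signatures are hypothetical, List stands in for Stream, and String stands in for a record, so that the example has no dependencies.

```scala
object DefaultMethodsSketch {
  // Hypothetical stand-ins: List plays the role of Stream, String the role of a record.
  final case class TopicPartition(topic: String, partition: Int)
  final case class AssignmentInfo(newlyAssigned: Map[TopicPartition, List[String]])

  trait ConsumeSketch {
    // The single primitive every implementation must provide.
    def detailedDataStream: List[AssignmentInfo]

    // Default implementations derived from the primitive.
    def partitionsMapStream: List[Map[TopicPartition, List[String]]] =
      detailedDataStream.map(_.newlyAssigned)

    def records: List[String] =
      partitionsMapStream.flatMap(_.values.flatten)
  }

  // A test double only has to supply the primitive; the rest comes for free.
  val mock: ConsumeSketch = new ConsumeSketch {
    val detailedDataStream: List[AssignmentInfo] =
      List(AssignmentInfo(Map(TopicPartition("t", 0) -> List("a", "b"))))
  }
}
```

With this shape, a mock overrides one member and the derived methods stay consistent with it by construction.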
Having played with this a bit, I think the stream should instead emit an ADT with subtypes Assigned(newlyAssignedPartitions: Map[TopicPartition, Stream[F, CommittableConsumerRecord[F, K, V]]]) and Revoked(revokedPartitions: SortedSet[TopicPartition]). That's what the information we get from Java Kafka looks like, and a stream of FullAssignmentInfo can be generated in user code via a suitable fold.
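A minimal sketch of what such a fold might look like, under some assumptions: the names RebalanceEvent, step and infos are made up for illustration, the type parameter S stands in for Stream[F, CommittableConsumerRecord[F, K, V]], TopicPartition is a local stand-in for the Kafka class, and each event is assumed to carry only the partitions that changed. A plain List is used instead of an fs2 Stream to keep the example dependency-free; on a real Stream the same step function could presumably be driven with something like mapAccumulate.

```scala
import scala.collection.immutable.SortedSet

object RebalanceFoldSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  implicit val topicPartitionOrdering: Ordering[TopicPartition] =
    Ordering.by(tp => (tp.topic, tp.partition))

  // Hypothetical ADT; S stands in for the per-partition record stream.
  sealed trait RebalanceEvent[S]
  final case class Assigned[S](newlyAssignedPartitions: Map[TopicPartition, S])
      extends RebalanceEvent[S]
  final case class Revoked[S](revokedPartitions: SortedSet[TopicPartition])
      extends RebalanceEvent[S]

  final case class FullAssignmentInfo[S](
    newlyAssignedPartitions: Map[TopicPartition, S],
    revokedPartitions: SortedSet[TopicPartition],
    retainedPartitions: SortedSet[TopicPartition]
  )

  // One fold step: the state is the set of partitions owned before the event.
  def step[S](
    owned: SortedSet[TopicPartition],
    event: RebalanceEvent[S]
  ): (SortedSet[TopicPartition], FullAssignmentInfo[S]) =
    event match {
      case Assigned(assigned) =>
        // Previously owned partitions that were not part of this assignment.
        val retained = owned -- assigned.keySet
        val info = FullAssignmentInfo[S](assigned, SortedSet.empty[TopicPartition], retained)
        (owned ++ assigned.keySet, info)
      case Revoked(revoked) =>
        // Whatever is still owned after the revocation counts as retained.
        val retained = owned -- revoked
        val info = FullAssignmentInfo[S](Map.empty[TopicPartition, S], revoked, retained)
        (retained, info)
    }

  // Runs the fold over a finite list of events, emitting one info per event.
  def infos[S](events: List[RebalanceEvent[S]]): List[FullAssignmentInfo[S]] =
    events
      .scanLeft((SortedSet.empty[TopicPartition], Option.empty[FullAssignmentInfo[S]])) {
        case ((owned, _), event) =>
          val (next, info) = step(owned, event)
          (next, Some(info))
      }
      .flatMap(_._2)
}
```

Because the fold tracks the set of owned partitions as its state, the retained partitions fall out of it without the ADT having to carry them.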
But what about retained partitions? We could provide this information too.