fd4s/fs2-kafka

Question around custom Partition assignments

bcarter97 opened this issue · 3 comments

Hello, I am trying to create an assignment manually, and I've noticed a difference between the fs2-kafka KafkaConsumer and the underlying Java KafkaConsumer in this overload:

def assign(partitions: NonEmptySet[TopicPartition]): F[Unit]

I'm not sure why it requires a NonEmptySet of partitions, as the underlying consumer does an empty check and calls unsubscribe if the partition set is empty:

https://github.com/apache/kafka/blob/4653507926a42dccda5c086fcae6278afcfc53ca/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1115

if (partitions == null) {
    throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
} else if (partitions.isEmpty()) {
    this.unsubscribe();
}
...

Do you think it is required that the Set be non-empty? If the TopicPartitions are incorrect, the KafkaConsumer throws java.lang.IllegalStateException: No current assignment, so validating the TopicPartitions Set seems redundant.
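
To make the behaviour I'm describing concrete, here is a minimal, self-contained Java sketch of the branch quoted above. It is only a model: `AssignSketch` is a hypothetical stand-in and not the real KafkaConsumer, partitions are represented as plain strings for brevity, and the final `else` branch is my assumption about what the elided code does (replace the current assignment).

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in used only to illustrate the quoted check.
// This is NOT the real org.apache.kafka.clients.consumer.KafkaConsumer.
class AssignSketch {
    // Assumption: partitions are modelled as strings such as "orders-0".
    private Set<String> assignment = new HashSet<>();
    private int unsubscribeCalls = 0;

    // Clears the current assignment and records that it was called.
    void unsubscribe() {
        unsubscribeCalls++;
        assignment = new HashSet<>();
    }

    // Mirrors the null and empty checks from the quoted snippet.
    void assign(Set<String> partitions) {
        if (partitions == null) {
            throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
        } else if (partitions.isEmpty()) {
            unsubscribe();
        } else {
            // Assumption: the elided code replaces the current assignment.
            assignment = new HashSet<>(partitions);
        }
    }

    Set<String> assignment() {
        return Collections.unmodifiableSet(assignment);
    }

    int unsubscribeCalls() {
        return unsubscribeCalls;
    }
}
```

In this model, passing an empty set is a valid way to drop the assignment, which is the case the NonEmptySet signature rules out.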