fd4s/fs2-kafka

Producing with empty key leads to producing in the same partition

sergio-margale opened this issue · 2 comments

I found an issue in the latest tags (3.0.0-MX). I have a producer that produces records with an empty string as the key. I expected that, as with plain Kafka, it would use the RoundRobinProducer, but it turned out that it always produces to the same partition.

producer
      .produceOne(topic = config.topic, key = "", value = data)
      .flatten
      .void

I tried with null, but then I got:

Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "s" is null
fs2.kafka.GenericSerializer$.string$$anonfun$1(Serializer.scala:191)
fs2.kafka.GenericSerializer$.lift$$anonfun$1(Serializer.scala:167)
fs2.kafka.GenericSerializer$$anon$1.serialize(Serializer.scala:132)
fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:220)
fs2.kafka.KafkaProducer$.asJavaRecord(KafkaProducer.scala:233)
fs2.kafka.KafkaProducer$.produceRecord$$anonfun$1(KafkaProducer.scala:180)
fs2.Chunk.loop$1(Chunk.scala:389)
fs2.Chunk.traverse(Chunk.scala:416)
fs2.kafka.KafkaProducer$.produce$$anonfun$1(KafkaProducer.scala:167)
fs2.kafka.internal.WithProducer$$anon$1.apply(WithProducer.scala:47)
fs2.kafka.KafkaProducer$.produce(KafkaProducer.scala:168)
fs2.kafka.KafkaProducerConnection$$anon$1.produce(KafkaProducerConnection.scala:128)
fs2.kafka.KafkaProducer$$anon$1.produce(KafkaProducer.scala:134)
fs2.kafka.KafkaProducer$ProducerOps$.produceOne$extension(KafkaProducer.scala:91)
fs2.kafka.KafkaProducer$ProducerOps$.produceOne$extension(KafkaProducer.scala:85)

....

I managed to make it work by setting a random UUID as the key, but shouldn't this work out of the box?

LMnet commented

fs2-kafka doesn't introduce any new behavior in terms of producer partitioning; it relies entirely on the underlying Java client.

> I have a producer producing with an empty string as key, and I expected that as with Kafka, it will produce with the RoundRobinProducer but it turned out that it's always producing into the same partition.

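Unfortunately, that's not how it works. By default, Kafka uses the DefaultPartitioner, which behaves differently: a record with a non-null key (and an empty string is a non-null key) goes to the partition chosen by hashing the key bytes, so every record keyed with "" lands in the same partition. Only records with a null key are spread across partitions. Check the DefaultPartitioner Javadoc for more information.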
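For a record with a non-null key and no explicit partition, Kafka's default partitioning computes toPositive(murmur2(keyBytes)) % numPartitions. The sketch below mirrors that formula using the helpers from kafka-clients (which fs2-kafka depends on); it assumes kafka-clients is on the classpath, and KeyedPartitioning and partitionForKey are hypothetical names used only for illustration. Because the bytes of "" never change, the result never changes either.

```scala
import java.nio.charset.StandardCharsets
import org.apache.kafka.common.utils.Utils

// Hypothetical helper object: mirrors the key-hashing formula Kafka's default
// partitioning applies to records that have a non-null key and no explicit partition.
object KeyedPartitioning {
  def partitionForKey(key: String, numPartitions: Int): Int = {
    val keyBytes = key.getBytes(StandardCharsets.UTF_8) // "" becomes an empty, but non-null, byte array
    Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions
  }

  def main(args: Array[String]): Unit = {
    // Hashing the same bytes always yields the same partition,
    // so repeated "" keys all go to one partition.
    val partitions = (1 to 5).map(_ => partitionForKey("", numPartitions = 6))
    println(partitions.distinct) // contains exactly one partition number
  }
}
```

This is also why the random-UUID workaround spreads records: each new key hashes to a different value.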

> I tried with null but then I got

Your error looks a bit weird. To send a null key to Kafka, you should create a KafkaProducer[F, Option[K], V], where K is your key type. With this producer you can send a null key as None (see scala.Option).
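A minimal sketch of that suggestion, assuming fs2-kafka 3.x with cats-effect 3, a broker at localhost:9092 and a hypothetical topic named example-topic. It relies on fs2-kafka's implicit Option serializer, which writes None as a null key; it is worth confirming that instance in the exact version you use.

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka._

object OptionKeyProducer extends IOApp.Simple {
  // Key type is Option[String]: Some(k) is serialized as a normal string key,
  // None is serialized as a null key.
  val settings: ProducerSettings[IO, Option[String], String] =
    ProducerSettings[IO, Option[String], String]
      .withBootstrapServers("localhost:9092") // assumed broker address

  val run: IO[Unit] =
    KafkaProducer.resource(settings).use { producer =>
      producer
        .produceOne(topic = "example-topic", key = None, value = "data") // hypothetical topic
        .flatten // wait for the broker acknowledgement
        .void
    }
}
```

With a null key, Kafka's default partitioning no longer hashes a key, so records are distributed across partitions instead of always landing on one.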

If you want to use the RoundRobinPartitioner, set the partitioner.class config value to org.apache.kafka.clients.producer.RoundRobinPartitioner.
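In fs2-kafka, that config value can be passed through ProducerSettings.withProperty. This is a sketch assuming String keys and values and a broker at localhost:9092; RoundRobinSettings is a hypothetical name. ProducerConfig.PARTITIONER_CLASS_CONFIG is the kafka-clients constant for "partitioner.class".

```scala
import cats.effect.IO
import fs2.kafka.ProducerSettings
import org.apache.kafka.clients.producer.{ProducerConfig, RoundRobinPartitioner}

object RoundRobinSettings {
  // partitioner.class swaps the default key-hash partitioning for Kafka's
  // RoundRobinPartitioner on records that don't set an explicit partition.
  val settings: ProducerSettings[IO, String, String] =
    ProducerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092") // assumed broker address
      .withProperty(
        ProducerConfig.PARTITIONER_CLASS_CONFIG,
        classOf[RoundRobinPartitioner].getName
      )
}
```

Using classOf[...].getName rather than a string literal lets the compiler catch a misspelled class name.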

sergio-margale commented

Thanks @LMnet, I'm not sure where I read that the RoundRobinProducer was the default one 🤔

I'll close the issue, then.