Producing with empty key leads to producing in the same partition
sergio-margale opened this issue · 2 comments
I found an issue within the latest tags (3.0.0-MX). I have a producer producing with an empty string as key
, and I expected that as with Kafka, it will produce with the RoundRobinProducer but it turned out that it's always producing into the same partition.
producer
.produceOne(topic = config.topic, key = "", value = data)
.flatten
.void
I tried with null
but then I got
Caused by: java.lang.NullPointerException: Cannot invoke "String.getBytes(java.nio.charset.Charset)" because "s" is null
fs2.kafka.GenericSerializer$.string$$anonfun$1(Serializer.scala:191)
fs2.kafka.GenericSerializer$.lift$$anonfun$1(Serializer.scala:167)
fs2.kafka.GenericSerializer$$anon$1.serialize(Serializer.scala:132)
fs2.kafka.KafkaProducer$.serializeToBytes(KafkaProducer.scala:220)
fs2.kafka.KafkaProducer$.asJavaRecord(KafkaProducer.scala:233)
fs2.kafka.KafkaProducer$.produceRecord$$anonfun$1(KafkaProducer.scala:180)
fs2.Chunk.loop$1(Chunk.scala:389)
fs2.Chunk.traverse(Chunk.scala:416)
fs2.kafka.KafkaProducer$.produce$$anonfun$1(KafkaProducer.scala:167)
fs2.kafka.internal.WithProducer$$anon$1.apply(WithProducer.scala:47)
fs2.kafka.KafkaProducer$.produce(KafkaProducer.scala:168)
fs2.kafka.KafkaProducerConnection$$anon$1.produce(KafkaProducerConnection.scala:128)
fs2.kafka.KafkaProducer$$anon$1.produce(KafkaProducer.scala:134)
fs2.kafka.KafkaProducer$ProducerOps$.produceOne$extension(KafkaProducer.scala:91)
fs2.kafka.KafkaProducer$ProducerOps$.produceOne$extension(KafkaProducer.scala:85)
....
I managed to make it work setting as key a random UUID but shouldn't this work out of the box?
fs2-kafka doesn't introduce any new behavior in terms of producer partitioning. It relies fully on the underlying java behavior.
I have a producer producing with an empty string as key, and I expected that as with Kafka, it will produce with the RoundRobinProducer but it turned out that it's always producing into the same partition.
Unfortunately it's not how it works. By default kafka uses DefaultPartitioner
. And it's behave differently. Check java docs from my link to get more information.
I tried with null but then I got
Your errors looks a bit weird. To send null
to kafka you should create KafkaProducer[F, Option[K], V]
where K — is your key type. With this producer you will be able to send null
as None
(see scala.Option
).
If you want to use RoundRobinPartitioner
you can set partitioner.class
config value to org.apache.kafka.clients.producer.RoundRobinPartitioner
.
Thanks @LMnet, I'm not sure where I read regarding RoundRobinProducer
to be the default one 🤔
I will close the issue then