fd4s/fs2-kafka

Consider using type-specific native Kafka serdes

bplommer opened this issue · 3 comments

Currently the only native Kafka serializers and deserializers we use are the ones for Array[Byte] - we then use our own serializer and deserializer types to convert between Array[Byte] and user types. I think we should continue to use our own more strongly-typed ValueDeserializer etc types, but have them either extend the native Kafka types or have a method to produce instances of the Kafka types to use when instantiating the Kafka Consumer and Producer.
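As a rough sketch of the second option (a method that produces native instances) - the trait shape and the asJavaDeserializer name below are purely illustrative, not existing fs2-kafka API, and the signature is simplified (no headers):

```scala
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

// Hypothetical shape only: a strongly-typed fs2-kafka deserializer that can
// also hand the native client a plain Kafka Deserializer, so deserialization
// happens inside the consumer itself.
trait ValueDeserializer[F[_], A] {
  // effectful deserialization, roughly as today
  def deserialize(topic: String, bytes: Array[Byte]): F[A]

  // proposed: expose a native Kafka deserializer built from this one,
  // to be passed to the underlying Kafka Consumer / Producer
  def asJavaDeserializer: KafkaDeserializer[A]
}
```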

Performance

This should both increase speed and reduce memory usage / garbage creation. At present, when deserializing Avro (for example), each message is first read from the TCP socket into an Array[Byte] allocated by the native ByteArray deserializer; the fs2-kafka Avro deserializer then reads from that allocated Array[Byte]. If we used a native Kafka Avro deserializer directly, it could read straight from the socket (Avro deserializers natively operate on InputStream), avoiding the allocation of an intermediate array. This would save a particularly large amount of copying in the (not uncommon) case where an Avro record has many fields but we're only interested in a couple (the fields we don't care about can be skipped entirely while reading from the socket), and in the case where we're not interested in keys at all (by deserializing them as null we avoid copying anything).
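To illustrate the field-skipping point, this is what schema projection looks like with the plain Avro APIs (not tied to fs2-kafka; the schemas and input stream are placeholders). Avro's resolving decoder skips writer fields that the reader schema omits, so they're never materialised:

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import java.io.InputStream

// Read a record using a reader schema that declares only the fields we care
// about; fields present in the writer schema but absent from the reader schema
// are skipped by the resolving decoder rather than decoded and discarded.
def readProjected(writerSchema: Schema, readerSchema: Schema, in: InputStream): GenericRecord = {
  val reader  = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(in, null)
  reader.read(null, decoder)
}
```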

I'd want to have some benchmarks to validate any performance improvements of course.

Design simplicity

It's conceptually confusing to have our own Serializer and Deserializer classes that commonly wrap native Kafka ones but are completely separate from them type-wise, and then work in combination with other native serdes (the Array[Byte] ones). I guess the motivation is for side effects to be suspended in a polymorphic F - I have a couple of thoughts on this:

  1. Purity here has relatively small benefits, and side-effecting serdes are probably fine. The side effects we expect relate to the mechanics of serializing/deserializing according to a schema, and should never have any effect on application logic (for them to do so would almost certainly be very, very incorrect). We can still provide typeclass instances for pure functional manipulation of the serdes - only the serialization/deserialization operation itself would be impure.
  2. If people really need serdes to interop with pure code, we can add fromSyncIO and fromAsync methods that use SyncIO#unsafeRunSync and Dispatcher - see the sketch after this list.
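
For (2), a rough sketch of what those interop helpers could look like - the names fromSyncIO / fromAsync are the ones proposed above, but the signatures are just guesses:

```scala
import cats.effect.{Async, Resource, SyncIO}
import cats.effect.std.Dispatcher
import org.apache.kafka.common.serialization.{Deserializer => KafkaDeserializer}

object NativeDeserializer {

  // Adapt a SyncIO-based deserialization function into a native Kafka deserializer.
  def fromSyncIO[A](f: (String, Array[Byte]) => SyncIO[A]): KafkaDeserializer[A] =
    new KafkaDeserializer[A] {
      override def deserialize(topic: String, data: Array[Byte]): A =
        f(topic, data).unsafeRunSync()
    }

  // Adapt an F-based deserialization function via Dispatcher. The deserializer
  // is only valid while the Resource is open (the dispatcher is shut down on
  // release). Dispatcher.sequential requires cats-effect 3.4+; older versions
  // expose Dispatcher[F] instead.
  def fromAsync[F[_]: Async, A](
    f: (String, Array[Byte]) => F[A]
  ): Resource[F, KafkaDeserializer[A]] =
    Dispatcher.sequential[F].map { dispatcher =>
      new KafkaDeserializer[A] {
        override def deserialize(topic: String, data: Array[Byte]): A =
          dispatcher.unsafeRunSync(f(topic, data))
      }
    }
}
```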

Other implications

This would mean largely reverting #902 - KafkaConsumerActor would once again be typed according to the message type (but all deserialization would take place inside the native consumer). KafkaProducerConnection would no longer make sense, but we could add contramap to KafkaProducer.
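For the contramap idea, here's the general shape against a deliberately simplified, hypothetical producer interface (the real KafkaProducer has more methods and different record/result types):

```scala
// Simplified, hypothetical producer used only to illustrate contramap;
// not the actual fs2-kafka KafkaProducer API.
trait SimpleProducer[F[_], K, V] {
  def produceOne(topic: String, key: K, value: V): F[Unit]

  // Reuse a producer of (K, V) to publish (K2, V2) by mapping keys and values
  // before sending - no new connection or serializers needed.
  def contramap[K2, V2](fk: K2 => K, fv: V2 => V): SimpleProducer[F, K2, V2] = {
    val self = this
    new SimpleProducer[F, K2, V2] {
      def produceOne(topic: String, key: K2, value: V2): F[Unit] =
        self.produceOne(topic, fk(key), fv(value))
    }
  }
}
```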

Any thoughts on this?

@vlovgr @LMnet what do you think? #902 hasn't gone out in a non-milestone release yet, so if we're going to go this way I'd like to revert it before releasing 2.5.0 so we don't change partitionedStream behaviour twice.

I think we should continue to use our own more strongly-typed ValueDeserializer etc types, but have them either extend the native Kafka types or have a method to produce instances of the Kafka types to use when instantiating the Kafka Consumer and Producer.

Either way sounds good to me. I know some users rely on parallelised decoding, which you should still be able to do.

I know some users rely on parallelised decoding, which you should still be able to do.

Yeah, you can always just deserialise to bytes and then decode them in user code.
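
Something along these lines, for example - a sketch against the current API, ignoring offset commits for brevity; decode stands in for whatever user-level decoding (Avro, JSON, ...) is needed:

```scala
import cats.effect.IO
import fs2.Stream
import fs2.kafka._

// Consume raw bytes and decode them in parallel downstream, so parallelised
// decoding still works even if deserialization moves into the native consumer.
def decodedStream[A](
  settings: ConsumerSettings[IO, Array[Byte], Array[Byte]],
  topic: String
)(decode: Array[Byte] => IO[A]): Stream[IO, A] =
  KafkaConsumer
    .stream(settings)
    .evalTap(_.subscribeTo(topic))
    .flatMap(_.stream)
    .parEvalMap(16)(committable => decode(committable.record.value))
```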