Implicit Serdes
maver1ck opened this issue · 22 comments
Please add an option to use implicit Serdes in this library, similar to:
https://github.com/ogirardot/typesafe-kafka-streams
We are thinking about this right now. There was already a PR (#40) with implicit Serdes.
We are in the process of thinking through the use cases, since the handling of Serdes in the top-level APIs has changed a lot in Kafka Streams 1.0. There is now an abstraction, Serialized, which handles serialization of keys and values. The APIs have two kinds of signatures: one that takes a Serialized and uses it, and another that uses the default serializers. We are thinking about how to unify them with implicits without losing the ability to use the default serializers.
By default serializers, do you mean the ones from the default.key.serde and default.value.serde options?
The question is whether we need them at all. Probably not.
If you look at the APIs KStream#groupByKey() and KStream#groupByKey(Serialized), the first one uses the default serializers while the second one uses the ones explicitly supplied. If we have implicit serializers and force users to use them, we lose the first capability: the user always has to supply the serializers as implicit values. This is the topic we debated in the PR I referenced earlier.
One option we are considering is to check whether implicit values for the serializers are available and act accordingly: if serdes for both key and value are available, we use the second API; otherwise we use the first one.
WDYT?
With my last question I just wanted to check that we're on the same page.
I'm wondering whether using default serializers isn't equivalent to having them as implicit values (even better, because if a serializer type is wrong, we'll get an error at compile time rather than at runtime).
So the current setting:
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
we'll change to:
implicit val stringSerde: Serde[String] = Serdes.String()
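A minimal sketch of what the implicit version buys, assuming hypothetical stand-ins: `Serde` below is a tiny trait that only carries a name (not Kafka's `org.apache.kafka.common.serialization.Serde`), and `describe`, `demo` and `demoExplicit` are illustrative helpers. The method demands a serde for both the key and the value type, so a missing one is a compile-time error instead of a runtime misconfiguration, while an explicit serde can still be passed at the call site.

```scala
object ImplicitSerdeSketch {
  // Hypothetical stand-in for Kafka's Serde; it only carries a name for illustration.
  trait Serde[A] { def name: String }

  // Instead of default.key.serde / default.value.serde in the config,
  // the serdes live in implicit scope where the compiler can see their types.
  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }

  // Illustrative helper: compiles only if serdes for both K and V are in scope.
  def describe[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): String =
    s"key=${keySerde.name}, value=${valueSerde.name}"

  // Both serdes are resolved at compile time from the implicit vals above.
  def demo: String = describe[String, Long]

  // An explicit value can still be supplied, overriding the implicit one.
  def demoExplicit: String =
    describe[String, Long](new Serde[String] { val name = "custom" }, longSerde)

  // describe[String, Double] would not compile: there is no Serde[Double] in scope.
}
```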
Indeed, with implicits we get more type safety. The question is whether this takes away some feature of the underlying library. We debated this as well in the PR.
So you think it's OK to drop the feature of setting the defaults in the config and handle them with implicits instead?
But a function with implicit parameters still makes it possible to pass explicit values at the call site.
The only difference will be that we'll be using Serdes instead of Produced.with, etc.
Yes, we can set explicit values with Serdes. We can replace all APIs that take Serialized with implicit Serdes. But we may have to keep the ones that take Produced or Consumed (in addition to the new ones that take implicit Serdes), since these two allow setting other values besides the serdes.
Sounds good.
So for groupBy we will have two definitions:
def groupBy[KR](selector: (K, V) => KR)(implicit keySerde: Serde[KR], valueSerde: Serde[V]): KGroupedStreamS[KR, V]
def groupBy[KR](selector: (K, V) => KR, serialized: Serialized[KR, V]): KGroupedStreamS[KR, V]
We can debate whether we need the second (Serialized) variant if we have the first. But I think we need both variants for functions that take Produced or Consumed, since those let you set other values besides the serdes.
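A sketch of the two groupBy variants, assuming hypothetical stand-ins: a minimal `Serde`, a `Serialized` case class holding a key and a value serde (not Kafka's class), and a toy `StreamS` that groups an in-memory list instead of wrapping a real `KStream`. The implicit variant only resolves the serdes and delegates to the explicit one, so there is a single code path.

```scala
object GroupBySketch {
  // Hypothetical stand-ins, not the Kafka Streams classes.
  trait Serde[A] { def name: String }
  final case class Serialized[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // Toy stream over an in-memory list; it reports the new keys and the serdes it would use.
  final class StreamS[K, V](records: List[(K, V)]) {
    // Variant 1: serdes come from implicit scope, then we delegate to variant 2.
    def groupBy[KR](selector: (K, V) => KR)(implicit keySerde: Serde[KR], valueSerde: Serde[V]): String =
      groupBy(selector, Serialized(keySerde, valueSerde))

    // Variant 2: serdes are passed explicitly, mirroring groupBy(KeyValueMapper, Serialized).
    def groupBy[KR](selector: (K, V) => KR, serialized: Serialized[KR, V]): String = {
      val newKeys = records.map { case (k, v) => selector(k, v) }.distinct
      s"${newKeys.mkString(",")} via ${serialized.keySerde.name}/${serialized.valueSerde.name}"
    }
  }

  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val intSerde: Serde[Int] = new Serde[Int] { val name = "int" }

  val stream = new StreamS(List("a" -> "x", "bb" -> "y", "cc" -> "z"))
  val keyLength: (String, String) => Int = (k, _) => k.length

  // Serde[Int] for the new key and Serde[String] for the value are found implicitly.
  def implicitDemo: String = stream.groupBy(keyLength)

  // The explicit variant bypasses implicit resolution entirely.
  def explicitDemo: String =
    stream.groupBy(keyLength, Serialized(new Serde[Int] { val name = "custom-int" }, stringSerde))
}
```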
Maybe providing an opt-in that enables the default serialized behavior is a solution?
// This could also be Serde instead of Serialized
// The principle is the same but the implementation slightly more difficult
implicit case object ConfigSerialized extends Serialized[Nothing, Nothing] {
  // throw UnsupportedOperationException in methods
}
def through(topic: String)(implicit serialized: Serialized[K, V]): KStreamS[K, V] =
  if (serialized eq ConfigSerialized)
    inner.through(topic)
  else
    inner.through(topic, serialized)
This way the default is the type-safe, Scala-style implicit API, but if the user adds import streams.ConfigSerialized, the default serializers can be used.
If you want this behavior as the default, you could make the signature
def through(topic: String)(implicit serialized: Serialized[K, V] = ConfigSerialized): KStreamS[K, V]
This won't need the import, but you don't get a compile-time error for a missing implicit if you forget to define it, which is not what Scala users expect.
@dhoepelman This was the idea I was toying with to provide both options; see my remark from one of the earlier comments:
One option we are considering is to check whether implicit values for the serializers are available and act accordingly: if serdes for both key and value are available, we use the second API; otherwise we use the first one.
And you are right that providing implicits for Serialized, Produced or Consumed makes for an easier implementation.
To everybody on this comment thread, how about the following strategy:
1) We provide all implicits on Serialized, Produced or Consumed instead of Serde, because these are the preferred abstractions in Kafka Streams 1.0.0.
2) We use the strategy mentioned by @dhoepelman to handle default serializers. The default strategy will be to use the implicits (recommended), but if someone wants to use the config serializers, she can do that with a little more effort: an additional import. This also follows the principle of making the recommended way easier to use.
Regarding 1): but then we need to provide O(n^2) implicit values instead of O(n), where n is the number of types (one Serialized per key/value type pair instead of one Serde per type).
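One way to keep the number of hand-written implicits at O(n) is to declare one implicit serde per type plus a single generic rule that builds a `Serialized` for any key/value pair on demand. A minimal sketch, assuming hypothetical stand-ins for `Serde` and `Serialized` (not Kafka's classes); `serializedFromSerdes` and `describe` are illustrative names.

```scala
object DerivationSketch {
  // Hypothetical stand-ins, not the Kafka Streams classes.
  trait Serde[A] { def name: String }
  final case class Serialized[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // n implicit serdes, one per type ...
  implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  implicit val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }
  implicit val intSerde: Serde[Int] = new Serde[Int] { val name = "int" }

  // ... plus one generic rule that covers all n * n (K, V) combinations.
  implicit def serializedFromSerdes[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
    Serialized(keySerde, valueSerde)

  // Illustrative consumer that asks for a Serialized, as the wrapper APIs would.
  def describe[K, V](implicit serialized: Serialized[K, V]): String =
    s"${serialized.keySerde.name}/${serialized.valueSerde.name}"

  def demo: List[String] = List(describe[String, Long], describe[Long, Int], describe[Int, Int])
}
```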
@dhoepelman This one may be tricky:
implicit case object ConfigSerialized extends Serialized[Nothing, Nothing] {
  // throw UnsupportedOperationException in methods
}
Serialized's constructor takes another Serialized, and the implicit match may also fail because of the (in)variance of the Java type.
I have started working on an implementation.
@debasishg Right. Invariance is correct, too, since a Serde (and thus Serialized) is both a serializer and a deserializer :/
Instead of a case object, an implicit producer should work:
implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]()
// Named class so we can check isInstanceOf[ConfigSerialized]
private[streams] class ConfigSerialized[K, V]() extends Serialized[K, V](Serialized.`with`(null, null)) {}
But I don't know if this will cause weird things with implicit lookups or other problems...
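The sentinel approach can be tried out without Kafka on the classpath. In this sketch `Serialized` is an invariant stand-in class (not Kafka's), `ConfigSerialized` plays the role of the named subclass from the comment above, and the implicit producer is placed in a hypothetical `ConfigDefaults` object so it is only visible after an explicit import, which is the opt-in. The stand-in `groupByKey` just returns a string naming the Kafka overload it would call.

```scala
object ConfigSerializedSketch {
  // Hypothetical stand-ins, not the Kafka Streams classes.
  trait Serde[A] { def name: String }
  // Invariant in K and V, like the Java Serialized<K, V>.
  class Serialized[K, V](val keySerde: Serde[K], val valueSerde: Serde[V])

  // Named sentinel class meaning "use default.key.serde / default.value.serde".
  final class ConfigSerialized[K, V] extends Serialized[K, V](null, null)

  // The opt-in: this implicit is only in scope after `import ConfigDefaults._`.
  object ConfigDefaults {
    implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]
  }

  // Stand-in for the wrapper method; reports which underlying overload would be used.
  def groupByKey[K, V](implicit serialized: Serialized[K, V]): String = serialized match {
    case _: ConfigSerialized[_, _] => "groupByKey() with config serdes"
    case s => s"groupByKey(Serialized(${s.keySerde.name}, ${s.valueSerde.name}))"
  }

  val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }

  def withConfigDefaults: String = {
    import ConfigDefaults._
    groupByKey[String, Long]
  }

  def withExplicitSerdes: String = {
    implicit val serialized: Serialized[String, Long] = new Serialized(stringSerde, longSerde)
    groupByKey[String, Long]
  }
}
```

Without either the import or an implicit `Serialized` in scope, a call to `groupByKey` does not compile, which keeps the compile-time check the thread asks for.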
My colleague @svroonland and I have some time to look into or help with an implementation, but only on February 16th.
Would the technique described in http://missingfaktor.blogspot.in/2013/12/optional-implicit-trick-in-scala.html work here? It's a dressed-up variant of the null check on the implicit. The advantage is that we can use this technique generically and NOT have to write specialized implementations for each of Serialized, Produced and Consumed.
I have difficulty seeing the advantage of Perhaps over providing a null default value for the implicit, but using a default null/Perhaps for the implicit parameter is definitely a solution to support both behaviors.
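For reference, the core of the optional-implicit trick is an implicit whose own implicit parameter defaults to `null`, wrapped so the caller sees an `Option`. The `Perhaps` shape below is reconstructed from memory of the linked post and may differ in details; `Serde` and `Serialized` are hypothetical stand-ins, not Kafka's classes. A single method picks the explicit overload when a `Serialized` is in scope and the config-default overload otherwise.

```scala
object PerhapsSketch {
  // Wrapper around an implicit value that may or may not be available.
  final case class Perhaps[E](value: Option[E]) {
    def fold[F](ifAbsent: => F)(ifPresent: E => F): F = value.fold(ifAbsent)(ifPresent)
  }

  object Perhaps {
    // If no implicit E can be found, the default `null` is used and becomes None.
    implicit def perhaps[E >: Null](implicit ev: E = null): Perhaps[E] = Perhaps(Option(ev))
  }

  // Hypothetical stand-ins, not the Kafka Streams classes.
  trait Serde[A] { def name: String }
  final case class Serialized[K, V](keySerde: Serde[K], valueSerde: Serde[V])

  // Reports which underlying overload would be called.
  def groupByKey[K, V](implicit serialized: Perhaps[Serialized[K, V]]): String =
    serialized.fold("groupByKey()") { s => s"groupByKey(${s.keySerde.name}, ${s.valueSerde.name})" }

  val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
  val longSerde: Serde[Long] = new Serde[Long] { val name = "long" }

  // No Serialized[String, Long] in scope: falls back to the config serdes.
  def withoutImplicit: String = groupByKey[String, Long]

  def withImplicit: String = {
    implicit val serialized: Serialized[String, Long] = Serialized(stringSerde, longSerde)
    groupByKey[String, Long]
  }
}
```

As noted in the thread, this silently falls back to the config serdes when the implicit is missing, so a forgotten implicit no longer fails at compile time.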
Is it possible to provide an opt-in mechanism in this case?
Doing this by default is a bit dangerous because it turns compile-time errors into runtime errors, or might silently change runtime behavior when the code changes.
I agree regarding turning compile-time errors into runtime errors. And as I already mentioned, it's a dressed-up version of the null-check pattern for implicits.
But it is possible to provide an opt-in mechanism with this technique as well. Here's an example I was trying out.
The API implementation:
def groupByKey(implicit serialized: Perhaps[Serialized[K, V]]): KGroupedStreamS[K, V] =
  serialized.fold[KGroupedStreamS[K, V]] { inner.groupByKey } { implicit ev => inner.groupByKey(ev) }
An implicit to convert the serdes into a Serialized:
implicit def SerializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
  Serialized.`with`(keySerde, valueSerde)
With that, the API can be used with:
implicit val stringSerde: Serde[String] = Serdes.String()
implicit val longSerde: Serde[Long] = Serdes.Long().asInstanceOf[Serde[Long]]
or:
implicit val serialized: Serialized[String, Long] = Serialized.`with`(stringSerde, longSerde)
If we don't have the implicits in scope, the config values will be used, since in the above implementation inner.groupByKey will be called.
I was also playing around with your suggestion.
Here's the implicit for Serialized:
implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]()
private[streams] class ConfigSerialized[K, V]() extends Serialized[K, V](Serialized.`with`(null, null)) {}
Here's the API implementation:
def groupByKey(implicit serialized: Serialized[K, V]): KGroupedStreamS[K, V] =
  if (serialized.isInstanceOf[ConfigSerialized[K, V]]) inner.groupByKey
  else inner.groupByKey(serialized)
Yeah, that isInstanceOf (sort of) subverts the type system.
Now the question is how to include the implicit conversion from Serde to Serialized. The moment I add the following to the ImplicitConversions object:
implicit def SerializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
  Serialized.`with`(keySerde, valueSerde)
I get ambiguous implicits in the lookup.
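The ambiguity arises because, once both serdes are in scope, `configSerialized[K, V]` and `SerializedFromSerde[K, V]` are equally good candidates for a `Serialized[K, V]`. One common Scala technique (an assumption here, not necessarily what was merged) is to move the fallback into a parent trait: an implicit defined in an object wins over one it inherits, and the inherited fallback is still used whenever the serde-based rule cannot be satisfied. A sketch with hypothetical stand-in types; `LowPriorityImplicits` and `serializedFromSerde` are illustrative names.

```scala
object LowPrioritySketch {
  // Hypothetical stand-ins, not the Kafka Streams classes.
  trait Serde[A] { def name: String }
  class Serialized[K, V](val keySerde: Serde[K], val valueSerde: Serde[V])
  final class ConfigSerialized[K, V] extends Serialized[K, V](null, null)

  // Fallback: lower priority because ImplicitConversions only inherits it.
  trait LowPriorityImplicits {
    implicit def configSerialized[K, V]: Serialized[K, V] = new ConfigSerialized[K, V]
  }

  object ImplicitConversions extends LowPriorityImplicits {
    // Preferred whenever serdes for both K and V can be found.
    implicit def serializedFromSerde[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): Serialized[K, V] =
      new Serialized(keySerde, valueSerde)
  }

  // Reports which underlying overload would be used.
  def groupByKey[K, V](implicit serialized: Serialized[K, V]): String = serialized match {
    case _: ConfigSerialized[_, _] => "groupByKey() with config serdes"
    case s => s"groupByKey(Serialized(${s.keySerde.name}, ${s.valueSerde.name}))"
  }

  def withSerdes: String = {
    import ImplicitConversions._
    implicit val stringSerde: Serde[String] = new Serde[String] { val name = "string" }
    groupByKey[String, String]
  }

  def withoutSerdes: String = {
    import ImplicitConversions._
    groupByKey[String, String]
  }
}
```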
Implicit Serdes now in master - closing this issue.