TypedEncoder not working with UDF (or not properly documented) for class having a field typed with a ValueClass
cchantep opened this issue · 13 comments
Hi,
As of frameless 0.10.1, a value class cannot be used as a field type with TypedEncoder and Injection:
import frameless._
import frameless.syntax._

import org.apache.spark.sql.SparkSession

implicit val sparkSession =
  SparkSession.builder.master("local[2]").getOrCreate()

val ds = TypedDataset.create(Seq("1"))

final class Lorem(val value: String) extends AnyVal

case class Foo(lorem: Lorem)

implicit val LoremInjection: Injection[Lorem, String] =
  Injection((_: Lorem).value, new Lorem(_: String))

implicit val LoremEncoder: TypedEncoder[Lorem] =
  TypedEncoder.usingInjection[Lorem, String]

val toFoo = ds.makeUDF { id: String =>
  Foo(new Lorem(id))
}

ds.select(toFoo(ds.asCol)).collect().run
This compiles, but throws the following exception at runtime:
java.lang.ClassCastException: java.base/java.lang.String cannot be cast to Lorem
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$FramelessUdfEvalImpl.Invoke_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$FramelessUdfEvalImpl.FramelessUdf_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$FramelessUdfEvalImpl.apply(Unknown Source)
at frameless.functions.FramelessUdf.eval(Udf.scala:130)
at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:156)
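For what it's worth, the injection itself roundtrips fine outside Spark. A minimal plain-Scala stand-in (Inj below is a hypothetical simplification, not the real frameless.Injection trait) demonstrates the contract the encoder relies on:

```scala
final class Lorem(val value: String) extends AnyVal

// Hypothetical, simplified stand-in for frameless.Injection:
// apply and invert must be mutual inverses.
trait Inj[A, B] {
  def apply(a: A): B
  def invert(b: B): A
}

val loremInj: Inj[Lorem, String] = new Inj[Lorem, String] {
  def apply(a: Lorem): String = a.value
  def invert(b: String): Lorem = new Lorem(b)
}

// Roundtrip Lorem -> String -> Lorem preserves the wrapped value, so the
// runtime ClassCastException is not a failure of the injection itself.
assert(loremInj.invert(loremInj(new Lorem("1"))).value == "1")
```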
In some cases, returning the tupled value for the case class instance and adding .andThen
to the UDF function, as below, oddly makes it work:
ds.makeUDF { .. }.andThen { data =>
  data.typed[RowType, CaseClassWithValueClassField](
    data.untyped.cast(TypedEncoder[CaseClassWithValueClassField].catalystRepr)
  )
}
Using a UDF with an Injection like that can be tricky. You discovered a case where, unfortunately, things break at runtime, which is definitely not ideal. I assume you have some data already encoded as B and you want to load it as a TypedDataset[A] using an Injection[A, B]. Is my assumption correct?
If that's the case, then simply read your data as a DataFrame with underlying type B (String in your example), then use TypedDataset.createUnsafe[A](df)
to create your desired TypedDataset[A].
Example:
val df = TypedDataset.create(Seq("1")).dataset.toDF // I assume you read your data from a file or something and load them as a dataframe with the right types.
val x = TypedDataset.createUnsafe[Lorem](df)
x.collect().run()
Hi @imarios, thanks for your feedback. The goal is to use a UDF to convert data into instances of case classes whose fields are, at some point, typed as value classes. The issue is mixing case classes and value classes, as in the example above.
The suggestion I gave above does what you need without using a UDF to convert them. Regarding your comment on ValueClass: are you saying that if Lorem were a case class, everything would work fine?
I will check, but I doubt we can do without a UDF.
Let me know if case classes solve this (I suspect we are going to see the same issue).
@imarios Yes, if the value class is refactored as a case class it works, but that's not the desired modeling.
And we cannot do without a UDF.
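This matches Scala's value-class compilation: a field whose declared type is a value class is stored unboxed as the underlying type, while a case-class field keeps its own runtime class, which is presumably why the case-class version encodes fine. A plain-Scala sketch (no Spark involved; the names are illustrative only) shows the difference with reflection:

```scala
// Same shape as the issue's Lorem, under two modelings.
final class VLorem(val value: String) extends AnyVal
case class CLorem(value: String)

case class FooV(lorem: VLorem)
case class FooC(lorem: CLorem)

// With a value class, the JVM field is stored erased as String, so
// generated code that casts the field value to VLorem fails, exactly as
// in the stack trace above. With a case class, the field keeps its own
// runtime class and the cast is safe.
val vType: Class[_] = classOf[FooV].getDeclaredField("lorem").getType
val cType: Class[_] = classOf[FooC].getDeclaredField("lorem").getType

assert(vType == classOf[String])
assert(cType == classOf[CLorem])
```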
Even a TypedDataset.create(seqOfSameModel).toDF()
fails with the same issue (that is, without any UDF).
Still not working; here is a full reproducer: https://github.com/cchantep/frameless-sandbox/blob/master/src/test/scala/EncoderSpec.scala#L33
It seems that the scalar and struct cases need to be handled differently: https://github.com/cchantep/frameless-sandbox/blob/master/src/test/scala/EncoderSpec.scala#L75
The case of Option also needs specific handling: https://github.com/cchantep/frameless-sandbox/blob/master/src/test/scala/EncoderSpec.scala#L78
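The Option case likely differs because generic containers box the value class, unlike a plain field where only the underlying String is stored. A plain-Scala sketch (illustrative names only, no Spark involved):

```scala
final class Lorem(val value: String) extends AnyVal

// In a generic position such as Option[Lorem], the value class IS boxed:
// the Some holds an actual Lorem instance rather than a raw String, so an
// encoder has to treat the optional and plain-field cases differently.
val opt: Option[Lorem] = Some(new Lorem("1"))

val boxed: Any = opt.get
assert(boxed.getClass == classOf[Lorem])
assert(opt.map(_.value) == Some("1"))
```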