How should I parse and convert data from an external medium in a generic way?
frgomes opened this issue · 2 comments
I would like to create a TypedDataset[T] where T is a generic type parameter.
For more context on the topic, I would like to point to a similar question on StackOverflow which gave me some ground for experimentation:
https://stackoverflow.com/questions/55637342/implicit-encoder-for-typeddataset-and-type-bounds-in-scala
After a lot of experimentation, I've finally arrived at a case class which defines a read method to be executed later:
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import frameless.{TypedDataset, TypedEncoder}

case class Descriptor[T <: Product : TypedEncoder : ClassTag](label: String, source: String, options: Map[String, String], format: String) {
  def read(implicit spark: SparkSession): Try[TypedDataset[T]] = Try {
    import org.apache.spark.sql.Encoders
    // `raw.read` is a helper of mine that loads the external medium into a plain DataFrame (a sketch follows below)
    val df: DataFrame = raw.read(source, options, format)(spark)
    val ds: Dataset[T] = df.as[T](Encoders.kryo[T])
    TypedDataset.create(ds)
  }
}

object Descriptor {
  def apply[T <: Product : TypedEncoder : ClassTag]
      (label: String, source: String, options: Map[String, String], format: String,
       pf: PathFormatter, temporal: java.time.temporal.Temporal, table: Option[String]): Descriptor[T] = {
    // `PathFormatter` is another helper of mine which builds the actual source path from the arguments below
    val args: Map[String, Any] = Map("source" -> source, "entity" -> table.getOrElse(label), "temporal" -> temporal)
    pf.format(args) match {
      case Success(source) => Descriptor[T](label, source, options, format)
      case Failure(t)      => throw t
    }
  }
}
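For reference, raw.read above is not part of frameless or Spark; it is a small helper of my own. A minimal sketch of what such a helper could look like, assuming it simply delegates to Spark's DataFrameReader (the name raw and the exact signature here are only illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Illustrative sketch only: assumes the helper just wraps Spark's DataFrameReader.
object raw {
  def read(source: String, options: Map[String, String], format: String)
          (spark: SparkSession): DataFrame =
    spark.read
      .format(format)    // e.g. "csv"
      .options(options)  // e.g. delimiter, header, inferSchema
      .load(source)      // e.g. "/path/to/person/data.csv"
}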
Then I've defined a test case which reads a CSV file without schema inference, converts the fields to their proper data types, and finally returns a TypedDataset[T], like this:
import scala.util.Try
import org.apache.spark.sql.SparkSession
import frameless.{TypedDataset, TypedEncoder}

val options = Map("delimiter" -> ",", "header" -> "true", "inferSchema" -> "false") // MUST NOT INFER SCHEMA!

def reader(implicit spark: SparkSession): Try[TypedDataset[Person]] = {
  import TypedEncoder.usingDerivation
  val encoderPerson: TypedEncoder[Person] = TypedEncoder[Person]
  val descPerson = Descriptor[Person]("person", "/path/to/person/data.csv", options, "csv")
  val tdsPerson: Try[TypedDataset[Person]] = descPerson.read(spark)
  tdsPerson
}
This is how I've defined the schema for Person:
case class Person(age: Int, name: String)

object Person {
  import frameless.Injection
  val tupled = (Person.apply _).tupled
  implicit val injection: Injection[Person, (Int, String)] =
    Injection(p => unapply(p).get, tupled)
}
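Just for completeness, this is roughly how the reader above can be invoked; the enclosing object and the local SparkSession settings below are only illustrative:

import scala.util.{Failure, Success}
import org.apache.spark.sql.SparkSession

// Illustrative driver: any SparkSession available in implicit scope will do.
object ReaderExample extends App {
  implicit val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("descriptor-example").getOrCreate()

  reader match {
    case Success(tds) => tds.dataset.show()  // peek at the first rows of the underlying Dataset
    case Failure(t)   => t.printStackTrace()
  }

  spark.stop()
}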
Question 1: Regarding encoders and decoders
A key step in the logic above is performed by the line of code below, part of the read method:
val ds: Dataset[T] = df.as[T](Encoders.kryo[T])
Employing Kryo in Spark, configuring it and registering encoders/decoders are off topic here. The key point is: does it make sense to use Kryo at all? Shouldn't I employ encoders/decoders from frameless instead? Am I missing something?
Question 2: Limitations
The case class Person above works fine for a HelloWorld application. However, in real life a schema may have several dozen fields, which hits the 22-field limit in Scala 2.12. Any ideas, apart from moving to Scala 2.13?
Thanks
Answering my own questions:
Question 1: Shouldn't I employ encoders/decoders from frameless instead of using kryo?
Yes: kryo is not needed at all. I was actually confused by the frameless API and resorted to kryo instead of using the code below, which does not depend on kryo:
val df: DataFrame = ???
TypedDataset.createUnsafe(df)(implicitly[TypedEncoder[T]])
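Concretely, this means the read method of the Descriptor case class shown earlier can drop kryo altogether. A sketch of how it could look, assuming the same raw.read helper as before:

// Inside the Descriptor[T] case class from the question (T keeps its TypedEncoder context bound):
def read(implicit spark: SparkSession): Try[TypedDataset[T]] = Try {
  val df: DataFrame = raw.read(source, options, format)(spark)
  // No kryo here: the TypedEncoder[T] already in scope drives the conversion.
  TypedDataset.createUnsafe[T](df)
}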
Question 2: How can I circumvent the limitation of 22 fields per class?
First, it depends on which Scala version you are using. The 22-field limitation was partially addressed in Scala 2.11 but, in fact, it was only completely addressed in the Scala 3 compiler with support for the Scala 2.13 runtime. I'm compiling with Scala 2.12 because the deployment environment depends on the Scala 2.12 runtime.
Second, frameless offers different ways to create a TypedEncoder for a given case class.
The example below employs TypedEncoder.usingDerivation and TypedEncoder.usingInjection, which involve the 22-field limit on tuples behind the scenes; this means that your code may or may not compile, depending on the Scala version you are using.
case class Person(age: Int, name: String, ... ) // pretend that it has more than 22 parameters

object Person {
  val tupled = (Person.apply _).tupled
  implicit val injection: Injection[Person, (Int, String, ... )] =
    Injection(p => unapply(p).get, tupled)
}

// first block
{
  import TypedEncoder.usingDerivation
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}

// second block
{
  import TypedEncoder.usingInjection
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}
On the other hand, the code below works for me on Scala 2.12:
case class Person(age: Int, name: String, ... ) // pretend that it has more than 22 parameters

// third block: works with Scala 2.12
{
  //REMOVED: import TypedEncoder.usingDerivation
  //REMOVED: import TypedEncoder.usingInjection
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}
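With such a derived TypedEncoder[Person] in scope, creating a typed dataset from a plain DataFrame is then straightforward. An illustrative sketch (the name toTyped is only for the example):

import org.apache.spark.sql.DataFrame
import frameless.{TypedDataset, TypedEncoder}

// Illustrative sketch: works on Scala 2.12 with no injection, no tuple and no kryo involved.
def toTyped(df: DataFrame)(implicit te: TypedEncoder[Person]): TypedDataset[Person] =
  TypedDataset.createUnsafe[Person](df)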
Thanks @pomadchin for your help clarifying these questions.