typelevel/frameless

How should I parse and convert data from an external medium in a generic way?

frgomes opened this issue · 2 comments

I would like to create a generic TypedDataset[T] where T is generic.

For more context on the topic, I would like to point to a similar question on StackOverflow which gave me some ground for experimentation:
https://stackoverflow.com/questions/55637342/implicit-encoder-for-typeddataset-and-type-bounds-in-scala

After a lot of experimentation, I've finally arrived at a case class which defines a read method to be executed later:

import scala.reflect.ClassTag
import scala.util.{Try, Success, Failure}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset
import frameless.TypedDataset
import frameless.TypedEncoder

case class Descriptor[T <: Product : TypedEncoder : ClassTag](label: String, source: String, options: Map[String, String], format: String) {
  def read(implicit spark: SparkSession): Try[TypedDataset[T]] = Try {
    import org.apache.spark.sql.Encoders
    val df: DataFrame = raw.read(source, options, format)(spark) // raw.read is a project-specific helper (sketched below)
    val ds: Dataset[T] = df.as[T](Encoders.kryo[T])
    TypedDataset.create(ds)
  }
}
object Descriptor {
  def apply[T <: Product : TypedEncoder : ClassTag]
           (label: String, source: String, options: Map[String, String], format: String,
            pf: PathFormatter, temporal: java.time.temporal.Temporal, table: Option[String]): Descriptor[T] = {
    val args: Map[String, Any] = Map("source" -> source, "entity" -> table.getOrElse(label), "temporal" -> temporal)
    pf.format(args) match {
      case Success(source) => Descriptor[T](label, source, options, format)
      case Failure(t) => throw t
    }
  }
}
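
Note: raw.read above is a helper of my own which is not shown here. A minimal sketch of what it could look like, assuming it simply wraps Spark's standard DataFrameReader, would be:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: loads a DataFrame from `source` using the given reader options and format.
object raw {
  def read(source: String, options: Map[String, String], format: String)
          (spark: SparkSession): DataFrame =
    spark.read.options(options).format(format).load(source)
}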

Then, I've defined a test case which reads a CSV file without schema inference, converts the fields to their proper data types, and finally returns a TypedDataset[T], like this:

  import scala.util.Try
  import org.apache.spark.sql.SparkSession
  import frameless.{TypedDataset, TypedEncoder}

  val options = Map("delimiter" -> ",", "header" -> "true", "inferSchema" -> "false") // MUST NOT INFER SCHEMA!

  def reader(implicit spark: SparkSession): Try[TypedDataset[Person]] = {
    import TypedEncoder.usingDerivation
    val encoderPerson: TypedEncoder[Person] = TypedEncoder[Person] // summoned explicitly to check that the encoder derives
    val descPerson = Descriptor[Person]("person", "/path/to/person/data.csv", options, "csv")
    val tdsPerson: Try[TypedDataset[Person]] = descPerson.read(spark)
    tdsPerson
  }
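
For reference, this is roughly how the reader above could be invoked; the local SparkSession below is for illustration only:

  // Illustration only: in the real deployment the SparkSession comes from elsewhere.
  implicit val spark: SparkSession =
    SparkSession.builder().master("local[*]").appName("reader-example").getOrCreate()

  val people: Try[TypedDataset[Person]] = reader
  people.foreach(tds => tds.dataset.printSchema()) // inspect the schema of the underlying Dataset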

This is how I've defined the schema for Person:

case class Person(age: Int, name: String)
object Person {
  import frameless.Injection
  val tupled = (Person.apply _).tupled
  implicit val injection: Injection[Person, (Int, String)] =
    Injection(p => unapply(p).get, tupled)
}

Question 1: Regarding encoders and decoders

A key step in the logic above is done by this line of code, part of the read method:

val ds: Dataset[T] = df.as[T](Encoders.kryo[T])

Employing Kryo in Spark, configuring it, and registering encoders/decoders are out of scope here. The key point is:

Does it make sense to use Kryo? Shouldn't I employ encoders/decoders from frameless instead?

Am I missing something?

Question 2: Limitations

The case class Person above works fine for a HelloWorld application. However, in real life a schema may have several dozen fields, which hits the 22-field limit in Scala 2.12. Any ideas apart from moving to Scala 2.13?

Thanks

Answering my own questions:

Question 1: Shouldn't I employ encoders/decoders from frameless instead of using Kryo?

Yes, Kryo is not needed at all. I was actually confused by the frameless API and resorted to Kryo instead of using the code below, which does not depend on it:

val df: DataFrame = ???
TypedDataset.createUnsafe(df)(implicitly[TypedEncoder[T]])
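
Applied to the Descriptor above, the read method can be rewritten roughly as follows (a sketch only, keeping the raw.read helper as before):

  def read(implicit spark: SparkSession): Try[TypedDataset[T]] = Try {
    val df: DataFrame = raw.read(source, options, format)(spark)
    // no Kryo: the TypedEncoder[T] from the context bound validates the schema instead
    TypedDataset.createUnsafe[T](df)
  }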

Question 2: How can I circumvent the limitation of 22 fields per class?

First, it depends on which Scala version you are using. The 22-field limitation was partially addressed in Scala 2.11 but, in fact, it was only completely addressed by the Scala 3 compiler with support for the Scala 2.13 runtime. I'm compiling with Scala 2.12 because the deployment environment depends on the Scala 2.12 runtime.

Second, frameless has different ways to create a TypedEncoder for a given case class.

The example below employs TypedEncoder.usingDerivation and TypedEncoder.usingInjection, both of which involve the 22-field limit on tuples behind the scenes, which means that your code may or may not compile depending on the Scala version you are using.

case class Person(age: Int, name: String, ... ) //pretend that it has more than 22 parameters
object Person {
  import frameless.Injection
  val tupled = (Person.apply _).tupled
  implicit val injection: Injection[Person, (Int, String, ... )] =
    Injection(p => unapply(p).get, tupled)
}

// first block
{
  import TypedEncoder.usingDerivation
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}

// second block
{
  import TypedEncoder.usingInjection
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}

On the other hand, the code below works for me on Scala 2.12:

case class Person(age: Int, name: String, ... ) //pretend that it has more than 22 parameters

// third block: works with Scala 2.12
{
  //REMOVED: import TypedEncoder.usingDerivation
  //REMOVED: import TypedEncoder.usingInjection
  val te: TypedEncoder[Person] = TypedEncoder[Person]
}
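
For illustration, once TypedEncoder[Person] derives as in the third block, the conversion from Question 1 works the same way (a sketch only; the actual DataFrame is elided):

  val df: DataFrame = ???  // a DataFrame whose schema already matches Person
  val tds: TypedDataset[Person] = TypedDataset.createUnsafe[Person](df)  // uses the derived TypedEncoder[Person]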

Thanks @pomadchin for your help clarifying these questions.

I think this question has been resolved for now, but please don't hesitate to reopen it / open a new issue @frgomes!