typelevel/frameless

How to use it with Java POJOs

vinkaga opened this issue · 8 comments

Sorry if the answer is already covered; I haven't succeeded based on the documentation so far.

I am reading AVRO-encoded data into Spark, where it is represented by a hierarchy of Java POJOs with a single top-level object. Reading it into Spark produces a Dataset[Row]. I am trying to convert that into a TypedDataset[MyTopPojo] as follows:

    val dsr: Dataset[Row] = session.read.format("avro").load("../00000.avro")
    val typed: TypedDataset[MyTopPojo] = TypedDataset.create(dsr)

But that generates this error:

type mismatch;
 found   : org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
 required: org.apache.spark.sql.Dataset[MyTopPojo]

What do I need to do to create TypedDataset?

Hey @vinkaga! You'd need to convert the Row into MyTopPojo yourself. This is not the best way to do it, but you can use Spark's machinery to handle it:

// haven't tested this code though
// can be derived via the built-in Spark machinery as well
val encoder: ExpressionEncoder[MyTopPojo] = TypedExpressionEncoder[MyTopPojo].asInstanceOf[ExpressionEncoder[MyTopPojo]]

// be careful with these functions though, they are pretty expensive
// serializer: Row => InternalRow
lazy val serializer = RowEncoder(encoder.schema).createSerializer()
// deserializer: InternalRow => MyTopPojo
lazy val deserializer = encoder.createDeserializer()

val dsr: Dataset[MyTopPojo] = 
  session.read.format("avro").load("../00000.avro").map(row => deserializer(serializer(row).copy()))
val typed: TypedDataset[MyTopPojo] = TypedDataset.create(dsr)
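
As an aside, the reason TypedDataset.create works out of the box for case classes is that frameless derives the TypedEncoder automatically for them. A minimal, untested sketch for contrast (Person is a hypothetical case class, not your AVRO schema):

import frameless.TypedDataset
import org.apache.spark.sql.{Dataset, SparkSession}

// Person is a hypothetical case class used only for illustration
case class Person(name: String, age: Int)

val session: SparkSession = SparkSession.builder().getOrCreate()
import session.implicits._

// Spark derives Encoder[Person] via session.implicits._
val people: Dataset[Person] = Seq(Person("a", 1), Person("b", 2)).toDS()

// frameless derives TypedEncoder[Person] automatically for case classes,
// so TypedDataset.create needs no manually defined encoder
val typedPeople: TypedDataset[Person] = TypedDataset.create(people)

For a POJO there is no such automatic derivation, which is what the errors below come from.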

Here's what I tried

val encoder: ExpressionEncoder[MyTopPojo] = ExpressionEncoder[MyTopPojo]
lazy val serializer = RowEncoder(encoder.schema).createSerializer()
lazy val deserializer = encoder.createDeserializer()
val dsr: Dataset[MyTopPojo] = spark.read.format("avro")
    .load("s3://...")
    .map(row => deserializer(serializer(row).copy()))
val typed: TypedDataset[MyTopPojo] = TypedDataset.create(dsr)

I got an error on the map line:

No implicit arguments of type: Encoder[MyTopPojo]

And on the last line

No implicit arguments of type: TypedEncoder[MyTopPojo]

@vinkaga oh, that is a POJO, not a case class; I misread. You would need to manually define the TypedEncoder for your type.

I'm using this ManualTypedEncoder helper to deal with it (actually, maybe it is worth moving it into frameless).

Usage example:

public class MyTopPojo {

    public String name;
    public int age;

    public MyTopPojo(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

and the corresponding TypedEncoder defined on the Scala side:

implicit val myTopPojoTypedEncoder: TypedEncoder[MyTopPojo] =
  ManualTypedEncoder.newInstance[MyTopPojo](
    fields = List(
      RecordEncoderField(0, "name", TypedEncoder[String]),
      RecordEncoderField(1, "age", TypedEncoder[Int])
    )
  )

@pomadchin, MyTopPojo is a hierarchical POJO several levels deep. Is it possible to build the encoder without extensive manual work?

@vinkaga I actually noticed that in your example the encoder is not marked implicit. Dataset.map needs an implicit Encoder[MyTopPojo] in scope, so it should be:

implicit val encoder: ExpressionEncoder[MyTopPojo] = ExpressionEncoder[MyTopPojo]
lazy val serializer = RowEncoder(encoder.schema).createSerializer()
lazy val deserializer = encoder.createDeserializer()
val dsr: Dataset[MyTopPojo] = spark.read.format("avro")
    .load("s3://...")
    .map(row => deserializer(serializer(row).copy()))

@pomadchin, with that, I am still getting the errors. Not sure how to proceed further.

Hey @vinkaga, I will find time to try something out later this week. However, for a more general solution, I think it is possible to do a semi-automatic derivation of TypedEncoders for POJO classes via https://github.com/limansky/beanpuree.
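
Until such a derivation exists, a nested hierarchy can in principle be handled by composing manual encoders level by level, reusing the ManualTypedEncoder helper from above. A rough, untested sketch, assuming MyTopPojo additionally carries a nested Address POJO (the Address type and all field names here are hypothetical) and assuming the helper accepts a TypedEncoder of another POJO inside a RecordEncoderField:

import frameless.{RecordEncoderField, TypedEncoder}

// encoder for the innermost POJO first
implicit val addressTypedEncoder: TypedEncoder[Address] =
  ManualTypedEncoder.newInstance[Address](
    fields = List(
      RecordEncoderField(0, "street", TypedEncoder[String]),
      RecordEncoderField(1, "zip", TypedEncoder[String])
    )
  )

// the parent encoder then picks up the nested one via TypedEncoder[Address]
implicit val myTopPojoTypedEncoder: TypedEncoder[MyTopPojo] =
  ManualTypedEncoder.newInstance[MyTopPojo](
    fields = List(
      RecordEncoderField(0, "name", TypedEncoder[String]),
      RecordEncoderField(1, "age", TypedEncoder[Int]),
      RecordEncoderField(2, "address", TypedEncoder[Address])
    )
  )

With these implicits in scope, both TypedExpressionEncoder[MyTopPojo] and TypedDataset.create should be able to resolve their TypedEncoder.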

I think this can be closed for now.