typelevel/frameless

Attributes containing `null` are not handled correctly when reading

mauriciojost opened this issue · 2 comments

I write a parquet file in which a few records contain null attribute values. When reading it back with frameless, some fields come out corrupted; when reading it with the plain Spark API, the fields are intact (they still contain null). Here is a snippet to reproduce it yourself:

// spark 2.3.1 - hadoop 2.7
// ~/opt/zips/spark-2.3.1-bin-hadoop2.7/bin/spark-shell --packages org.typelevel:frameless-dataset_2.11:0.8.0
  
implicit val s = spark
import spark.implicits._
import frameless.TypedDataset
import frameless.TypedEncoder

val r = new scala.util.Random()
def newpath() = "/tmp/test-" + r.nextLong
  
case class B(b: String, c: String)
case class C(c: String, d: String)
case class A(a: String, b: List[B], c: C)
  
// scenario 1, no problem writing, no problem reading with frameless nor with Spark API
val normal = A("a", List(), C("xx", "xx")) // no null values
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(normal))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// A(a,List(),C(xx,xx))
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(),C(xx,xx))

// scenario 2, no problem writing, problem reading with frameless, no problem reading with Spark API
val funull = A("a", List(), C(null, "yy")) // null value in first attribute of a case class instance
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(funull))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// null
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(),C(null,yy))

// scenario 3, no problem writing, big bang problem reading with frameless, no problem reading with Spark API
val fuexce = A("a", List(B(null, "xx")), C("dd", "kk")) // null value in first attribute of a case class instance within a collection
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(fuexce))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// fails horribly with java.lang.NullPointerException
// Caused by: java.lang.NullPointerException
//   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.NewInstance_0$(Unknown Source)
//   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source)
//   at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
// ...
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(B(null,xx)),C(dd,kk))

If I use 0.7.0 instead, the problems are slightly different, but at least no exception is thrown in the last scenario.

Hope this is clear.

Thanks for the nice project!

As per discussion in Gitter.

What is not clear is why this works when writing, but not when reading.
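
A hedged guess at the asymmetry, plus a possible workaround (untested; the `A2`/`B2`/`C2` names below are illustrative, not part of the report above): frameless's `TypedEncoder` appears to derive non-nullable columns for plain `String` fields, so the deserializer generated on read dereferences the null, while the write path never needs to. If that is the cause, modelling nullable attributes as `Option[String]` should side-step it. A minimal sketch, reusing the imports and the `newpath` helper from the snippet above:

// Hypothetical variants of the case classes above, with nullable fields as Option
case class B2(b: Option[String], c: String)
case class C2(c: Option[String], d: String)
case class A2(a: String, b: List[B2], c: C2)

// Same shape as scenario 3, but the would-be-null attribute is None instead of null
val safe = A2("a", List(B2(None, "xx")), C2(Some("dd"), "kk"))
val p = newpath()
TypedDataset.create[A2](spark.sparkContext.parallelize[A2](Seq(safe))).write.parquet(p)
TypedDataset.createUnsafe[A2](spark.read.parquet(p)).rdd.collect.foreach(println)
// expected: A2(a,List(B2(None,xx)),C2(Some(dd),kk))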