Attributes containing `null` are not handled correctly when reading
mauriciojost opened this issue · 2 comments
mauriciojost commented
I write a parquet file where a few records contain null attribute values. When reading the parquet back with frameless, some fields come out corrupted. When reading it with the plain Spark API, the fields are as originally written (they contain null). A snippet to reproduce it yourself:
// spark 2.3.1 - hadoop 2.7
// ~/opt/zips/spark-2.3.1-bin-hadoop2.7/bin/spark-shell --packages org.typelevel:frameless-dataset_2.11:0.8.0
implicit val s = spark
import spark.implicits._
import frameless.TypedDataset
import frameless.TypedEncoder
val r = new scala.util.Random()
def newpath() = "/tmp/test-" + r.nextLong
case class B(b: String, c: String)
case class C(c: String, d: String)
case class A(a: String, b: List[B], c: C)
// scenario 1, no problem writing, no problem reading with frameless nor with Spark API
val normal = A("a", List(), C("xx", "xx")) // no null values
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(normal))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// A(a,List(),C(xx,xx))
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(),C(xx,xx))
// scenario 2, no problem writing, problem reading with frameless, no problem reading with Spark API
val funull = A("a", List(), C(null, "yy")) // null value in first attribute of a case class instance
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(funull))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// null
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(),C(null,yy))
// scenario 3, no problem writing, big bang problem reading with frameless, no problem reading with Spark API
val fuexce = A("a", List(B(null, "xx")), C("dd", "kk")) // null value in first attribute of a case class instance within a collection
val path = newpath()
TypedDataset.create[A](spark.sparkContext.parallelize[A](Seq(fuexce))).write.parquet(path)
TypedDataset.createUnsafe[A](spark.read.parquet(path)).rdd.collect.foreach(println)
// fails horribly with java.lang.NullPointerException
// Caused by: java.lang.NullPointerException
// at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.NewInstance_0$(Unknown Source)
// at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply_0_0$(Unknown Source)
// at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
// ...
spark.read.parquet(path).as[A].collect.foreach(println)
// A(a,List(B(null,xx)),C(dd,kk))
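As a possible workaround (my assumption, not something verified against frameless internals): frameless encoders appear to treat a plain `String` field as non-nullable, so modelling nullable attributes as `Option[String]` should avoid the corrupt reads. A sketch for the same spark-shell session, with `Opt`-suffixed case class names introduced here for illustration:

```scala
// Sketch of a workaround: declare nullable attributes as Option[String]
// so the frameless TypedEncoder knows they can be absent.
// (Assumption: the corruption comes from nulls in fields the encoder
// considers non-nullable; the shape mirrors the original snippet.)
case class BOpt(b: Option[String], c: String)
case class COpt(c: Option[String], d: String)
case class AOpt(a: String, b: List[BOpt], c: COpt)

val safe = AOpt("a", List(BOpt(None, "xx")), COpt(None, "yy"))
val p = newpath()
TypedDataset.create[AOpt](spark.sparkContext.parallelize(Seq(safe))).write.parquet(p)
TypedDataset.createUnsafe[AOpt](spark.read.parquet(p)).rdd.collect.foreach(println)
```

With `Option` the absence of a value is represented as `None` rather than a raw `null`, which is also the more idiomatic way to express nullable columns in Scala.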
If instead I use frameless 0.7.0 the problems are slightly different, but at least no exception is thrown in the last scenario.
Hope this is clear.
Thanks for the nice project!
mauriciojost commented
What is not clear is why this works while writing, but not while reading.
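One way to narrow this down (a suggestion, not part of the original report): inspect the written file as an untyped DataFrame. If the null is present on disk and the schema marks the column nullable, the write path is fine and the bug sits in the frameless read-side deserializer:

```scala
// Read the parquet written in scenario 3 without any encoder involved.
// If the nested null survives here, the writer produced a correct file
// and only the typed read path mishandles it.
val raw = spark.read.parquet(path)
raw.printSchema() // check that the nested string columns are marked nullable
raw.show(false)
```

This would be consistent with the Spark `as[A]` reads above returning the original values: the parquet data is intact, and the difference lies in the generated deserialization code.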