MrPowers/spark-daria

NotNull assertion expression


I've found myself wanting an assertNotNull(col) expression so I can fail fast and increase confidence in data structures.

The situations I'd currently like to resolve:

  • Joining two data sets known to match (so every row finds a partner and no join result is null). Filtering with .filter(col.isNotNull) does not change the schema's nullability (see the sketch after this list).
  • CSV reading. A schema may be applied, but nullability is read from the types, not from the provided schema. I want to ensure that the columns match the schema's defined nullability (and blow up if not!).
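
A minimal sketch of the first situation, assuming a SparkSession named spark (column and value names are just for illustration):

import org.apache.spark.sql.functions.col

val df = spark.range(5).selectExpr("IF(id % 2 = 0, id, NULL) AS maybeNull")
df.schema("maybeNull").nullable // true
df.filter(col("maybeNull").isNotNull)
  .schema("maybeNull").nullable // still true, even though no nulls remain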

I was using the hack of spark.createDataFrame(df.rdd, newSchema), which applies the schema and fails when nulls are encountered. But it turns out this stops query optimisation (understandable, since the round trip through an RDD is opaque to Catalyst) and seems to execute an action, which is extremely slow for large data sets.
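
For reference, a sketch of that workaround for some DataFrame df — here newSchema is just the existing schema with nullability switched off:

import org.apache.spark.sql.types.{StructField, StructType}

val newSchema = StructType(df.schema.map {
  case StructField(name, dataType, _, metadata) =>
    StructField(name, dataType, nullable = false, metadata)
})
// Round-trips through an RDD, which blocks Catalyst optimisation
val strictDf = spark.createDataFrame(df.rdd, newSchema)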

What I propose is adding an AssertNotNull expression. The following code makes the schema match and fails fast when nulls do end up in the processed data set.

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.DataType

case class AssertNotNull(child: Expression) extends UnaryExpression {
  override def nullable: Boolean = false // the schema now reflects the assertion
  override def dataType: DataType = child.dataType
  // Delegate codegen to the child; a null value surfaces downstream as an NPE
  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = child.genCode(ctx)
  // Keep the child's SQL so the assertion is invisible in the schema (standard
  // use of assertions as part of a method's black-box contract)
  override def sql: String = child.sql
}

def assertNotNull(col: Column): Column = new Column(AssertNotNull(col.expr))

df.select(assertNotNull(col("colName"))) // note the assertNotNull function is "invisible" and returns "colName" as its name
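
A quick sketch of what this buys, assuming a DataFrame df whose colName is nullable:

val out = df.select(assertNotNull(col("colName")))
out.schema("colName").nullable // false: the schema now matches the assertion
out.collect()                  // fails at runtime if a null actually appears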

Is this desirable in daria? (It seems valuable to me, since complicated code is difficult to test and schema "mismatches" are easy to introduce.) Where would it go?

Note that I've just written this and it has passed general testing. I'll do performance verification later in the week, but I thought the general concept worth considering (and vastly superior to the createDataFrame workaround). If nothing else, the resulting conversation can be linked to from Stack Overflow ;)

It turns out that while it works, a bare NullPointerException is a rather horrific situation to find your executor in. I've dived in and updated doGenCode to generate a code block that throws a helpful exception (I'll update the code in the eventual pull request).
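
A sketch of what that friendlier doGenCode might look like inside the AssertNotNull class above, loosely modelled on how Spark's own codegen raises errors (this assumes Spark 2.4+ codegen APIs; details vary between versions):

import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.expressions.codegen.FalseLiteral

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  val childGen = child.genCode(ctx)
  // Pass the message through a reference object to avoid string-escaping issues
  val errMsg = ctx.addReferenceObj(
    "errMsg", s"Null value found in non-nullable column: ${child.sql}")
  ev.copy(
    code = code"""
      ${childGen.code}
      if (${childGen.isNull}) {
        throw new NullPointerException($errMsg);
      }""",
    isNull = FalseLiteral,
    value = childGen.value)
}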

bebe might be a better spot for Catalyst expression stuff. Thanks for opening this.

It turns out something rather better is to just use Spark's built-in expression:

new Column(org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull(col.expr, Seq(col.toString())))
  .as(col.toString) // make it transparent (since we're asserting that what's there should not be null)

I don't know why I (or the rest of the Stack Overflow answers, for that matter) couldn't find it before.
It correctly converts a nullable column to a non-nullable column in the schema and blows up at runtime if a null appears.
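
Wrapped as a reusable helper (the helper name is mine, mirroring the earlier proposal; the two-argument constructor is the one shown above):

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull

def assertNotNull(col: Column): Column =
  new Column(AssertNotNull(col.expr, Seq(col.toString()))).as(col.toString)

// Usage: the column is non-nullable in the output schema, and any null
// encountered at runtime raises a descriptive error instead of a bare NPE
df.select(assertNotNull(col("colName"))).printSchema()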