delta-io/delta-rs

Write `enforce_invariant()` function

wjones127 opened this issue · 3 comments

Description

For both the DataFusion and pyarrow-based writers to support writer protocol version 2, we'll need to support enforcing invariants. It seems like the following signature could be reused by both implementations:

use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;

fn enforce_invariant(
    batch: &RecordBatch,
    invariants: &[(usize, &str)],
) -> Result<(), DataFusionError> {
    // Rough implementation:
    for (column_index, sql_invariant) in invariants {
        // ... run a DataFusion query evaluating `sql_invariant` against
        //     the column at `column_index` in `batch` ...
        // ... if it fails, return an error indicating which invariant failed.
    }
    Ok(())
}

Then this function could be applied to each record batch that comes in during a write.
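For illustration, here's a rough sketch of how one invariant check might run through DataFusion by registering the batch as an in-memory table. This is a minimal sketch, not a definitive implementation: it assumes a recent DataFusion with `SessionContext` and `MemTable`, ignores the column index, and assumes the invariant expression references its column by name. The name `check_invariant` is hypothetical.

use std::sync::Arc;

use arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::DataFusionError;
use datafusion::prelude::SessionContext;

async fn check_invariant(
    batch: &RecordBatch,
    sql_invariant: &str,
) -> Result<(), DataFusionError> {
    let ctx = SessionContext::new();
    // Expose the single batch as a queryable in-memory table.
    let table = MemTable::try_new(batch.schema(), vec![vec![batch.clone()]])?;
    ctx.register_table("batch", Arc::new(table))?;
    // Look for any row that violates the invariant. `WHERE NOT (expr)`
    // filters out rows where the expression evaluates to NULL, matching
    // SQL CHECK semantics (NULL passes).
    let df = ctx
        .sql(&format!(
            "SELECT 1 FROM batch WHERE NOT ({}) LIMIT 1",
            sql_invariant
        ))
        .await?;
    let violations = df.collect().await?;
    if violations.iter().any(|b| b.num_rows() > 0) {
        return Err(DataFusionError::Execution(format!(
            "Invariant violated: {}",
            sql_invariant
        )));
    }
    Ok(())
}

The write path would then await this once per invariant for every incoming batch.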

We might also need to check whether each column is nullable and make sure we're enforcing that too, either as part of this function or as part of schema enforcement. We should add a test for that.
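A minimal sketch of that nullability check, assuming it stays purely on the Arrow level (the helper name `enforce_not_null` is hypothetical, not an existing delta-rs API):

use arrow::record_batch::RecordBatch;
use datafusion::error::DataFusionError;

fn enforce_not_null(batch: &RecordBatch) -> Result<(), DataFusionError> {
    let schema = batch.schema();
    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        // A non-nullable field must not contain any null values.
        if !field.is_nullable() && column.null_count() > 0 {
            return Err(DataFusionError::Execution(format!(
                "Non-nullable column '{}' contains {} null value(s)",
                field.name(),
                column.null_count()
            )));
        }
    }
    Ok(())
}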

What do you think @roeap?

Related Issue(s)

Related docs

https://github.com/delta-io/delta/blob/master/PROTOCOL.md#column-invariants
https://books.japila.pl/delta-lake-internals/constraints/Invariants/

BTW here are the invariant test suites: https://github.com/delta-io/delta/blob/master/core/src/test/scala/org/apache/spark/sql/delta/schema/InvariantEnforcementSuite.scala

Also note that "invariants" are distinct from "constraints", so there's a separate test suite for those. Constraint enforcement is part of writer version 3, so we don't need to worry about it yet, but it seems to have a very similar implementation to invariants, so it may be worth writing this function with constraints in mind.

If I understand correctly, the main difference between invariants and constraints is that the former is a property of a single column, whereas a constraint is a property of the table as a whole and thus can enforce relationships between columns (example constraint).
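To make that concrete (illustrative expressions only, reusing the sketch signature above; the column index and expressions are made up):

// An invariant is attached to a single column and only references it:
let invariants: &[(usize, &str)] = &[(1, "value > 0")];
// A CHECK constraint is table-level and may relate several columns:
let constraint = "start_date < end_date";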

roeap commented

At first sight this sounds very reasonable :). With regards to testing, maybe it makes sense to combine that effort with the pyspark integration tests, along the lines of: do we get the same results (error or not) when writing here as when writing with pyspark?

Two notes:

  • We are currently waiting on clarification to the protocol: delta-io/delta#1241
  • While we should enforce column invariants to be compliant, we should not be eager to expose APIs for adding column invariants to tables. The existing Spark implementation seems to have a bug where invariants are only enforced when the column is nullable (delta-io/delta#1239). Regardless, I think similar functionality can be achieved through constraints, which are likely better supported in the existing ecosystem.