delta-io/delta

[BUG] Column invariant not enforced on write

wjones127 opened this issue · 10 comments

Bug

Describe the problem

I'm trying to understand how column invariants are enforced in Delta Lake so I can implement them in delta-rs. However, I am unable to get PySpark to throw an error when writing values that violate the invariant. Am I misunderstanding the spec? Or is this a bug?

Steps to reproduce

import pyspark
import pyspark.sql.types
import delta
from delta.tables import DeltaTable

def get_spark():
    builder = (
        pyspark.sql.SparkSession.builder.appName("MyApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    return delta.configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()

schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField(
        "c1", 
        dataType = pyspark.sql.types.IntegerType(), 
        nullable = False, 
        metadata = { "delta.invariants": "c1 > 3" } 
    )
])

table = DeltaTable.create(spark) \
    .tableName("testTable") \
    .addColumns(schema) \
    .execute()

# This should fail, but doesn't
spark.createDataFrame([(2,)], schema=schema).write.saveAsTable(
    "testTable",
    mode="append",
    format="delta",
)

Observed results

The write succeeds even though the delta.invariants key is clearly in the schema, the writer protocol version is 2, and the minimum value written (2) clearly violates the invariant.

First log file:

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"e8204eae-cd90-41c2-b685-92f22126b54a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"c1\",\"type\":\"integer\",\"nullable\":false,\"metadata\":{\"delta.invariants\":\"c1 > 3\"}}]}","partitionColumns":[],"configuration":{},"createdTime":1656459957813}}
{"commitInfo":{"timestamp":1656459957820,"operation":"CREATE TABLE","operationParameters":{"isManaged":"true","description":null,"partitionBy":"[]","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.1","txnId":"6d370f8e-211f-4624-8a40-6fbd67e905c8"}}

Second log file:

{"add":{"path":"part-00000-0d61b29d-60ee-47d1-a121-2641fbc3ae1d-c000.snappy.parquet","partitionValues":{},"size":326,"modificationTime":1656459958951,"dataChange":true,"stats":"{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"}}
{"add":{"path":"part-00003-b30e416e-c616-4d80-87b6-182baf8f0830-c000.snappy.parquet","partitionValues":{},"size":479,"modificationTime":1656459958981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c1\":2},\"maxValues\":{\"c1\":2},\"nullCount\":{\"c1\":0}}"}}
{"commitInfo":{"timestamp":1656459958996,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputRows":"1","numOutputBytes":"805"},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.1","txnId":"00a036ec-243d-4543-b7d2-186f031ca2f1"}}
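The stats field of each add action is itself a JSON document serialized as a string, so it has to be decoded twice. A small sketch that decodes the second add action above (copied verbatim) to read back the minimum value that was written; the helper name file_stats is illustrative only.

```python
import json

# The second `add` action from the commit above, copied verbatim.
ADD_LINE = r'{"add":{"path":"part-00003-b30e416e-c616-4d80-87b6-182baf8f0830-c000.snappy.parquet","partitionValues":{},"size":479,"modificationTime":1656459958981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"c1\":2},\"maxValues\":{\"c1\":2},\"nullCount\":{\"c1\":0}}"}}'


def file_stats(add_line: str) -> dict:
    """Decode the per-file statistics of an `add` action.

    `stats` is a JSON string nested inside the action's JSON,
    so it must be parsed a second time.
    """
    add = json.loads(add_line)["add"]
    return json.loads(add["stats"])


stats = file_stats(ADD_LINE)
print(stats["numRecords"], stats["minValues"]["c1"])  # prints: 1 2
```

The decoded minimum of 2 is what shows the committed file violates "c1 > 3".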

Expected results

I expected it to throw an exception. This should be equivalent to the following unit test, right?

testQuietly("reject expression invariant on top level column") {
  val expr = "value < 3"
  val rule = Constraints.Check("", spark.sessionState.sqlParser.parseExpression(expr))
  val metadata = new MetadataBuilder()
    .putString(Invariants.INVARIANTS_FIELD, PersistedExpression(expr).json)
    .build()
  val schema = new StructType()
    .add("key", StringType)
    .add("value", IntegerType, nullable = true, metadata)
  testBatchWriteRejection(
    rule,
    schema,
    Seq[(String, Int)](("a", 1), (null, 5)).toDF("key", "value"),
    "value", "5"
  )
}

Further details

Environment information

  • Delta Lake version: 1.2.1
  • Spark version: 3.2.1
  • Scala version:

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

I have no experience with Scala, so if this is a bug I may not be able to fix it. But I'd be happy to add further clarification to the Protocol spec to clear up the expectations around delta.invariants.

Looking at the invariant enforcement code here, it looks like if the column is not nullable, the invariant defined by delta.invariants is not considered at all. The existing test has nullable=true, while the test you wrote has nullable=false.

I am not sure whether it is a bug. I will spend some time understanding the protocol and get back to you.
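To make the behavior described in this comment concrete, here is a pure-Python model of how constraints appear to be derived from a table schema. It is a hypothetical sketch, not the Scala implementation: Field and constraints_from_schema are illustrative names, and only top-level columns are modeled. The key point is the order of the checks: a non-nullable column yields only a NOT NULL constraint, so its delta.invariants metadata is never read.

```python
import json
from dataclasses import dataclass, field


@dataclass
class Field:
    name: str
    nullable: bool
    metadata: dict = field(default_factory=dict)


def constraints_from_schema(fields):
    """Model of the enforcement rule described above (top-level columns only)."""
    out = []
    for f in fields:
        if not f.nullable:
            # Non-nullable column: only NOT NULL is produced; any
            # delta.invariants entry on the same field is skipped.
            out.append(("NOT NULL", f.name))
        elif "delta.invariants" in f.metadata:
            # Nullable column with an invariant: the value must be JSON of the
            # form {"expression": {"expression": "<SQL>"}}. A raw SQL string
            # raises json.JSONDecodeError here, analogous to the Jackson
            # JsonParseException seen when the value is not JSON.
            rule = json.loads(f.metadata["delta.invariants"])
            out.append(("CHECK", rule["expression"]["expression"]))
    return out
```

With nullable=False, the model returns only ("NOT NULL", "c1") for the schema in the original report, which matches the observed write succeeding.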

Oh, very interesting. If I switch it to nullable=True, it then rejects the SQL string provided. So I must also not be formatting the condition correctly, though I can't find a description of the expected format anywhere. Any hints?

Traceback (most recent call last):
  File "<stdin>", line 4, in <module>
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/delta/tables.py", line 1146, in execute
    jdt = self._jbuilder.execute()
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/home/wjones/delta-rs/python/venv/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o50.execute.
: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'c1': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"c1 > 3"; line: 1, column: 3]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:2903)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1949)
        at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:781)
        at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3548)
        at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue(ScalaObjectMapper.scala:206)
        at com.fasterxml.jackson.module.scala.ScalaObjectMapper.readValue$(ScalaObjectMapper.scala:205)
        at org.apache.spark.sql.delta.util.JsonUtils$$anon$1.readValue(JsonUtils.scala:27)
        at org.apache.spark.sql.delta.constraints.Invariants$.$anonfun$getFromSchema$2(Invariants.scala:81)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
        at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:75)
        at scala.collection.TraversableLike.map(TraversableLike.scala:286)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
        at scala.collection.AbstractTraversable.map(Traversable.scala:108)
        at org.apache.spark.sql.delta.constraints.Invariants$.getFromSchema(Invariants.scala:76)
        at org.apache.spark.sql.delta.actions.Protocol$.requiredMinimumProtocol(actions.scala:132)
        at org.apache.spark.sql.delta.actions.Protocol$.$anonfun$apply$1(actions.scala:100)
        at scala.Option.map(Option.scala:230)
        at org.apache.spark.sql.delta.actions.Protocol$.apply(actions.scala:100)
        at org.apache.spark.sql.delta.actions.Protocol$.forNewTable(actions.scala:119)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal(OptimisticTransaction.scala:326)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataInternal$(OptimisticTransaction.scala:290)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataInternal(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata(OptimisticTransaction.scala:283)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadata$(OptimisticTransaction.scala:278)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadata(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataForNewTable(OptimisticTransaction.scala:391)
        at org.apache.spark.sql.delta.OptimisticTransactionImpl.updateMetadataForNewTable$(OptimisticTransaction.scala:389)
        at org.apache.spark.sql.delta.OptimisticTransaction.updateMetadataForNewTable(OptimisticTransaction.scala:98)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.createTransactionLogOrVerify$1(CreateDeltaTableCommand.scala:181)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.$anonfun$run$2(CreateDeltaTableCommand.scala:193)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:120)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:118)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordFrameProfile(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperation$5(DeltaLogging.scala:114)
        at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
        at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordOperation(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:113)
        at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:98)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.recordDeltaOperation(CreateDeltaTableCommand.scala:49)
        at org.apache.spark.sql.delta.commands.CreateDeltaTableCommand.run(CreateDeltaTableCommand.scala:110)
        at org.apache.spark.sql.delta.catalog.DeltaCatalog.org$apache$spark$sql$delta$catalog$DeltaCatalog$$createDeltaTable(DeltaCatalog.scala:164)
        at org.apache.spark.sql.delta.catalog.DeltaCatalog.createTable(DeltaCatalog.scala:213)
        at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:42)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:133)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:166)
        at org.apache.spark.sql.execution.QueryExecution.withCteMap(QueryExecution.scala:73)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:163)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:214)
        at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:259)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:228)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at io.delta.tables.DeltaTableBuilder.execute(DeltaTableBuilder.scala:357)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)

The expression needs to be wrapped in JSON. In the example test, the column metadata is {"delta.invariants":"{\"expression\":{\"expression\":\"value < 3\"}}"}
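Rather than hand-escaping that nested JSON, the metadata value can be built with json.dumps. A minimal sketch, assuming the {"expression": {"expression": ...}} shape quoted above; invariant_metadata is a hypothetical helper name, and its result can be passed as the metadata argument of a StructField.

```python
import json


def invariant_metadata(sql_expr: str) -> dict:
    """Build column metadata carrying a delta.invariants rule.

    The metadata value is a JSON *string* that wraps the SQL expression
    in {"expression": {"expression": ...}}.
    """
    rule = {"expression": {"expression": sql_expr}}
    return {"delta.invariants": json.dumps(rule)}


meta = invariant_metadata("c1 > 3")
print(meta["delta.invariants"])
# {"expression": {"expression": "c1 > 3"}}
```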

Ah, yes, I can confirm that works.

And the fact that it doesn't enforce these on non-nullable columns seems like a bug to me; I can't imagine why you would want to enforce them selectively.

Updated example
import pyspark
import pyspark.sql.types
import delta
from delta.tables import DeltaTable

def get_spark():
    builder = (
        pyspark.sql.SparkSession.builder.appName("MyApp")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    return delta.configure_spark_with_delta_pip(builder).getOrCreate()

spark = get_spark()

schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField(
        "c1", 
        dataType = pyspark.sql.types.IntegerType(), 
        nullable = True, 
        metadata = { "delta.invariants": "{\"expression\": { \"expression\": \"c1 > 3\"} }" }
    )
])

table = DeltaTable.create(spark) \
    .tableName("testTable") \
    .addColumns(schema) \
    .execute()

# This now fails
spark.createDataFrame([(2,)], schema=schema).write.saveAsTable(
    "testTable",
    mode="append",
    format="delta",
)

@wjones127 I heard from @zsxwing that this feature has bugs and is being deprecated in favor of CHECK constraints.
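For comparison, Delta's CHECK constraints are declared at the table level with ALTER TABLE ... ADD CONSTRAINT rather than in column metadata. A minimal sketch, assuming the spark session and testTable from the examples above; add_check_constraint is a hypothetical helper, and it interpolates its arguments directly, so it is only suitable for trusted input. Delta validates existing rows when the constraint is added, so the statement fails if the table already holds violating data.

```python
def add_check_constraint(spark, table: str, name: str, expr: str) -> str:
    """Attach a Delta CHECK constraint to `table` and return the SQL used.

    `table`, `name` and `expr` are interpolated as-is (no quoting), so they
    must come from trusted code, not user input.
    """
    sql = f"ALTER TABLE {table} ADD CONSTRAINT {name} CHECK ({expr})"
    spark.sql(sql)
    return sql


# Hypothetical usage against the table from the examples above:
#   add_check_constraint(spark, "testTable", "c1_gt_3", "c1 > 3")
#   spark.createDataFrame([(2,)], schema=schema).write.saveAsTable(
#       "testTable", mode="append", format="delta")  # expected to be rejected
```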

Closing this issue. @wjones127.

@vkorukanti I don't consider this resolved. To close it, I think we need to do one of the following:

  1. Fix the bug in Spark implementation of Delta Lake
  2. Remove the invariants feature from the Delta Lake Protocol
  3. Make clear in the Delta Lake protocol that enforcing invariants is optional and deprecated for writer protocol V2, citing Spark as the precedent.

I'm fine with any of these options, but I don't think we should leave this as a lingering issue. As is, an implementor of a Delta Lake writer could be left with the impression that all other writers will respect invariants and create an API to configure them.

@wjones127 Makes sense. Reopening. I'm not sure what the procedure is for deprecating features. @tdas?

@tdas gentle ping.

It sounds like Spark does indeed still use invariants for NOT NULL columns, based on this conversation: https://delta-users.slack.com/archives/CJ70UCSHM/p1693348327628349

So maybe, instead of deprecating the feature, its use case should be narrowed to that one? I don't know the Spark implementation well enough to say what the proper way forward is.

It seems to me that invariants now only represent the NOT NULL constraint and nothing else?