delta-io/connectors

[Delta Standalone] Incompatibility with Trino Delta Connector

micaelcapitao opened this issue · 11 comments

Runtime

delta-standalone_2.13 0.6.0
hadoop-client 3.3.1
hadoop-aws 3.3.1
Starburst 402.0.0
Spark 3.3.0
java 11

Problem

When using delta-standalone to extract changes from the delta log at particular versions of a table, I ran into situations that produce an error like the following:

shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$DecodingException: Failed to decode field metaData of record: RowParquetRecord (metaData=RowParquetRecord (id=BinaryValue(Binary{36 constant bytes, [57, 52, 49, 48, 101, 100, 101, 50, 45, 54, 49, 101, 99, 45, 52, 55, 50, 98, 45, 57, 56, 57, 54, 45, 55, 51, 102, 99, 56, 53, 52, 102, 102, 97, 51, 98]}),description=BinaryValue(Binary{59 constant bytes, [83, 101, 108, 101, 99, 116, 105, 111, 110, 32, 111, 102, 32, 102, 111, 111, 100, 32, 111, 114, 100, 101, 114, 115, 32, 117, 115, 101, 100, 32, 116, 111, 32, 98, 117, 105, 108, 100, 32, 116, 104, 101, 32, 116, 114, 97, 105, 110, 105, 110, 103, 32, 100, 97, 116, 97, 115, 101, 116]}),format=RowParquetRecord (provider=BinaryValue(Binary{7 constant bytes, [112, 97, 114, 113, 117, 101, 116]}),options=MapParquetRecord ()),schemaString=BinaryValue(Binary{438 constant bytes, [123, 34, 102, 105, 101, 108, 100, 115, 34, 58, 91, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 111, 114, 100, 101, 114, 95, 105, 100, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 108, 111, 110, 103, 34, 125, 44, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 99, 117, 115, 116, 111, 109, 101, 114, 95, 105, 100, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 108, 111, 110, 103, 34, 125, 44, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 115, 116, 111, 114, 101, 95, 105, 100, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 108, 111, 110, 103, 34, 125, 44, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 111, 114, 100, 101, 114, 95, 99, 114, 101, 97, 116, 101, 100, 95, 97, 116, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 116, 105, 109, 101, 115, 116, 97, 109, 112, 34, 125, 44, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 112, 95, 99, 114, 101, 97, 116, 105, 111, 110, 95, 100, 97, 116, 101, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 100, 97, 116, 101, 34, 125, 44, 123, 34, 109, 101, 116, 97, 100, 97, 116, 97, 34, 58, 123, 125, 44, 34, 110, 97, 109, 101, 34, 58, 34, 109, 101, 97, 108, 95, 116, 105, 109, 101, 34, 44, 34, 110, 117, 108, 108, 97, 98, 108, 101, 34, 58, 116, 114, 117, 101, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 105, 110, 103, 34, 125, 93, 44, 34, 116, 121, 112, 101, 34, 58, 34, 115, 116, 114, 117, 99, 116, 34, 125]}),partitionColumns=ListParquetRecord (RowParquetRecord (array_element=BinaryValue(Binary{15 constant bytes, [112, 95, 99, 114, 101, 97, 116, 105, 111, 110, 95, 100, 97, 116, 101]}))),configuration=MapParquetRecord (),createdTime=LongValue(1674827858489)))
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$DecodingException$.apply(ParquetRecordDecoder.scala:32) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:61) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:54) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:63) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:54) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:63) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:54) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:63) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$2.decode(ParquetRecordDecoder.scala:54) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$$anon$3.decode(ParquetRecordDecoder.scala:73) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetRecordDecoder$.decode(ParquetRecordDecoder.scala:43) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl$$anon$3.$anonfun$hasNext$2(ParquetReader.scala:144) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at scala.Function2.$anonfun$curried$2(Function2.scala:44) ~[scala-library-2.13.8.jar:?]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl$$anon$3.$anonfun$hasNext$1(ParquetReader.scala:144) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at scala.Option.map(Option.scala:242) ~[scala-library-2.13.8.jar:?]
	at shadedelta.com.github.mjakubowski84.parquet4s.ParquetIterableImpl$$anon$3.hasNext(ParquetReader.scala:144) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.actions.CustomParquetIterator.hasNext(MemoryOptimizedLogReplay.scala:132) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.actions.MemoryOptimizedLogReplay$$anon$1.$anonfun$ensureNextIterIsReady$3(MemoryOptimizedLogReplay.scala:81) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.actions.MemoryOptimizedLogReplay$$anon$1.$anonfun$ensureNextIterIsReady$3$adapted(MemoryOptimizedLogReplay.scala:81) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at scala.Option.exists(Option.scala:406) ~[scala-library-2.13.8.jar:?]
	at io.delta.standalone.internal.actions.MemoryOptimizedLogReplay$$anon$1.ensureNextIterIsReady(MemoryOptimizedLogReplay.scala:81) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.actions.MemoryOptimizedLogReplay$$anon$1.hasNext(MemoryOptimizedLogReplay.scala:90) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at scala.collection.convert.JavaCollectionWrappers$JIteratorWrapper.hasNext(JavaCollectionWrappers.scala:37) ~[scala-library-2.13.8.jar:?]
	at scala.collection.IterableOnceOps.foreach(IterableOnce.scala:563) ~[scala-library-2.13.8.jar:?]
	at scala.collection.IterableOnceOps.foreach$(IterableOnce.scala:561) ~[scala-library-2.13.8.jar:?]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1293) ~[scala-library-2.13.8.jar:?]
	at io.delta.standalone.internal.SnapshotImpl.loadTableProtocolAndMetadata(SnapshotImpl.scala:141) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotImpl.x$1$lzycompute(SnapshotImpl.scala:131) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotImpl.x$1(SnapshotImpl.scala:131) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotImpl.protocolScala$lzycompute(SnapshotImpl.scala:131) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotImpl.protocolScala(SnapshotImpl.scala:131) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotImpl.<init>(SnapshotImpl.scala:272) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotManagement.createSnapshot(SnapshotManagement.scala:257) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotManagement.getSnapshotAt(SnapshotManagement.scala:245) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotManagement.getSnapshotForVersionAsOf(SnapshotManagement.scala:53) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.SnapshotManagement.getSnapshotForVersionAsOf$(SnapshotManagement.scala:51) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.DeltaLogImpl.getSnapshotForVersionAsOf(DeltaLogImpl.scala:42) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at io.delta.standalone.internal.DeltaLogImpl.getSnapshotForVersionAsOf(DeltaLogImpl.scala:42) ~[delta-standalone_2.13-0.6.0.jar:0.6.0]
	at com.glovoapp.data_platform.declarative.data_change.delta.DeltaTxInspector.inspectVersion(DeltaTxInspector.java:52) ~[classes/:?]
	at com.glovoapp.data_platform.declarative.data_change.handler.DeltaChangeHandler.handleRequest(DeltaChangeHandler.java:52) ~[classes/:?]
	at com.glovoapp.data_platform.declarative.data_change.app.DeltaChangeApp.main(DeltaChangeApp.java:18) ~[classes/:?]
	at com.glovoapp.data_platform.declarative.data_change.app.DeltaChangeAppWrapper.main(DeltaChangeAppWrapper.java:13) ~[test-classes/:?]

In my code I'm doing something like this:

var deltaLog = DeltaLog.forTable(conf, <table_path>);
var snapshot = deltaLog.getSnapshotForVersionAsOf(<version>);

The issue happens with delta tables created and written by Starburst (Trino).
From my tests, this only starts happening once checkpoint.parquet files appear in the log (which makes sense given the error).
In some tables, once a checkpoint parquet file is there, I'm unable to get a snapshot for any version; in others I can still get the versions that precede the checkpoint.
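
To check when a table crosses that boundary, one can list the _delta_log directory for checkpoint files. A minimal sketch using the Hadoop FileSystem API (which delta-standalone already depends on); the table path is a placeholder:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative only: list the transaction log and flag checkpoint files.
// contains(".checkpoint.") also matches multi-part checkpoint files.
Path logDir = new Path("<table_path>", "_delta_log");
FileSystem fs = logDir.getFileSystem(new Configuration());
for (FileStatus status : fs.listStatus(logDir)) {
    String name = status.getPath().getName();
    if (name.contains(".checkpoint.")) {
        System.out.println("Checkpoint found: " + name);
    }
}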

Reading these tables in Spark using spark.read.format("delta").option("versionAsOf", <version>) works normally. I'm also able to get the details and history of the tables without any issues.
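
For reference, here is the equivalent time-travel read through Spark's Delta connector as a minimal Java sketch, assuming an active SparkSession named spark with the Delta connector configured; <version> and <table_path> are placeholders as above:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: the same time travel through Spark's Delta connector,
// which reads these tables without hitting the DecodingException above.
Dataset<Row> df = spark.read()
    .format("delta")
    .option("versionAsOf", "<version>")
    .load("<table_path>");
df.show();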

Because both Spark and Starburst (Trino) deal with these tables without any issue, I'm wondering whether the problem is in delta-standalone itself rather than on the Trino side.

Thanks for making this issue! Any chance you could give us some sample data and code that we could use as a reproducible example?

Can you also show the metadata json? I'd like to see the schema, which is the most likely culprit of this error.

Hi,
Thanks for the reply, and sorry for the delay. I'll get you an example later today.

While producing the test dataset to share with you, I noticed that the problem seems somehow related to partitioned tables. In any case, the schema of the table is really simple:

>>> spark.read.format("delta").load("/root/data/countries-d34c1824308e428dbeda7aca74c4aa8f").printSchema()
root
 |-- city_code: string (nullable = true)
 |-- country_code: string (nullable = true)
 |-- time_zone: string (nullable = true)
 |-- p_end_date: string (nullable = true)

You can test this with the following data:

countries-d34c1824308e428dbeda7aca74c4aa8f.zip

Code snippet using Delta Standalone:

import io.delta.standalone.DeltaLog;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

...

var tablePath = new Path("file:///<path_to>/countries-d34c1824308e428dbeda7aca74c4aa8f");
var deltaLog = DeltaLog.forTable(new Configuration(), tablePath);

// Time travelling to versions before the checkpoint usually seems to work.
var olderSnapshot = deltaLog.getSnapshotForVersionAsOf(9);
System.out.println(olderSnapshot.getAllFiles());

// The last version happens to trigger the issue.
var lastSnapshot = deltaLog.snapshot();
System.out.println(lastSnapshot.getAllFiles());

Hi @scottsand-db,
Have you had the chance to check if you could reproduce the issue?
Thanks!

Looking at the checkpoints' Parquet schemas, I see a difference that looks like the trigger of the issue:

message spark_schema {
  optional group metaData {
    ...

    -- Spark
    optional group partitionColumns (LIST) {
      repeated group list {
        optional binary element (STRING);
      }
    }

    -- Starburst
    optional group partitionColumns (LIST) {
      repeated group bag {
        optional binary array_element (STRING);
      }
    }
  
    ...
  }
}
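
For anyone wanting to reproduce this comparison, the checkpoint schema can be dumped with plain parquet-mr. A minimal sketch; the table path and checkpoint file name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

// Illustrative only: print a checkpoint file's Parquet schema so the two
// list encodings (list/element vs. bag/array_element) can be compared.
Path checkpoint = new Path("<table_path>/_delta_log/<version>.checkpoint.parquet");
try (ParquetFileReader reader =
         ParquetFileReader.open(HadoopInputFile.fromPath(checkpoint, new Configuration()))) {
    MessageType schema = reader.getFooter().getFileMetaData().getSchema();
    System.out.println(schema);
}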

Looking at the code of com.github.mjakubowski84:parquet4s-core:1.9.4, I can see the place where a case is missing.
More recent versions have an extra case that looks like it could handle this situation, though I'm not sure the resulting decoded Parquet would be understood by delta-standalone.

I'm going to attempt a version upgrade of this lib and see how it goes.

@micaelcapitao by Spark above, do you mean delta-standalone?

By Spark above I mean the Parquet schema Delta writes when running on Spark. So "pure" Delta+Spark writes a list group whereas Trino (Starburst) writes a bag group. Delta+Spark has no issue dealing with either, but the same cannot be said of delta-standalone. I'm not sure how strict the Delta protocol is, or should be, on this matter though.

Upgrading parquet4s-core to 2.10.0 doesn't solve the issue (I'm keeping that upgrade though, in case the community is interested).
The problem seems related to the way parquet4s infers the Parquet read schema from the Metadata case class. I'm not familiar with this, so I'm trying to figure out how to make it more permissive at read time. I'm currently playing with the Parquet4sWrapper, creating one for Metadata, to see if I can get anywhere.

Hi,
Just to let you know that my attempts didn't go anywhere. In the meantime I'm going to use a full-blown Spark running in local mode for my use case.
I'm also opening an issue with Starburst, as at this point this is starting to look like a problem on their side instead. This should affect Trino equally, since the original code came from Starburst.
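
For completeness, a minimal sketch of that fallback, assuming delta-core for Spark 3.3 is on the classpath; the two config keys are the standard way to enable the Delta connector on a Spark session, and the app name is just a placeholder:

import org.apache.spark.sql.SparkSession;

// Sketch of the local-mode Spark fallback.
SparkSession spark = SparkSession.builder()
    .master("local[*]")
    .appName("delta-change-inspector")  // hypothetical app name
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate();

// The versionAsOf read shown earlier works here, Trino-written checkpoints included.
spark.read()
    .format("delta")
    .option("versionAsOf", "<version>")
    .load("<table_path>")
    .show();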

For anyone facing the same issue: this seems to have been solved on the Trino side in PR trinodb/trino#16172. Testing on Starburst 410e, it seems to work.