delta-io/connectors

Improve deterministic-order guarantees in implementations, or at least update API docs

scottsand-db opened this issue · 24 comments

  • DeltaLog.getChanges - are the actions inside a given VersionLog in a deterministic order? If not, can we make them so?
  • Snapshot.getAllFiles - are the AddFiles in deterministic order? If not, can we make them so?
  • DeltaScan.getFiles - are the AddFiles in deterministic order? If not, can we make them so?
  • DeltaLog.getChanges: Yes.
  • Snapshot.getAllFiles: No. We don't want to add this guarantee, so that we keep the freedom to optimize how transaction logs are read.
  • DeltaScan.getFiles: No. We don't want to add this guarantee, so that we keep the freedom to optimize how transaction logs are read.

@zsxwing if it could benefit connectors ... is it worth adding APIs, or adding optional parameters that, if set, ensure that we do have some sort of order guarantee?

e.g. reading deterministic actions through an iterator could be useful for failure recovery for the Flink Source connector.

e.g. reading deterministic actions through an iterator could be useful for failure recovery for the Flink Source connector.

This means we would need to sort by something before returning the result, which in turn means loading all of the actions into memory.
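The cost described above can be sketched as follows. This is not the real Standalone API; the class and method names are made up for illustration, and the point is simply that imposing any total order forces the whole iterator into memory first:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch: to hand back AddFile paths in a deterministic order,
// we have to drain the whole iterator into memory and then sort, e.g. by
// path -- exactly the memory cost described in the comment above.
public class DeterministicFiles {
    public static List<String> deterministicOrder(Iterator<String> addFilePaths) {
        List<String> all = new ArrayList<>();
        addFilePaths.forEachRemaining(all::add);   // materialize every action
        all.sort(Comparator.naturalOrder());       // then impose a total order
        return all;
    }
}
```

The streaming advantage of the iterator is lost: memory now grows with the number of actions in the snapshot rather than staying constant.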

Fair. Can't we depend on the order guarantees of the other APIs we use? e.g. if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?

if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?

For example, a snapshot version 10 can be loaded from:

  1. 10.checkpoint
  2. 0.json, 1.json, ..., 10.json.

We won't be able to know the original order of files in the checkpoint when they were in 0.json, 1.json, ..., 10.json.
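A toy illustration of the point above, with made-up file names: replaying the json commits gives each surviving AddFile an implicit position (which commit file, which line), but a checkpoint retains only the net set of files, so a reader loading version 10 from 10.checkpoint cannot recover that ordering:

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: replayPositions maps each surviving AddFile path to
// the "commitFile:lineIndex" position it had during json replay. A checkpoint
// keeps only the set of paths -- the position information is simply gone.
public class CheckpointOrderLoss {
    public static Set<String> checkpointView(Map<String, String> replayPositions) {
        return new HashSet<>(replayPositions.keySet()); // order info discarded
    }
}
```

Both load paths yield the same set of files, which is all the protocol guarantees; only the json-replay path happens to carry an order.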

Makes sense. Thanks

if listFiles returns them in order ... and we read each action in the .json file line by line ... shouldn't that guarantee deterministic order?

For example, a snapshot version 10 can be loaded from:

  1. 10.checkpoint
  2. 0.json, 1.json, ..., 10.json.

We won't be able to know the original order of files in the checkpoint when they were in 0.json, 1.json, ..., 10.json.

Can we make the checkpoint content keep the files in order? The current checkpoint content = last checkpoint + recent transaction logs.

@horizonzy - when we create the checkpoint, we use snapshot.allFilesScala (see Checkpoints.scala).
To generate snapshot.allFilesScala, we perform an in-memory log replay, where we keep track of the AddFiles seen so far using a HashMap. To create allFilesScala, we get an iterable from that HashMap. Thus, ordering is not guaranteed.

So I don't see a way to make the checkpoint contain files in order.

Changing the types of (transactions, activeFiles, tombstones) in InMemoryLogReplay to LinkedHashMap might achieve this.
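The idea can be sketched like this (the field and method names are illustrative, not the real InMemoryLogReplay members): a LinkedHashMap iterates in insertion order, so surviving AddFiles would come back in the order their adds were replayed, whereas a plain HashMap guarantees no iteration order at all:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of log replay with an insertion-ordered map:
// adds insert in commit order, removes (tombstones) cancel earlier adds,
// and iteration over the survivors follows the original add order.
public class OrderedReplay {
    private final Map<String, Long> activeFiles = new LinkedHashMap<>(); // path -> size

    public void add(String path, long size) { activeFiles.put(path, size); }
    public void remove(String path)         { activeFiles.remove(path); }
    public Iterable<String> paths()         { return activeFiles.keySet(); }
}
```

Note this only fixes the order *produced* by this one writer; as discussed below, it says nothing about checkpoints written by other clients.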

@horizonzy - perhaps. But this is only one delta client. We don't know who wrote the previous json files or checkpoints.

If another delta client wrote the previous checkpoint, and mixed up the order of files, doesn't that defeat the purpose?

It doesn't matter; we just keep the order consistent with the order in the json files, rather than an unpredictable order.

We cannot enforce any ordering on the checkpoint files. A checkpoint may contain multiple files, and it's up to the writer to decide how to split a checkpoint.

A checkpoint may contain multiple files, and it's up to the writer to decide how to split a checkpoint.

The checkpoint content comes from transactions; we just add all the actions from the first transaction's first action to the last transaction's last action.

The checkpoint content comes from transactions; we just add all the actions from the first transaction's first action to the last transaction's last action.

The current [protocol](https://github.com/delta-io/delta/blob/master/PROTOCOL.md) says nothing about order, and there is no order guarantee in the current checkpoint writer implementation. We cannot add an order requirement when the checkpoints in existing tables cannot satisfy it.

If we want to provide an API in Delta Standalone, we have to sort actions in some deterministic order.

We cannot add the order requirement when the checkpoints in existing tables cannot satisfy the requirement.

Yes, it is difficult to stay compatible with the old data.

If we want to provide an API in Delta Standalone, we have to sort actions in some deterministic order.

We follow the order of the content in the transactions.

Example:

00000000000000000000.json

{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"6ce78fa5-c24a-41c2-a350-10ffd40004c7","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1662709324810}}
{"add":{"path":"part-00000-bfa47503-f267-422d-b495-6bf8f775c3f3-c000.snappy.parquet","partitionValues":{},"size":296,"modificationTime":1662709326234,"dataChange":true,"stats":"{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}"}}
{"add":{"path":"part-00001-516ff7ee-14af-4386-9516-d083ea917f3e-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":0},\"maxValues\":{\"id\":0},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00003-6f99edaf-31eb-4d14-99d7-8adac6bf8da8-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":1},\"maxValues\":{\"id\":1},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00005-09b48bc3-f629-4003-b663-a906c2d36cf5-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2},\"maxValues\":{\"id\":2},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00007-f386b922-92de-4c02-9674-6094cf61a799-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":3},\"maxValues\":{\"id\":3},\"nullCount\":{\"id\":0}}"}}
{"add":{"path":"part-00009-fe41f0c5-3bac-43c8-8bcd-82ac9d73cea1-c000.snappy.parquet","partitionValues":{},"size":478,"modificationTime":1662709326645,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4},\"maxValues\":{\"id\":4},\"nullCount\":{\"id\":0}}"}}
{"commitInfo":{"timestamp":1662709326676,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"2686"},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.2.0","txnId":"02a8ed70-271b-4ad9-b7fa-69a907d8ae66"}}

It contains 6 AddFiles; we should follow their order in the txn content. The next txn log also follows it.

The order example:
json0_AddFile0 -> json0_AddFile1 -> json0_AddFile2 -> json0_AddFile3 -> json0_AddFile4 -> json0_AddFile5 -> json1_AddFile0 -> json1_AddFile1 -> json1_AddFile2 -> json1_AddFile3 -> json1_AddFile4 -> json1_AddFile5 ...
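The total order proposed above can be sketched as a sort key of (commit version, line index within that json file). The `ActionRef` type here is made up for illustration; it stands in for an action tagged with where it was read from:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: order actions first by the version of the json file
// they came from, then by their line index within that file -- i.e.
// json0_AddFile0, json0_AddFile1, ..., json1_AddFile0, json1_AddFile1, ...
public class CommitOrder {
    public static final class ActionRef {
        public final long version; public final int lineIndex; public final String path;
        public ActionRef(long version, int lineIndex, String path) {
            this.version = version; this.lineIndex = lineIndex; this.path = path;
        }
    }

    public static List<ActionRef> commitOrder(List<ActionRef> actions) {
        List<ActionRef> out = new ArrayList<>(actions);
        out.sort(Comparator.<ActionRef>comparingLong(a -> a.version)
                 .thenComparingInt(a -> a.lineIndex));
        return out;
    }
}
```

The catch, as the next reply points out, is that the (version, lineIndex) key only exists while replaying json files; it is not recorded in checkpoints.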

We don't have the original file name of an action when reading a checkpoint. So we cannot sort by the json files. If you propose to add the original file name of an action into the checkpoint files, what should we do with the existing tables not having such information?

We make sure the checkpoint's actions are ordered. Every time we generate a checkpoint, it appends the current snapshot.activeFiles; if activeFiles is always ordered, the content of the checkpoint is always ordered.
But there is one uncertain point: the snapshot separates actions into setTransactions, tombstones, and activeFiles, so maybe we can't ensure the order between setTransactions, tombstones, and activeFiles.

Hi @horizonzy - we would need all clients to do this. Else, when you read a checkpoint, you wouldn't know which client wrote it and if it is sorted. Enforcing all clients to do this would be a protocol change.

We are allowed to propose new protocol changes, but I think you'd have to really argue your case. What benefit do we get from this?

I want to filter for particular AddFiles when reading data. I will append a tag to each AddFile, like (start:0, end:100), meaning that the AddFile contains the data from 0 to 100. If I want to fetch data point 50, the AddFile (start:0, end:100) contains the data I want, and I then read the data from AddFile.getPath().

After reading the source code, I found out that this does not work. Although the AddFiles are ordered, we still need to check the AddFile tags one by one. If there are lots of AddFiles (millions), this is inefficient.

I think we should add some index info to find the target AddFile faster.
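One shape such "index info" could take, sketched under the commenter's hypothetical tag scheme (the (start, end) tags are not part of the Delta protocol): if the tagged ranges are non-overlapping and the AddFiles are kept sorted by `start`, the file covering a key is found by binary search in O(log n) instead of a linear scan:

```java
import java.util.List;

// Hypothetical sketch: binary-search a list of AddFiles sorted ascending by
// their (made-up) start tag, with non-overlapping (start, end) ranges.
public class TaggedIndex {
    public static final class TaggedFile {
        public final String path; public final long start; public final long end;
        public TaggedFile(String path, long start, long end) {
            this.path = path; this.start = start; this.end = end;
        }
    }

    // Returns the file whose [start, end] range covers key, or null.
    public static TaggedFile find(List<TaggedFile> files, long key) {
        int lo = 0, hi = files.size() - 1, candidate = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (files.get(mid).start <= key) { candidate = mid; lo = mid + 1; }
            else hi = mid - 1;
        }
        if (candidate >= 0 && key <= files.get(candidate).end) return files.get(candidate);
        return null; // no file covers this key
    }
}
```

This still requires something to maintain the sorted view, which circles back to the ordering guarantees discussed above.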

@horizonzy can you partition your data? We provide the snapshot.scan(Expression) API to let you do partition pruning.

The snapshot.scan(Expression) API also checks the AddFiles one by one; it checks whether each AddFile's partitionValues match the expression, which is also inefficient.

We are discussing the new topic about AddFile scanning in https://delta-users.slack.com/archives/CJ70UCSHM/p1663156471536779

@horizonzy what if we added an API/config that sorted the data on read? Also, what would we sort it by?