delta-io/connectors

Using `delta-standalone` to append data only, but getting concurrency conflicts

HuaHuaY opened this issue · 3 comments

Runtime

macOS 13.1

Preliminary observations

I have found that:

  • if I use version 3.1.0 of hadoop-aws and hadoop-client, delta-standalone only generates a .crc file when a checkpoint happens; it does not generate a .crc file when it merely commits a JSON log file.
  • if I use version 3.3.2 of hadoop-aws and hadoop-client, delta-standalone generates a .crc file for every file I commit (a quick way to inspect this is sketched right after this list).
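
For reference, a minimal sketch of how the .crc files can be inspected (the _delta_log path below is just my local table location and may differ for you):

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

// Lists the .crc files that Hadoop's local ChecksumFileSystem writes into _delta_log.
public class ListCrcFiles {
    public static void main(String[] args) throws IOException {
        // Assumed table location; adjust to your own _delta_log directory.
        Path logDir = Paths.get("/Users/huahua/test_delta_lake_standalone/delta/_delta_log");
        try (Stream<Path> files = Files.list(logDir)) {
            files.filter(p -> p.getFileName().toString().endsWith(".crc"))
                 .sorted()
                 .forEach(p -> System.out.println(p.getFileName()));
        }
    }
}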

Problems

I find that when I use multiple threads to add data to a DeltaTable, there are concurrency conflicts. I have attached a demo project that shows how I use delta-standalone, together with the output for the two dependency versions. I create 10 threads to insert data, and I use CREATE TABLE test_alone (id int) USING DELTA LOCATION 'file:///Users/huahua/test_delta_lake_standalone/delta'; in spark-sql to create the DeltaTable.

  • if I use version 3.1.0 of hadoop-aws and hadoop-client, one thread may overwrite another thread's JSON log file. The output contains three Committed delta #4 messages and two Committed delta #6 messages, and in the end there are only 7 JSON log files, so the data of three .parquet files is lost.
  • if I use version 3.3.2 of hadoop-aws and hadoop-client, the program throws an org.apache.hadoop.fs.ChecksumException like this:
[5] INFO io.delta.standalone.internal.DeltaLogImpl - Loading version 8.
[5] INFO io.delta.standalone.internal.SnapshotImpl - [tableId=8104454d-c88c-42bf-9762-e2cfdcd5db7c] Created snapshot io.delta.standalone.internal.SnapshotImpl@3e04049d
[5] INFO org.apache.hadoop.fs.FSInputChecker - Found checksum error: b[0, 434]=7b22636f6d6d6974496e666f223a7b2274696d657374616d70223a313637313436363036373739362c226f7065726174696f6e223a225752495445222c226f7065726174696f6e506172616d6574657273223a7b226d6f6465223a22417070656e64222c22706172746974696f6e4279223a225b5d227d2c227265616456657273696f6e223a302c2269736f6c6174696f6e4c6576656c223a2253657269616c697a61626c65222c226973426c696e64417070656e64223a747275652c226f7065726174696f6e4d657472696373223a7b226e756d4f7574707574526f7773223a22313030222c226e756d4f75747075744279746573223a22343734222c226e756d46696c6573223a2231227d2c22656e67696e65496e666f223a224176726f2f312e31312e312044656c74612d5374616e64616c6f6e652f302e362e30227d7d0a7b22616464223a7b2270617468223a22312e70617271756574222c22706172746974696f6e56616c756573223a7b7d2c2273697a65223a3437342c226d6f64696669636174696f6e54696d65223a313637313436363036373738332c22646174614368616e6765223a747275657d7d0a
org.apache.hadoop.fs.ChecksumException: Checksum error: file:/Users/huahua/test_delta_lake_standalone/delta/_delta_log/00000000000000000008.json at 0 exp: -194659397 got: -1104075335
	at org.apache.hadoop.fs.FSInputChecker.verifySums(FSInputChecker.java:347)
	at org.apache.hadoop.fs.FSInputChecker.readChecksumChunk(FSInputChecker.java:303)
	at org.apache.hadoop.fs.FSInputChecker.read1(FSInputChecker.java:252)
	at org.apache.hadoop.fs.FSInputChecker.read(FSInputChecker.java:197)
	at java.base/java.io.DataInputStream.read(DataInputStream.java:149)
	at java.base/sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
	at java.base/sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
	at java.base/sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
	at java.base/java.io.InputStreamReader.read(InputStreamReader.java:181)
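
Incidentally, the byte dump in the FSInputChecker log line is just the text of the conflicting commit; decoding the hex makes it readable, for example:

// Decode the hex byte dump from the FSInputChecker message into readable JSON.
String hex = "7b22636f6d6d6974496e666f22"; // prefix only; paste the full dump here
StringBuilder sb = new StringBuilder();
for (int i = 0; i + 1 < hex.length(); i += 2) {
    sb.append((char) Integer.parseInt(hex.substring(i, i + 2), 16));
}
System.out.println(sb); // the prefix above prints {"commitInfo"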

I have checked that there are no conflicts when I use this code to write to MinIO and S3, so I don't know whether this issue is specific to the local file system.
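
For the MinIO/S3 test the write code is unchanged; only the Hadoop Configuration and the table location differ, roughly along these lines (the endpoint, bucket, and credentials below are placeholders, not my real settings):

Configuration conf = new Configuration();
// hadoop-aws (S3A) settings pointing at a local MinIO endpoint; values are placeholders.
conf.set("fs.s3a.endpoint", "http://127.0.0.1:9000");
conf.set("fs.s3a.access.key", "<access-key>");
conf.set("fs.s3a.secret.key", "<secret-key>");
conf.set("fs.s3a.path.style.access", "true");

DeltaLog log = DeltaLog.forTable(conf, "s3a://my-bucket/delta");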

There is just one source code file:

package org.example;

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.exceptions.DeltaConcurrentModificationException;
import io.delta.standalone.types.StructType;
import io.delta.standalone.util.ParquetSchemaConverter;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroSchemaConverter;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MainMultiThreads {
    public static void main(String[] args) throws Exception {
        int init_commit = 0;
        int commit_end_no = init_commit + 10;

        ArrayList<RunnableDemo> array = new ArrayList<>();
        for (int i = init_commit; i < commit_end_no; i++) {
            array.add(new RunnableDemo(i));
        }
        for (int i = 0; i < commit_end_no - init_commit; i++) {
            array.get(i).start();
        }
    }
}

class RunnableDemo implements Runnable {
    private Thread t;
    private final int threadIndex;

    RunnableDemo(int threadIndex) {
        this.threadIndex = threadIndex;
    }

    public void run() {
        String location = "file://" + Paths.get("delta").toAbsolutePath().toString();

        Configuration conf = new Configuration();
        DeltaLog log = DeltaLog.forTable(conf, location);

        StructType schema = log.snapshot().getMetadata().getSchema();
        MessageType parquetSchema = ParquetSchemaConverter.deltaToParquet(schema);
        Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);

        Path path = new Path(log.getPath(), String.format("%d.parquet", this.threadIndex + 1));
        try {
            HadoopOutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
            ParquetWriter<GenericRecord> parquetWriter = AvroParquetWriter
                    .<GenericRecord>builder(outputFile)
                    .withSchema(avroSchema)
                    .withConf(conf)
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .build();
            for(int i = 0; i < 10; i++) {
                GenericRecord record = new GenericData.Record(avroSchema);
                record.put("id", this.threadIndex * 100 + i);
                parquetWriter.write(record);
            }
            parquetWriter.close();

            // AddFile(path, partitionValues, size, modificationTime, dataChange, stats, tags)
            AddFile addFile = new AddFile(
                    path.toString(),
                    new HashMap<>(),   // no partition columns
                    path.getFileSystem(conf).getContentSummary(path).getLength(),  // file size in bytes
                    System.currentTimeMillis(),  // modification time
                    true,   // dataChange
                    null,   // stats
                    null    // tags
            );


            Map<String, String> operation_paras = new HashMap<>();
            operation_paras.put("mode", "\"Append\"");
            operation_paras.put("partitionBy", "\"[]\"");
            Map<String, String> metrics = new HashMap<>();
            metrics.put("numFiles", "1");
            metrics.put("numOutputRows", "100");
            metrics.put("numOutputBytes", String.format("%d", addFile.getSize()));
            Operation operation = new Operation(Operation.Name.WRITE, operation_paras, metrics);
            OptimisticTransaction txn = log.startTransaction();

            txn.commit(List.of(addFile), operation, "Avro/1.11.1");
        } catch (IOException ioException) {
            System.out.println(ioException.getMessage());
        } catch (DeltaConcurrentModificationException e) {
            // handle exception here
            System.out.println(e.getMessage());
        }
    }

    public Thread start() {
        if (t == null) {
            t = new Thread(this, String.format("%d", this.threadIndex + 1));
            t.start();
        }
        return t;
    }
}
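
The catch block above only prints the exception message; a minimal retry sketch for handling DeltaConcurrentModificationException could look like this (the retry count is arbitrary, and each attempt starts a fresh transaction so the latest snapshot is re-read):

int maxRetries = 5;  // arbitrary
for (int attempt = 0; attempt < maxRetries; attempt++) {
    OptimisticTransaction txn = log.startTransaction();  // re-reads the latest table state
    try {
        txn.commit(List.of(addFile), operation, "Avro/1.11.1");
        break;  // commit succeeded
    } catch (DeltaConcurrentModificationException e) {
        if (attempt == maxRetries - 1) {
            throw e;  // give up after the last attempt
        }
        // otherwise loop and retry with a new transaction
    }
}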

Code and outputs

my_demo_code.zip
output_version_3.1.0.txt
output_version_3.3.2.txt

Thanks for filing this issue! Will take a look and investigate.

@HuaHuaY Do you mean the issue only happens when you write to the local file system? If so, this is a known issue. The local file system doesn't meet the following requirement we document in https://github.com/delta-io/delta#requirements-for-underlying-storage-systems

Mutual exclusion: Only one writer must be able to create (or rename) a file at the final destination.

@zsxwing This issue only happens on my local file system. Thank you for your reply; you can close this issue.