icelake-io/icelake

Low level writer design

ZENOTME opened this issue · 7 comments

Generally, iceberg is connected to a query engine, such as Risingwave or Flink, and each engine has different requirements for its own writer. E.g. Flink has the delta writer. So I'm thinking about two questions:

  1. Should we implement the delta writer in icelake?
  2. Or should we provide some low-level writer interfaces so users can customize their own writers?

In iceberg-java, this customization capability is provided through abstract classes and inheritance. Rust doesn't support that, so I propose a way to solve it.
The principles in this design are:

  1. Expose the structs so that users can compose them to write their own writer.
  2. Add some checks to prevent users from introducing incorrect custom behaviour.
impl Table {
    /// Return the PartitionSplitter to help the user split record batches.
    /// `None` => the table is unpartitioned, so the user should write in the unpartitioned way.
    fn partition_splitter(&self) -> Option<PartitionSplitter>;

    /// Return a builder to build a FileWriter.
    fn writer_builder(&self) -> FileWriterBuilder;
}

/// FileWriter only writes one partition, in a rolling way. It's the low-level writer provided to
/// the user; the user should handle partitioning themselves.
trait FileWriter {
    fn write(&mut self, record_batch: &RecordBatch);

    /// On close, the user should pass the partition_value. If the table is partitioned it must be
    /// Some(_), otherwise it will crash, and vice versa for an unpartitioned table. This is how we
    /// guarantee that the user closes correctly.
    fn close(self, partition_value: Option<StructValue>) -> Vec<DataFile>;
}

/// PartitionSplitter is already implemented; basically it has two main functions.
impl PartitionSplitter {
    /// Used while processing writes.
    fn split(&mut self, record_batch: &RecordBatch) -> HashMap<PartitionKey, RecordBatch>;

    /// Used after writing is done: convert a PartitionKey into a StructValue to pass to writer.close().
    fn convert(&self, key: &PartitionKey) -> StructValue;
}

As a user example, see our implementation of PartitionWriter; it basically uses the interface above.
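To make the composition concrete, here is a minimal sketch of what an engine-side partitioned write could look like, assuming the Table / PartitionSplitter / FileWriter interfaces exist roughly as proposed above (the function name and the absence of error handling are illustrative, not icelake's actual API):

use std::collections::HashMap;

fn write_partitioned(table: &Table, batches: Vec<RecordBatch>) -> Vec<DataFile> {
    // The engine owns the map from partition key to its rolling FileWriter.
    let mut splitter = table.partition_splitter().expect("table is partitioned");
    let mut writers = HashMap::new();

    for batch in batches {
        // Split each incoming batch by partition and route it to the matching writer.
        for (key, part_batch) in splitter.split(&batch) {
            let writer = writers
                .entry(key)
                .or_insert_with(|| table.writer_builder().build());
            writer.write(&part_batch);
        }
    }

    // Close every writer, converting its PartitionKey back into the StructValue
    // that close() expects for a partitioned table.
    writers
        .into_iter()
        .flat_map(|(key, writer)| writer.close(Some(splitter.convert(&key))))
        .collect()
}

The point of the design is that the engine decides when writers are created and closed, while close(Some(partition_value)) keeps the partition bookkeeping checked by icelake.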

I'm not sure whether this is a good way; feel free to leave any comment. @liurenjie1024 @Xuanwo

So IIUC, you plan to let the query engine keep the hash map of partition value to FileWriter? This is OK to me since the query engine has more knowledge of partitions and can do better control.

I'm not quite sure what you plan to do with the delta writer. Essentially each DeltaWriter consists of three parts: a data writer, an EQ delete writer, and a pos delete writer. Are they wrapped in one DeltaWriter, or does the user need to create three writers independently?

Personally I prefer to keep DeltaWriter in icelake since the logic could be shared across engines. Each DeltaWriter only cares about one partition's data.
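For illustration only, a per-partition DeltaWriter kept inside icelake might wrap the three writers along these lines; every name and signature here is hypothetical rather than a committed design:

/// One DeltaWriter per partition, composed from the low-level writers so that
/// engines only see a single write/delete interface.
struct DeltaWriter<W: FileWriter> {
    data_writer: W,       // appends data rows for this partition
    eq_delete_writer: W,  // writes equality deletes keyed by the equality columns
    pos_delete_writer: W, // writes positional deletes (data file path + row offset)
}

How write/delete are exposed, and whether upsert is handled internally, is exactly the question discussed below.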

So IIUC, you plan to let the query engine keep the hash map of partition value to FileWriter?

Yes.

I'm not quite sure what you plan to do with the delta writer. Essentially each DeltaWriter consists of three parts: a data writer, an EQ delete writer, and a pos delete writer. Are they wrapped in one DeltaWriter, or does the user need to create three writers independently?

Personally I prefer to keep DeltaWriter in icelake since the logic could be shared across engines. Each DeltaWriter only cares about one partition's data.

Keeping it in icelake also looks good to me. There is one question:

  • Should we keep the original write semantics of the Java BaseDeltaWriter?
    Before writing, DeltaWriter deletes the row with the same key only if it is in DeltaWriter's cache. It can't be used as an upsert directly because the row with the same key may not be in the cache, so an UpsertWriter should delete explicitly before writing.

Sorry, I don't get your point. Could you give a more detailed example of why it can't do upsert? CC @ZENOTME

Sorry, I don't get your point. Could you give a more detailed example of why it can't do upsert? CC @ZENOTME

I mean that it can't do upsert directly. The user should also delete explicitly, externally, like:
https://github.com/apache/iceberg/blob/2e1ec5fde9e6fecfbc0883465a585a1dacb58c05/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java#L83-L85

        if (upsert) {
          writer.deleteKey(keyProjection.wrap(row)); // <-- delete explicitly here
        }
        writer.write(row);

Because the delta writer only deletes a row when it is in its internal cache. https://github.com/apache/iceberg/blob/2e1ec5fde9e6fecfbc0883465a585a1dacb58c05/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L139-L144
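Paraphrasing that Java logic as a simplified Rust sketch (the types and the print statements are stand-ins, not icelake code): write() can only turn a same-key re-write into a positional delete while the key is still in this writer's in-memory map, so an upsert needs the explicit delete_key() path, which falls back to an equality delete.

use std::collections::HashMap;

struct SketchDeltaWriter {
    // key -> (data file path, row offset) for rows written by *this* writer instance
    inserted_rows: HashMap<i64, (String, usize)>,
}

impl SketchDeltaWriter {
    fn write(&mut self, key: i64) {
        // Deletes the previous row with the same key only if it is cached here;
        // a row with the same key written in an earlier file is left untouched.
        if let Some((path, offset)) = self.inserted_rows.remove(&key) {
            println!("pos delete: {path} @ {offset}");
        }
        self.inserted_rows.insert(key, ("current-file".to_string(), 0));
    }

    fn delete_key(&mut self, key: i64) {
        // The explicit delete falls back to an equality delete when the key is
        // not cached, which is what upsert needs for rows in earlier files.
        if let Some((path, offset)) = self.inserted_rows.remove(&key) {
            println!("pos delete: {path} @ {offset}");
        } else {
            println!("eq delete: key = {key}");
        }
    }
}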

OK, I see. I think we need to keep the original op column in the stream chunk. We don't have to use enums; an integer would be enough.
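As a rough illustration of that idea, and assuming op codes along the lines of Risingwave's stream chunk ops (Insert / Delete / UpdateDelete / UpdateInsert), an integer op column could drive the writer like this; the numeric encoding and the reuse of SketchDeltaWriter from the sketch above are illustrative only:

fn apply_op(writer: &mut SketchDeltaWriter, op: i32, key: i64) {
    match op {
        0 | 3 => writer.write(key),      // Insert or UpdateInsert: append the new row
        1 | 2 => writer.delete_key(key), // Delete or UpdateDelete: delete the old row explicitly
        _ => panic!("unknown op {op}"),
    }
}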

Solved in #243.