Implement simple delete case
wjones127 opened this issue · 8 comments
Description
It's not uncommon to delete along partition boundaries. In those cases the delete should be very simple to implement; just commit some remove actions.
I think we should probably design the API so we can eventually support any where clause in deletes, rather than have a separate function for this simple case and the case where we have to rewrite files.
See the write operation implementation for inspiration: https://github.com/delta-io/delta-rs/blob/main/rust/src/operations/write.rs
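To make the "eventually support any where clause" direction concrete, here is a purely hypothetical sketch (not an existing delta-rs API) of what a builder in the style of operations/write.rs could look like; DeleteBuilder, its fields, and execute are all illustrative names:

// Hypothetical builder mirroring the style of operations/write.rs. The
// partition-only fast path and the general file-rewrite path would share
// this single entry point.
pub struct DeleteBuilder {
    table: DeltaTable,
    /// Optional delete predicate; `None` would delete all rows.
    predicate: Option<Expr>,
}

impl DeleteBuilder {
    pub fn new(table: DeltaTable) -> Self {
        Self { table, predicate: None }
    }

    pub fn with_predicate(mut self, predicate: Expr) -> Self {
        self.predicate = Some(predicate);
        self
    }

    pub async fn execute(self) -> Result<DeltaTable, DeltaTableError> {
        // If the predicate only touches partition columns, just commit remove
        // actions for the matching files; otherwise fall back to rewriting files.
        todo!()
    }
}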
I would like this!
Hi,
I could probably contribute to this under some kind of supervision, as I am new to Rust.
If I understand this correctly, the naive approach to implement this would be to:
- iterate over the parquet files from the current version
- query each file to check if it contains rows that match the predicate, something like:
let df = ctx.read_parquet(...).await?;
let filtered = df.select_columns(&columns_from_predicate)?.filter(predicate)?.limit(0, Some(1))?;
let count = filtered.count().await?;
Then, for each file with matched rows (count > 0), rewrite it with the filter applied, add the proper remove actions and commit.
I guess in the next iteration we would need to look into metadata to check min/max values to reduce the number of files we need to scan.
Does this make sense?
Surely, as a first step we would need to check if the predicate columns match the table partitioning.
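For illustration, a minimal sketch of that partition check, assuming datafusion's Expr::to_columns for collecting the columns referenced by the predicate (method names may differ between versions):

use std::collections::HashSet;
use datafusion::prelude::Expr;

// The "simple" delete applies only when every column referenced by the delete
// predicate is a partition column of the table.
fn is_partition_only_predicate(
    predicate: &Expr,
    partition_columns: &[String],
) -> datafusion::error::Result<bool> {
    // Assumption: Expr::to_columns collects all column references in the expression.
    let referenced: HashSet<_> = predicate.to_columns()?;
    Ok(referenced
        .iter()
        .all(|col| partition_columns.iter().any(|p| p == &col.name)))
}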
Hey @dudzicp,
Thanks for offering to help, we are more than happy to support you along the way!
Your initial thought is going in the right direction, I think. One of the beautiful things about delta, though, is that we have a lot of valuable information available in the delta log already, which we can leverage to exclude some files a priori and avoid the expensive IO that comes with reading them. The good news is that the integration with datafusion can already leverage this.
Without having thought about it too much yet, the way I would get started is to leverage the pruning predicate that is implemented for DeltaTable. We do something similar when we do file skipping in the scan operation (see delta-rs/rust/src/delta_datafusion.rs, lines 367 to 381 at 8a82f79).
So one way to go could be to pre-select the files from the log that might have matches using PruningPredicate, which gives us all the candidates. From there, the brute-force way would be to read all candidate files with the filter applied and write out the data immediately, just assuming each file has a hit.
As you already mentioned, in this case we might be doing too much work, but we could leverage a lot of the existing implementation, i.e. we get all the parallelism etc. that comes out of the box with datafusion almost for free and don't need to implement that here.
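A rough sketch of that pruning step, under a few assumptions: DeltaTable (or its state) implements datafusion's PruningStatistics as in the snippet linked above, PruningPredicate::try_new takes a logical Expr plus the arrow schema (this signature has changed across datafusion releases), and the delta-rs paths (deltalake::action::Add, DeltaTable::get_state) match the version in use:

use arrow::datatypes::SchemaRef;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::prelude::Expr;
use deltalake::{action::Add, DeltaTable};

fn candidate_files(
    table: &DeltaTable,
    predicate: Expr,
    schema: SchemaRef,
) -> Result<Vec<Add>, Box<dyn std::error::Error>> {
    let pruning_predicate = PruningPredicate::try_new(predicate, schema)?;
    // One boolean per file: `true` means the file *may* contain matching rows,
    // `false` means the statistics prove it cannot.
    let may_match = pruning_predicate.prune(table)?;
    Ok(table
        .get_state()
        .files()
        .iter()
        .zip(may_match)
        .filter_map(|(add, hit)| hit.then(|| add.clone()))
        .collect())
}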
An alternative way, which might already get us a bit along the way towards also supporting deletion vectors, is to create a query plan (LogicalPlan or directly ExecutionPlan from datafusion) that generates single-column record batches containing the row indices of all rows we want to delete. We would have to somehow keep track of which vector belongs to which file. One possible solution would be to partition the plan per file and map partition indices to file names. These could then be used to create a mask for filtering the data when we want to write new files, or (at a later stage) to create the deletion vectors from these arrays. But this approach probably goes beyond the "simple delete case" :).
I guess to get us started and establish the higher-level APIs, my suggestion would be to use the pruning predicate to get all relevant files, use these to create a ListingTable that can be used as a data source for a new execution plan (potentially leveraging higher-level DF APIs to create the plan), and pass that plan to the writer implementation from the operations module. The writer will give you all the add actions, and we then only need to create the remove actions.
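A hedged sketch of that wiring, with module paths and method names taken from a recent datafusion release (they may need adjusting for the version actually in use): turn the pruned files into a ListingTable, read it as a DataFrame, and keep only the rows that do not match the delete predicate; the resulting plan would then be handed to the writer:

use std::sync::Arc;
use arrow::datatypes::SchemaRef;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::prelude::{DataFrame, Expr, SessionContext};

async fn scan_rows_to_keep(
    ctx: &SessionContext,
    file_urls: Vec<String>, // the pruned candidate files
    schema: SchemaRef,      // arrow schema of the table
    predicate: Expr,        // the delete predicate
) -> datafusion::error::Result<DataFrame> {
    let paths = file_urls
        .into_iter()
        .map(ListingTableUrl::parse)
        .collect::<Result<Vec<_>, _>>()?;
    let config = ListingTableConfig::new_with_multi_paths(paths)
        .with_listing_options(ListingOptions::new(Arc::new(ParquetFormat::default())))
        .with_schema(schema);
    let table = ListingTable::try_new(config)?;
    // Keep only the rows that do NOT match the delete predicate.
    ctx.read_table(Arc::new(table))?.filter(!predicate)
}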
Another angle on this might be to start from the same list of files as above, read all the data we want to keep, and for each file-partition check if we have as many rows as indicated in the metadata. If so, there were no hits on the delete predicate and we can (in fact have to) disregard the data; if we have fewer rows, we had a hit and should write the data.
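An illustrative sketch of that count check, assuming the per-file row count comes from the Add action statistics (num_records here is just a parameter) and that we buffer the surviving batches until the stream for that file is exhausted:

use datafusion::physical_plan::SendableRecordBatchStream;
use futures::StreamExt;

async fn kept_rows_for_file(
    mut stream: SendableRecordBatchStream,
    num_records: usize, // row count recorded in the file's Add statistics
) -> datafusion::error::Result<Option<Vec<arrow::record_batch::RecordBatch>>> {
    let mut batches = Vec::new();
    let mut kept = 0usize;
    while let Some(batch) = stream.next().await {
        let batch = batch?;
        kept += batch.num_rows();
        batches.push(batch);
    }
    // All rows survived: the delete predicate hit nothing in this file, so the
    // original file stays and the buffered data is discarded. Otherwise the
    // surviving rows are handed to the writer.
    Ok(if kept == num_records { None } else { Some(batches) })
}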
This is so far just thinking out loud, and I have to think a bit more about the best approach. I guess for the first iteration we should aim for scanning each relevant file at most once and leverage the scheduling / execution capabilities of datafusion as best we can.
Then again I may also have missed something obvious and @houqp, @wjones127, or @schuermannator have some better ideas how to proceed 😆.
I may also be wrong about scanning files only once. Specifically, if we write the data we need all columns etc., while deciding whether we need to rewrite a file may be done on just a small subset of columns and thus potentially requires much less IO ...
Then, for each file with matched rows (count > 0), rewrite it with the filter applied, add the proper remove actions and commit.
I guess in the next iteration we would need to look into metadata to check min/max values to reduce the number of files we need to scan.
Yeah, I'd almost be tempted to start there. I imagine the flow is something like this (sort of Rust, sort of pseudocode):
enum FilterAction {
    Ignore,  // No matching rows in the file
    Remove,  // The entire file matches the predicate
    Rewrite, // At least part of the file matches the predicate
}

/// For an add action, decide whether the predicate applies to the full file,
/// none of the file, or part of the file.
fn check_filter(action: &AddAction, predicate: &Predicate) -> FilterAction {
    todo!()
}

fn filter_operation(table: DeltaTable, predicate: Predicate) -> Result<(), DeltaTableError> {
    let mut new_actions: Vec<Action> = vec![];
    let mut files_to_rewrite: Vec<&AddAction> = vec![];

    for action in table.add_actions() {
        match check_filter(action, &predicate) {
            FilterAction::Ignore => {},
            FilterAction::Remove => {
                new_actions.push(make_remove_action(action));
            },
            FilterAction::Rewrite => {
                new_actions.push(make_remove_action(action));
                files_to_rewrite.push(action);
            }
        }
    }

    // Read and filter the parquet files we need to rewrite, keeping only the
    // rows that do NOT match the delete predicate.
    let plan = ctx.read_parquet(files_to_rewrite).filter(!predicate);
    let files_written = write(plan);
    new_actions.extend(create_add_actions(files_written));
    commit_transaction(new_actions);
    Ok(())
}
(This is basically what Robert described.)
check_filter would use PruningPredicate like Robert is suggesting. I guess you might have to evaluate with both predicate and !predicate to differentiate whether an entire file can be removed or whether we actually have to filter rows. Though I'm not 100% sure that's sound.
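For illustration, the decision on top of the FilterAction enum above could look like the sketch below; the two booleans are assumed to come from pruning with predicate and with !predicate respectively, and since pruning statistics only prove "cannot match", the Remove branch carries exactly the soundness caveat mentioned:

fn classify(may_match: bool, may_not_match: bool) -> FilterAction {
    match (may_match, may_not_match) {
        // No row can match the predicate: leave the file untouched.
        (false, _) => FilterAction::Ignore,
        // Rows may match and no row can fail the predicate: the whole file is
        // deleted, so a remove action alone is enough (only safe if the
        // statistics are exact for the involved columns).
        (true, false) => FilterAction::Remove,
        // Otherwise the statistics cannot decide; read and rewrite the file.
        (true, true) => FilterAction::Rewrite,
    }
}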
From there, the brute-force way would be to read all candidate files with the filter applied and write out the data immediately, just assuming each file has a hit.
I think once you know you need to rewrite a file, there isn't a much more efficient way to do the filtering. (Except maybe you could use Parquet row group statistics to determine which row groups can be passed through to the new file as-is without any decoding/decompression. But that would require getting deep into the Parquet implementation, so we'll keep that out of scope.) The nice thing is that a scan-then-filter plan has no pipeline breakers, so it can be done in a totally streaming fashion (we don't have to load the entire table into memory at once).
Hi @dudzicp, are you still interested in taking this work on?
I currently have a branch with a functional implementation; it just requires further refinement.
Yes, I will get back to this topic probably in a month as I am swamped at my primary job. Feel free to push this further if you have time.