datafusion-contrib/arrow-zarr

Implement Predicate Pushdown

Closed this issue · 17 comments

Push down the appropriate predicate filters down to the physical file reading.

@tshauck I'm gonna get started on this soon, probably tomorrow. What else do we have that we're working on right now? as I mentioned on the async stream issue, I'll merge something soon, that we won't use for now, and then we have the partitioning, which I think we can implement in a classic way, directory based (there's the idea I mentioned before, that depends on the filtering implementation, we can discuss later). anything else that should be implemented short term?

I think those are the big things. It'd probably be easier to stack the filter and partition work, since we'd want the filtering to push down any applicable features the file listing.

One minor thing we could do is to optimize the reader. Right now, I'm using the async reader even for local files, but it might be better match on https://docs.rs/object_store/latest/object_store/enum.GetResultPayload.html and use regular reader for local files. Edit: made a ticket for this, but it's minor.

Hmm yeah, as it stands I think the async reader and regular reader (which doesn't use the object_store at all I believe) run at about the same speed, I don't remember if the regular reader was a smidge faster, but yeah it might get faster if we use the object_store for the local file case, I can look into it.

First off, tomorrow I will merge my work on the async stream, nothing we will use for now, just to save the work and revisit later, and I will look into filtering, just to ballpark how fast I can implement it, and decide what to work on next.

Alright so I'm looking into this now, but before going into specifics for the filter pushdown, I think we need to decide on something. I looked into datafusion and I remembered about something I saw when I first looked into it. The TableProvider trait is not implement for each file reader (parquet, csv, etc...), but it's implemented for the ListingTable struct, which if I understand correctly can work off of different file formats. Should we consider extending the ListingTable struct to work for the zarr readers, like they do for parquet files for example, or should we implement a brand new struct just for zarr files? Based on the work you've done so far, you're going for the latter, which I'm fine with, but I'm curious, have you considered the former?

At this point I don't yet have a good enough understanding of datafusion to make a clear call, but do you think re-using ListingTable could save us some work? I guess for now, I mostly want to know if it's worth it for me to take a closer look and consider that option, or if you already have a good reason why we shouldn't do that, and I'd be wasting my time looking into it?

Things could've changed since I originally implemented that pattern, but I think the issue I had is at face value FileFormat looks like a good option, but it's somewhat tied to FileType which is an enum of those main file formats and not extensible to other types (i.e.).

In this repo, ZarrPath seems to handle v2 and v3 (or does it not?), so what other file formats do you have in mind that would make an intermediate file format trait useful?

aah, I see, so you're saying we can't do what I suggested without actually modifying the datafusion code? that settles it then, let's keep doing what you started, a new struct it is!

Yeah it handles v2 and v3. I didn't have any other file formats in mind, I was just thinking that we could re-use as much stuff as possible from the existing work on parquet files. But now that you mention it, there are multiple raster formats out there, maybe one day supporting some of them could be useful. That would potentially make things a bit more convoluted though, like some sort of generic raster interface that we implement for just zarr (for now)... Probably we should just stick to zarr for now? Were there other formats that you'd want this to work with?

For the push down, I'll get started today, even though we're not re-using TableListing, I think I can implement the push down logic the same way it's implemented for parquet files, I can at least get some inspiration from that, since in the end the zarr format allows to select "columns" individually like parquet, and the way I implemented filtering was heavily influenced by how they did it for parquet files (because I thought it would make things simpler down the road).

Yeah, there's occasionally been stabs at supporting an extension file type, or the like but they've flamed out.

Otherwise, I think sticking to zarr for this repo would be good. If we want to exand into other formats later we certainly could, but it'd be nice to make this solid at zarr support (which you've gotten most the way there, we just need to add datafusion niceties).

I'd have to look at how it works in parquet, but at least here you'll update supports_filters_pushdown return the proper value for the filters depending on their level of specificity, then update scan so the datafusion matriculate down to the physical layer and eventually become zarr predicates (I think that's the construct you're intending).

fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
// TODO: which filters can we push down?
Ok(filters
.iter()
.map(|_| TableProviderFilterPushDown::Unsupported)
.collect())
}
async fn scan(
&self,
_state: &SessionState,
projection: Option<&Vec<usize>>,
_filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {

Yep that's the plan. So looking more into this, we'll mostly need a few functions from datafusion, which seem to be public so it should be fine. Basically we need one function that will evaluate an Expr and return a bool, expr_applicable_for_cols, and one function that will take an Expr and return a PhysicalExpr which can be passed to the reader as a filter (more or less), create_physical_expr.

For the former, for parquet files some types of filters are assigned a TableProviderFilterPushDown::Inexact, I think I will probably skip that, it will either be Exact or Unsupported. For the actual filter pushdown, like I said I mostly followed what they did for parquet files for the reader, so I'm optimistic I should be able to follow how filer pushdowns worked there.

I'm having some issues with the use statements right now, for some reason I can't seem to import the above functions even though I'm using the correct module structure and they are public, I'm not sure why but I'll figure it out soon, and then it's hopefully relatively straightforward. I might take a few hours to go over how it works for parquet, in slightly more details than I need for the zarr implementation, just for my understanding, but I'll make sure I get his done soon since we need it to move forward.

Dope, that all sounds good to me. FWIW, I took a quick look at expr_applicable_for_cols. It is public (pub fn) like you say, but doesn't make it out of that module from what I can tell.

E.g. even w/i the datafusion repo, I can't use it outside of of datasource::listing.

image

Quick update, this is taking much longer than I thought it would, sorry about that, but I've made some progress, I'm hoping to have a wip PR by Sunday, with some thoughts on next steps and a minimal working filter setup.

It turns out the parquet implementation of filter pushdown is quite convoluted (I'm sure there's good reasons for that), and we can't use it directly for zarr, so I'm pulling bits and pieces of it, only the stuff we need, and building the logic in this repo, staying as close as possible to what they did for parquet in datafusion but also simplifying things a lot for now. Stay tuned!

Okay so I've learned a lot about how it works for parquet files over the last several days, I see what you meant about the standard way filters work being based on indices, not column names. The thing is, and I say that with limited knowledge of how datafusion works of course, I find using indices to be quite tedious and way more complicated than using column names, and I'd want the filter pushdown logic (which would be completely internal to the Zarr implementation) to work off of column names. However, I thought I'd check with you first, since maybe someone in the datafusion community brought that up before and it was pointed out that it's a bad idea for some reason. Any concerns with a column based filter pushdown logic?

For context, if you use indices, then there are the indices in the file/table schema, the indices in the record batch for the filter, and the indices in the predicate. You need to keep track of all of those, and convert on the fly as necessary, so when you're applying a predicate, it can't be purely "self contained", you need outside information for all that mapping.

A concrete example of where it might not be the best approach (and where I'd need to significantly refactor how I did filters) is if you want to check 3 conditions, lat > a, lon < b, lat + lon > c, for example. The way it works for parquet, if I understood correctly, is that it would read lat, apply the predicate, read lon (taking into account the previous predicate), apply the predicate, then read lat and lon (again, but taking into account previous predicates) and apply the predicate, and produce one row selection. To avoid reading the same data more than once, because for Zarr you can't cherry-pick rows, it's compressed data, so reading is expensive, I set things up so that instead you read lat and lon once (by taking the union of all the columns your predicates need), and then use it across predicates as needed.

But, that doesn't work with indices, because for example the filter for lon < b doesn't know that lat will also be in the record batch. Contrast that with the filter knowing it needs the data from column "lon", and it doesn't matter if the record batch contains multiple columns that the predicate doesn't know about, you can just retrieve the lon column (into a new record batch) explicitly. You don't need to map and project between different "index spaces" either, the logic is simpler.

All that to say that it looks like a much nicer approach to me, but again, there may be things I'm not aware off, so just wanted to see what you think before I make that change and create a PR.

I don't know if I've seen something specific w.r.t. why datafusion uses it, but selecting by position (albeit 1-based) is part of the ANSI SQL Standard (SELECT 1, 2 FROM my_table) and even with it being the extra info to your point, vecs of ints are obviously smaller and faster to compare than vecs of strings.

That said, I'm not opposed to having the filtering internal to zarr column-name based if it affords a cleaner implementation, it should probably just be simple to init from a projection so the interface between datafusion and it is easy.

Hmm, I see. I mean for a Zarr store, the ordering is pretty arbitrary (we decided on alphabetical order to have some convention, but it's arbitrary) so I wonder it that's even a use case here (selecting by position). Even if it isn't though, the standard should probably be supported. I think it should be doable though even with what I have in mind.

I implemented what I was thinking about and it seems to work, I'll have something to show you very soon. Btw, I'm trying to do something very simple, for testing, but for some reason it seems to be very convoluted. I just want to extract a vector from a record batch column (by explicitly specifying a type, obviously the operation would fail if it's an invalid conversion), do you know how to do that?

Does https://docs.rs/arrow/latest/arrow/record_batch/struct.RecordBatch.html#method.column_by_name work? -- then you can downcast the array into the type you need. There's also some built-ins that could help, e.g. https://arrow.apache.org/rust/arrow/array/fn.as_string_array.html.

Well, so I meant to a vector with a primitive rust type, e.g. Vec< f64 >. A Float64Array can't be downcast_ref to a Vec< f64 > right?

I don't think that can be downcast, but once you have the Float64Array I think the values method on it or iterating plus collecting can get you to a vec of f64s.

Right, but then I run into something like value of type Vec<i32> cannot be built from std::iter::Iterator<Item=&PrimitiveArray<arrow_array::types::Int32Type>> (when trying the conversion with Int32Array). Anyway, I'm sure I'll find something, it's just frustrating that something so simple is so convoluted, it's much simpler to use vectors to test that the predicate conditions were applied correctly (I checked visually, but obviously I want to write tests that actually check for conditions).