ReadParquetFSSpec will throw an error when collecting statistics if the plan is empty
benrutter opened this issue · 9 comments
Describe the issue:
I'm having a little trouble actually recreating this error, and initially thought it was related to azure functions: dask/dask#11037.
I understand pretty well now the type of situations it'll happen in, so I'll just explain the cause of the bug.
Essentially, when the self._plan of ReadParquetFSSpec is flagged as empty, its internal _io_func
property returns the identity function rather than a ParquetFunctionWrapper:
@property
def _io_func(self):
    if self._plan["empty"]:
        return identity
    dataset_info = self._dataset_info
    return ParquetFunctionWrapper(
        self.engine,
        dataset_info["fs"],
        dataset_info["base_meta"],
        self.columns,
        dataset_info["index"],
        dataset_info["kwargs"]["dtype_backend"],
        {},  # All kwargs should now be in `common_kwargs`
        self._plan["common_kwargs"],
    )
This causes issues later when attempting to collect parquet statistics:
def _collect_pq_statistics(
    expr: ReadParquet, columns: list | None = None
) -> list[dict] | None:
    """Collect Parquet statistic for dataset paths"""
    # Be strict about columns argument
    if columns:
        if not isinstance(columns, list):
            raise ValueError(f"Expected columns to be a list, got {type(columns)}.")
        allowed = {expr._meta.index.name} | set(expr.columns)
        if not set(columns).issubset(allowed):
            raise ValueError(f"columns={columns} must be a subset of {allowed}")
    # Collect statistics using layer information
    fs = expr._io_func.fs
The line:
fs = expr._io_func.fs
will raise an AttributeError if _io_func is actually the identity function because, unlike a ParquetFunctionWrapper instance, a plain function has no "fs" attribute.
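The failure mode can be shown without dask at all. The sketch below is a stand-in, not dask-expr's code: `FakeParquetWrapper`, `io_func_for`, and the plan dictionaries are hypothetical names that only mimic the shape of the `_io_func` property described above.

```python
def identity(x):
    """Plain function, like the one returned for an empty plan."""
    return x


class FakeParquetWrapper:
    """Hypothetical stand-in for ParquetFunctionWrapper: it carries an `fs` attribute."""

    def __init__(self, fs):
        self.fs = fs


def io_func_for(plan):
    """Mimic the branching in `_io_func`: identity when the plan is empty."""
    if plan["empty"]:
        return identity
    return FakeParquetWrapper(fs="some-filesystem")


# Non-empty plan: the wrapper exposes `fs`, so attribute access works.
fs_ok = io_func_for({"empty": False}).fs

# Empty plan: a plain function has no `fs`, so the same access raises.
try:
    io_func_for({"empty": True}).fs
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

The second access fails for the same reason as the traceback in this issue: the caller assumes a wrapper object but receives a bare function.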
That leads to an error with this kind of a stack:
  File "/home/site/wwwroot/lib_functions.py", line 423, in clipped_dataset
    return_df_size = len(return_df)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py", line 381, in __len__
    return new_collection(Len(self)).compute()
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py", line 452, in compute
    out = out.optimize(fuse=fuse)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_collection.py", line 488, in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py", line 93, in optimize
    return optimize(self, **kwargs)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py", line 2877, in optimize
    return optimize_until(expr, stage)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_expr.py", line 2828, in optimize_until
    expr = result.simplify()
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_core.py", line 371, in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/_core.py", line 332, in simplify_once
    out = child._simplify_up(expr, dependents)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py", line 622, in _simplify_up
    _lengths = self._get_lengths()
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py", line 1311, in _get_lengths
    self._update_length_statistics()
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py", line 1333, in _update_length_statistics
    stat["num-rows"] for stat in _collect_pq_statistics(self)
  File "/home/site/wwwroot/.python_packages/lib/site-packages/dask_expr/io/parquet.py", line 1554, in _collect_pq_statistics
    fs = expr._io_func.fs
AttributeError: 'function' object has no attribute 'fs'
Environment:
- Dask version: 2023.4.0
- Python version: 3.10
- Operating System: Linux
- Install method (conda, pip, source): pip
I can't reproduce what you are running into, unfortunately. I am not sure how we can get there: we would require an empty directory, but this is caught beforehand. Filters can't be specified either; we are protected against this case. I think we need a bit more information to understand how we end up there.
@phofl - that's super interesting. I'm not that surprised that it's hard to reproduce, as it's been a hard bug to track down for me, and so far I've only been able to recreate it very specifically in an Azure Function, which is a really unusual environment for dask anyway.
It seems like a classic type mismatch error where something odd is probably happening at a different point but it just comes up there. I'll try and dig around to figure out how it's getting thrown - the code that threw the error trace I included was on dask 2024.3 if that makes a difference.
I think it might be caused by asking for the length of an empty dataframe from read_parquet actions, I'll do what I can to put together a minimal reproducible example.
Yup, the read_parquet invocation looks like this, with no special kwargs other than storage_options. I've taken out the real paths and some extra code that builds the list, which in the case throwing the error winds up with just one entry (although, weirdly again, I'm only seeing the error in an Azure Function environment and not elsewhere*).
df = dd.read_parquet(["abfs://somecontainer/folder/*.parquet"], storage_options=storage_options)
this_bit_will_crash = len(df)
(worth noting that in the crashing instance I found, the folder was in fact empty which may or may not be relevant)
Can you share how big the parquet file is or don't you exactly know how big it is?
I don't think there's any parquet file at all, just an empty folder in the case throwing the error. It's a step of a pipeline that's checking whether some data exists and then running some tests on it if it does. In this case, it's just an empty dataframe I think, which might be what's causing the problem (although that might be a red herring)
(I say empty dataframe, because that's the usual return of dd.read_parquet("some/not/matching/globstring/*.parquet") rather than it being a parquet with 0 data or something like that)
I also don't know if the empty data itself is causing the issue (I've found the bug in resources running at my work, so I'm slightly restricted in the tests I can run on it - sorry, I know that's a bit annoying!)
Yeah, found a reproducer: your folder is empty. Normally this would raise earlier (that's why I couldn't reproduce it another way), but globs in lists don't do that (for some reason)
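A rough illustration of how a glob inside a list can end up matching nothing, using only the standard library. It assumes the patterns are expanded with glob-like semantics; it does not call dask or fsspec, and the variable names are made up for the example.

```python
import glob
import os
import tempfile

with tempfile.TemporaryDirectory() as folder:
    # A list containing a single glob pattern that points at an empty folder.
    paths = [os.path.join(folder, "*.parquet")]
    # Expanding each pattern against the empty folder matches no files.
    expanded = [match for pattern in paths for match in glob.glob(pattern)]

print(expanded)
```

With no files behind the pattern, nothing is left to read, which is the kind of situation in which a plan could be considered empty.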
Ah that's amazing - nice one!!
I'm asking this just for my own learning, but how come _collect_pq_statistics ran when it normally wouldn't? Is this because trying to read an empty folder with something like dd.read_parquet("empty/folder") normally throws an error (apart from when passed a list), so _collect_pq_statistics isn't expecting that as a possibility?
Yeah normally this raises way before you even get close to _collect_pq_statistics
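For illustration only, one way a caller could tolerate the empty case is to look up `fs` defensively and return no statistics when it is missing. This is a sketch of the idea, not the change that was made in dask-expr; `collect_statistics_safely` and the objects passed to it are hypothetical.

```python
from types import SimpleNamespace


def identity(x):
    """Plain function, like the one returned for an empty plan."""
    return x


def collect_statistics_safely(io_func):
    """Return an empty list instead of raising when `io_func` has no `fs`."""
    fs = getattr(io_func, "fs", None)
    if fs is None:
        # Empty plan: nothing to read, so there are no statistics to collect.
        return []
    # Placeholder for the real statistics collection, which would use `fs`.
    return [{"filesystem": fs}]


wrapper = SimpleNamespace(fs="some-filesystem")  # stands in for the real wrapper
stats_for_wrapper = collect_statistics_safely(wrapper)
stats_for_identity = collect_statistics_safely(identity)
```

Returning an empty list rather than raising keeps callers that iterate over the result working; whether that is the right behavior for an empty dataset is a design decision for the library.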