dask/dask-expr

Computing the `index` of a Parquet dataset can raise `ZeroDivisionError`

fjetter opened this issue · 0 comments

import dask
import dask.dataframe as dd
import pandas as pd
# Minimal reproducer: write a small frame with a range-based index to Parquet,
# read it back lazily and compute only its index.
parquet_path = "test.parquet"
pd.DataFrame({"foo": range(5)}, index=range(50, 55)).to_parquet(parquet_path)
index_only = dd.read_parquet(parquet_path).index
index_only.compute()
ZeroDivisionError                         Traceback (most recent call last)
Cell In[1], line 6
      4 pdf = pd.DataFrame({"foo": range(5)}, index=range(50, 55))
      5 pdf.to_parquet("test.parquet")
----> 6 dd.read_parquet("test.parquet").index.compute()

File ~/workspace/dask-expr/dask_expr/_collection.py:452, in FrameBase.compute(self, fuse, **kwargs)
    450 if not isinstance(out, Scalar):
    451     out = out.repartition(npartitions=1)
--> 452 out = out.optimize(fuse=fuse)
    453 return DaskMethodsMixin.compute(out, **kwargs)

File ~/workspace/dask-expr/dask_expr/_collection.py:488, in FrameBase.optimize(self, fuse)
    487 def optimize(self, fuse: bool = True):
--> 488     return new_collection(self.expr.optimize(fuse=fuse))

File ~/workspace/dask-expr/dask_expr/_expr.py:93, in Expr.optimize(self, **kwargs)
     92 def optimize(self, **kwargs):
---> 93     return optimize(self, **kwargs)

File ~/workspace/dask-expr/dask_expr/_expr.py:2877, in optimize(expr, fuse)
   2856 """High level query optimization
   2857 
   2858 This leverages three optimization passes:
   (...)
   2873 optimize_blockwise_fusion
   2874 """
   2875 stage: core.OptimizerStage = "fused" if fuse else "simplified-physical"
-> 2877 return optimize_until(expr, stage)

File ~/workspace/dask-expr/dask_expr/_expr.py:2833, in optimize_until(expr, stage)
   2830     return expr
   2832 # Manipulate Expression to make it more efficient
-> 2833 expr = expr.rewrite(kind="tune")
   2834 if stage == "tuned-logical":
   2835     return expr

File ~/workspace/dask-expr/dask_expr/_core.py:257, in Expr.rewrite(self, kind)
    255 # Allow children to rewrite their parents
    256 for child in expr.dependencies():
--> 257     out = getattr(child, up_name)(expr)
    258     if out is None:
    259         out = expr

File ~/workspace/dask-expr/dask_expr/io/io.py:93, in BlockwiseIO._tune_up(self, parent)
     92 def _tune_up(self, parent):
---> 93     if self._fusion_compression_factor >= 1:
     94         return
     95     if isinstance(parent, FusedIO):

File ~/workspace/dask-expr/dask_expr/io/parquet.py:673, in ReadParquet._fusion_compression_factor(self)
    670     return 1
    671 nr_original_columns = len(self._dataset_info["schema"].names) - 1
    672 return max(
--> 673     len(_convert_to_list(self.operand("columns"))) / nr_original_columns, 0.001
    674 )

ZeroDivisionError: division by zero

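This was originally surfaced by a different issue reported by a user; see dask/dask#10995 (comment).

Cause: `ReadParquet._fusion_compression_factor` divides by `nr_original_columns = len(schema.names) - 1`, which appears to assume that one schema field is the index column. Here the index is a `RangeIndex`, which pandas stores in the Parquet metadata rather than as a column. The schema therefore contains only `foo`, and the denominator is `0`.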