pangeo-data/rechunker

Rechunker 0.3.3 is incompatible with Dask 2022.01.1 and later

tomwhite opened this issue · 7 comments

Running the rechunker 0.3.3 unit tests with Dask 2022.2.0 produces many failures like the one below.

The failures seem to have been introduced by a HighLevelGraph (HLG) change in Dask, first released in Dask 2022.01.1.

This is a problem since we are stuck on rechunker 0.3.3 due to #92 (see also https://github.com/pystatgen/sgkit/issues/820).

________________________________________________ test_rechunk_dataset[mapper.temp.zarr-mapper.target.zarr-dask-10MB-target_chunks1-source_chunks0-shape0] ________________________________________________

tmp_path = PosixPath('/private/var/folders/cj/wyp4zgw17vj4m9qdmddvmcc80000gn/T/pytest-of-tom/pytest-1730/test_rechunk_dataset_mapper_te3'), shape = (100, 50), source_chunks = (10, 50)
target_chunks = {'a': {'x': 20, 'y': 10}, 'b': {'x': 20}}, max_mem = '10MB', executor = 'dask', target_store = <fsspec.mapping.FSMap object at 0x7fb48ae17890>
temp_store = <fsspec.mapping.FSMap object at 0x7fb48ae17d50>

    @pytest.mark.parametrize("shape", [(100, 50)])
    @pytest.mark.parametrize("source_chunks", [(10, 50)])
    @pytest.mark.parametrize(
        "target_chunks",
        [{"a": (20, 10), "b": (20,)}, {"a": {"x": 20, "y": 10}, "b": {"x": 20}}],
    )
    @pytest.mark.parametrize("max_mem", ["10MB"])
    @pytest.mark.parametrize("executor", ["dask"])
    @pytest.mark.parametrize("target_store", ["target.zarr", "mapper.target.zarr"])
    @pytest.mark.parametrize("temp_store", ["temp.zarr", "mapper.temp.zarr"])
    def test_rechunk_dataset(
        tmp_path,
        shape,
        source_chunks,
        target_chunks,
        max_mem,
        executor,
        target_store,
        temp_store,
    ):
        if target_store.startswith("mapper"):
            target_store = fsspec.get_mapper(str(tmp_path) + target_store)
            temp_store = fsspec.get_mapper(str(tmp_path) + temp_store)
        else:
            target_store = str(tmp_path / target_store)
            temp_store = str(tmp_path / temp_store)
    
        a = numpy.arange(numpy.prod(shape)).reshape(shape).astype("f4")
        a[-1] = numpy.nan
        ds = xarray.Dataset(
            dict(
                a=xarray.DataArray(
                    a, dims=["x", "y"], attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"}
                ),
                b=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                c=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            coords=dict(
                cx=xarray.DataArray(numpy.ones(shape[0]), dims=["x"]),
                cy=xarray.DataArray(numpy.ones(shape[1]), dims=["y"]),
            ),
            attrs={"a1": 1, "a2": [1, 2, 3], "a3": "x"},
        )
        ds = ds.chunk(chunks=dict(zip(["x", "y"], source_chunks)))
        options = dict(
            a=dict(
                compressor=zarr.Blosc(cname="zstd"),
                dtype="int32",
                scale_factor=0.1,
                _FillValue=-9999,
            )
        )
        rechunked = api.rechunk(
            ds,
            target_chunks=target_chunks,
            max_mem=max_mem,
            target_store=target_store,
            target_options=options,
            temp_store=temp_store,
>           executor=executor,
        )

tests/test_rechunk.py:105: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
rechunker/api.py:305: in rechunk
    plan = executor.prepare_plan(copy_spec)
rechunker/executors/dask.py:21: in prepare_plan
    return _copy_all(specs)
rechunker/executors/dask.py:96: in _copy_all
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
rechunker/executors/dask.py:96: in <listcomp>
    stores_delayed = [_chunked_array_copy(spec) for spec in specs]
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

spec = CopySpec(read=ArrayProxy(array=dask.array<astype, shape=(100, 50), dtype=int32, chunksize=(10, 50), chunktype=numpy.nd...' (100, 50) int32>, chunks=(10, 50)), write=ArrayProxy(array=<zarr.core.Array '/a' (100, 50) int32>, chunks=(100, 50)))

    def _chunked_array_copy(spec: CopySpec) -> Delayed:
        """Chunked copy between arrays."""
        if spec.intermediate.array is None:
            target_store_delayed = _direct_array_copy(
                spec.read.array, spec.write.array, spec.read.chunks,
            )
    
            # fuse
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
            dsk_fused, _ = fuse(target_dsk)
    
            return Delayed(target_store_delayed.key, dsk_fused)
    
        else:
            # do intermediate store
            int_store_delayed = _direct_array_copy(
                spec.read.array, spec.intermediate.array, spec.read.chunks,
            )
            target_store_delayed = _direct_array_copy(
                spec.intermediate.array, spec.write.array, spec.write.chunks,
            )
    
            # now do some hacking to chain these together into a single graph.
            # get the two graphs as dicts
            int_dsk = dask.utils.ensure_dict(int_store_delayed.dask)
            target_dsk = dask.utils.ensure_dict(target_store_delayed.dask)
    
            # find the root store key representing the read
            root_keys = []
            for key in target_dsk:
                if isinstance(key, str):
                    if key.startswith("from-zarr"):
                        root_keys.append(key)
>           assert len(root_keys) == 1
E           AssertionError

rechunker/executors/dask.py:74: AssertionError
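
To make the failing assertion concrete, here is a pure-Python toy illustration (not rechunker or Dask code; all key names are invented) of the heuristic in `rechunker/executors/dask.py`: the executor looks up the graph's read task by matching the key-name prefix `"from-zarr"` and asserts that exactly one such key exists.

```python
# Toy illustration of the executor's root-key heuristic. Key names are
# invented; this is not the real Dask graph output.

def find_root_keys(dsk):
    """Collect string keys that start with 'from-zarr' (the heuristic)."""
    return [k for k in dsk if isinstance(k, str) and k.startswith("from-zarr")]

# Graph shaped like older Dask output: one materialized read key carries
# the expected prefix, so `assert len(root_keys) == 1` holds.
old_style = {
    "from-zarr-abc123": ("read_zarr",),
    "store-def456": ("store", "from-zarr-abc123"),
}
assert len(find_root_keys(old_style)) == 1

# Graph shaped like newer (HLG-era) Dask output: the read task's key no
# longer carries the "from-zarr" prefix, so nothing matches and the
# executor's assertion fails.
new_style = {
    ("astype-xyz789", 0, 0): ("read_and_cast",),
    "store-def456": ("store", ("astype-xyz789", 0, 0)),
}
assert len(find_root_keys(new_style)) == 0
```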

Thanks for reporting, Tom. This feels vaguely similar to pangeo-forge/pangeo-forge-recipes#259, which we successfully fixed.

In pangeo-forge-recipes, we have copied (and improved upon) the executor framework we are using in rechunker. So fixing this will likely involve bringing rechunker up to speed with those changes.
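
For illustration only (a hypothetical sketch, not the actual change in #112): one way to avoid depending on key-name prefixes, which Dask does not guarantee to be stable, is to identify source tasks structurally, as keys whose tasks reference no other key in the graph.

```python
# Hypothetical sketch: find "source" keys of a low-level task graph by
# dependency structure rather than by key-name prefix.

def refs_key(obj, keys):
    """True if obj is, or recursively contains, a key from the graph."""
    try:
        if obj in keys:
            return True
    except TypeError:
        pass  # unhashable objects (e.g. lists) cannot themselves be keys
    if isinstance(obj, (tuple, list)):
        return any(refs_key(o, keys) for o in obj)
    return False

def find_source_keys(dsk):
    """Keys whose tasks reference no other keys in the graph."""
    keys = set(dsk)
    return [k for k in dsk if not refs_key(dsk[k], keys)]

# Works regardless of what the read task's key is called:
graph = {
    ("astype-xyz", 0, 0): ("read_and_cast",),
    "store-def": ("store", ("astype-xyz", 0, 0)),
}
assert find_source_keys(graph) == [("astype-xyz", 0, 0)]
```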

Chiming in to say that I am getting this same AssertionError using Dask 2022.2.1, but did not get the error using Dask 2022.1.1. I was running Rechunker version 0.3.3 just fine on Pangeo Cloud before the recent docker image update switched from Dask 2022.1.1 to 2022.2.1.

Hopefully fixed by #112.

@tomwhite is going to track down some example code from @eric-czech to ensure this fix works for us.

PR #112 seems to have fixed my issues! Thanks a bunch @rabernat!

Thanks @hammer! I've heard from a few folks already, and I'm feeling pretty confident that this issue is fixed. But would love as much feedback as possible. I will probably make a release early next week.

Rechunker 0.5.0 has been released.