pangeo-data/rechunker

Assembling lots of very small chunks

max-sixty opened this issue · 3 comments

I have a process that writes out data in very small chunks, using xr.Dataset.to_zarr(region=...). I then want to assemble the chunks into more reasonably sized chunks.

An order of magnitude on the sizes — I have a dataset of ~14TB, and the chunks are ~1MB, so we have about 14M chunks — lots of chunks...

Rechunker seems to stall in this case — it stops making progress. When I scale the problem down a bit, to 10% or even 5% of the size, it still stalls. (When I say "rechunker stalls": it seems like the dask scheduler is predominantly responsible — it gives its usual warnings about the event loop being blocked for 10/30/50s by an operation. It often gives those warnings at other times and still manages to succeed, whereas here rechunker seems to make no progress.)

Is this expected? Is this predominantly a dask problem, because the scheduler just can't handle task definitions with that many edges?
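
For reference, the call that stalls is roughly this shape. The store paths, the "x" dimension, and the sizes below are placeholders, and the per-variable target_chunks layout is what the rechunker docs show for Dataset sources — so treat this as a sketch of my setup rather than the exact code:

import xarray as xr
from rechunker import rechunk

# Placeholder names and sizes throughout; the real dataset is ~14TB in ~1MB
# chunks, i.e. ~14M source chunks.
source = xr.open_zarr("source.zarr")
plan = rechunk(
    source,
    target_chunks={
        **{v: {"x": 100} for v in source.data_vars},
        **{c: None for c in source.coords},  # leave coordinates unchanged
    },
    max_mem="4GB",
    target_store="target.zarr",
    temp_store="temp.zarr",
)
plan.execute()  # with ~14M source chunks, this is where progress stops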


One approach that seems to work — but also seems so homebrew that I'm surely missing something — is to make each new chunk's work opaque to the dask scheduler:

  • Write a new array template with bigger chunks (to_zarr(compute=False))
  • Create a task to assemble the pieces of each new chunk which is "opaque" to the dask scheduler, so the central scheduler doesn't get overwhelmed. Two ways of doing this:
    • On a homegrown scheduler, which doesn't know anything about dask.
    • Distributing one dask.delayed task per new chunk, which uses scheduler="threads" internally. The main dask scheduler then only sees one task per new chunk, and so does fine (a rough driver sketch follows the function below).
import xarray as xr

def collect_and_write_chunks(path, region):
    # original_path (defined elsewhere) is the store holding the tiny chunks.
    original = xr.open_zarr(original_path).reset_encoding()
    to_write = (
        # Computing with the threaded scheduler here keeps the work off the
        # main (distributed) dask scheduler.
        original.drop_vars(original.indexes).isel(**region).compute(scheduler="threads")
    )
    # Write the assembled chunk into its region of the bigger-chunked store.
    to_write.to_zarr(path, region=region)
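
To make that last bullet concrete, the driver around the function looks roughly like this — the paths, the single "x" dimension, and NEW_CHUNK are placeholders, so this is a sketch of the pattern rather than my exact code:

import dask
import xarray as xr

# Placeholders: paths, the "x" dimension, and the new chunk size.
NEW_CHUNK = 100
original_path = "source.zarr"    # the store with the tiny chunks (read inside the function above)
target_path = "rechunked.zarr"

original = xr.open_zarr(original_path)

# 1. Write an empty template with the bigger chunks (writes metadata and
#    coordinates, but no array data).
original.reset_encoding().chunk({"x": NEW_CHUNK}).to_zarr(
    target_path, compute=False, mode="w"
)

# 2. One delayed task per new chunk: the distributed scheduler sees a single
#    opaque task per region, and the threaded compute happens inside it.
n = original.sizes["x"]
regions = [{"x": slice(i, min(i + NEW_CHUNK, n))} for i in range(0, n, NEW_CHUNK)]
tasks = [dask.delayed(collect_and_write_chunks)(target_path, region) for region in regions]
dask.compute(*tasks)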

FWIW I have plenty of memory — it would be fine to just load it all into distributed memory and then write it out again. Just running da.chunk(x=100).to_zarr(path) seems to do better than rechunker, but much worse than the "opaque task" approach above.

da.chunk(x=100).to_zarr(path)

You could use xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr(). This is a blockwise operation and should work well.

EDIT: I assumed that FINAL_CHUNKS are a multiple of the existing SMALL_CHUNKS.
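
In case it's useful to spell out, a minimal sketch of that suggestion (the paths, the "x" dimension, and FINAL_CHUNKS are placeholders). Clearing the encoding, as the function above already does, should stop the tiny on-disk chunk size from being carried over into the new store:

import xarray as xr

# Placeholders: paths, the "x" dimension, and the chunk size.
FINAL_CHUNKS = {"x": 100}   # assumed to be a multiple of the existing SMALL_CHUNKS

ds = xr.open_zarr("source.zarr", chunks=FINAL_CHUNKS)

# Drop the stored encoding (which still records the tiny chunks) so the new
# store takes its zarr chunks from the dask chunking instead.
ds = ds.reset_encoding()

ds.to_zarr("rechunked.zarr", mode="w")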

You could use xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr(). This is a blockwise operation and should work well.

Ah awesome, I indeed didn't know about this. That's excellent, thanks.

OK, so this + rechunker is great — pass source=xr.open_zarr(..., chunks=INTERMEDIATE_CHUNKS), where INTERMEDIATE_CHUNKS is any multiple of the base chunks.
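
Concretely, this is the same call as the sketch near the top of the issue, with the one change being how the source is opened (all names and sizes are still placeholders):

import xarray as xr
from rechunker import rechunk

# Placeholder sizes: INTERMEDIATE_CHUNKS is any multiple of the tiny base
# chunks, FINAL_CHUNKS the chunks I actually want on disk.
INTERMEDIATE_CHUNKS = {"x": 10}
FINAL_CHUNKS = {"x": 100}

# The only change: open the source blockwise with coarser chunks, so dask
# sees far fewer source tasks.
source = xr.open_zarr("source.zarr", chunks=INTERMEDIATE_CHUNKS)

plan = rechunk(
    source,
    target_chunks={
        **{v: FINAL_CHUNKS for v in source.data_vars},
        **{c: None for c in source.coords},  # leave coordinates unchanged
    },
    max_mem="4GB",
    target_store="target.zarr",
    temp_store="temp.zarr",
)
plan.execute()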