Assembling lots of very small chunks
max-sixty opened this issue · 3 comments
I have a process that writes out data in very small chunks, using `xr.Dataset.to_zarr(region=...)`. I then want to assemble the chunks into more reasonably sized chunks.
An order of magnitude on the sizes — I have a dataset of ~14TB, and the chunks are ~1MB, so we have about 14M chunks — lots of chunks...
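For concreteness, each per-piece write looks roughly like this (the store path, variable name, and slice are illustrative, and the store is assumed to have been created up front):

```python
import xarray as xr

# One tiny piece, written into a pre-created zarr store via `region=`
piece = xr.Dataset({"var": (("x",), [1.0, 2.0, 3.0, 4.0])})
piece.to_zarr("small-chunks.zarr", region={"x": slice(0, 4)})
```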
Rechunker seems to stall in this case — it stops making progress. When I scale the problem down a bit, to 10% or even 5% of the size, it still stalls. (When I say "rechunker stalls", it seems like the dask scheduler is predominantly responsible — it gives its usual warnings about the event loop taking 10–50s for an operation. It often gives those warnings at other times and still manages to succeed, whereas here rechunker makes no progress.)
Is this expected? Is this predominantly a dask problem, because the scheduler just can't handle task graphs with that many edges?
One approach that seems to work — but also seems so homebrew that I'm surely missing something — is to make each new chunk's work opaque to the dask scheduler:
- Write a new array template with bigger chunks (`to_zarr(compute=False)`).
- Create a task to assemble the pieces of each new chunk which is "opaque" to the dask scheduler, so the central scheduler doesn't get overwhelmed. Two ways of doing this:
  - On a homegrown scheduler, which doesn't know anything about dask.
  - Distributing a task with dask delayed for each new chunk, which uses `scheduler="threads"` within that task. Then the main dask scheduler only sees one task per new chunk, and so does fine (the per-chunk function, and a sketch of the orchestration, are below).
```python
import xarray as xr

def collect_and_write_chunks(path, region):
    # `original_path` (the store with the tiny chunks) is assumed to be defined globally
    original = xr.open_zarr(original_path).reset_encoding()
    to_write = (
        # Computing with the threaded scheduler stops it using the main dask scheduler
        original.drop_vars(original.indexes).isel(**region).compute(scheduler="threads")
    )
    to_write.to_zarr(path, region=region)
```
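For completeness, a rough sketch of the orchestration around that function, continuing from the snippet above. The paths, the `x` dimension, and the chunk size are all made up; the point is just that the distributed scheduler sees one opaque task per new chunk:

```python
import dask
import xarray as xr

# Illustrative names: the store with the tiny chunks, the target store, and the new chunk size
original_path = "small-chunks.zarr"
target_path = "big-chunks.zarr"
NEW_CHUNK = 1_000

# 1. Write a metadata-only template for the target store with the bigger chunks
original = xr.open_zarr(original_path).reset_encoding()
original.chunk(x=NEW_CHUNK).to_zarr(target_path, compute=False)

# 2. One delayed task per new chunk; each task assembles and writes its region,
#    using the threaded scheduler internally (see collect_and_write_chunks above)
n = original.sizes["x"]
tasks = [
    dask.delayed(collect_and_write_chunks)(
        target_path, {"x": slice(i, min(i + NEW_CHUNK, n))}
    )
    for i in range(0, n, NEW_CHUNK)
]
dask.compute(*tasks)
```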
FWIW I have plenty of memory — it would be fine to just load it all into distributed memory and then write it out again. Just running `da.chunk(x=100).to_zarr(path)` seems to do better than rechunker, but much worse than the "opaque task" approach above.
You could `xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr()`. This is a blockwise operation and should work well.
EDIT: I assumed that FINAL_CHUNKS are a multiple of the existing SMALL_CHUNKS.
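A minimal sketch of this suggestion, with made-up paths and a single `x` dimension. I believe you also need to clear the stored chunk encoding (as the snippet further up does with `reset_encoding()`) so that the new store actually picks up the larger chunks rather than the encoded small ones:

```python
import xarray as xr

# FINAL_CHUNKS should be a multiple of the existing small on-disk chunks
FINAL_CHUNKS = {"x": 100_000}

ds = xr.open_zarr("small-chunks.zarr", chunks=FINAL_CHUNKS)

# Drop the inherited chunk encoding so to_zarr writes FINAL_CHUNKS-sized zarr chunks
ds.reset_encoding().to_zarr("big-chunks.zarr", mode="w")
```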
> You could `xr.open_zarr(..., chunks=FINAL_CHUNKS).to_zarr()`. This is a blockwise operation and should work well.
Ah awesome, I indeed didn't know about this. That's excellent, thanks.
OK so this + rechunker is great — `source=xr.open_zarr(..., chunks=INTERMEDIATE_CHUNKS)`, where `INTERMEDIATE_CHUNKS` are any multiples of the base chunks.
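For reference, a rough sketch of that combination. The paths, sizes, and the variable name `var` are made up, and I'm assuming rechunker's convention of per-variable target chunks for xarray sources:

```python
import xarray as xr
from rechunker import rechunk

INTERMEDIATE_CHUNKS = {"x": 10_000}        # any multiple of the tiny on-disk chunks
TARGET_CHUNKS = {"var": {"x": 1_000_000}}  # per-variable target chunks (illustrative)

source = xr.open_zarr("small-chunks.zarr", chunks=INTERMEDIATE_CHUNKS)

plan = rechunk(
    source,
    target_chunks=TARGET_CHUNKS,
    max_mem="4GB",
    target_store="big-chunks.zarr",
    temp_store="rechunker-tmp.zarr",
)
plan.execute()
```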