pydata/xarray

Expose xarray's h5py serialization capabilities as public API?

rabernat opened this issue · 5 comments

Xarray has a magic ability to serialize h5py datasets. We should expose this somehow and allow it to be used outside of xarray.

Consider the following example:

import s3fs
import h5py
import dask.array as dsa
import xarray as xr
import cloudpickle

# Open an HDF5/NetCDF4 file on S3 anonymously as a file-like object
url = 'noaa-goes16/ABI-L2-RRQPEF/2020/001/00/OR_ABI-L2-RRQPEF-M6_G16_s20200010000216_e20200010009524_c20200010010034.nc'
fs = s3fs.S3FileSystem(anon=True)
f = fs.open(url)
ds = h5py.File(f, mode='r')
# Wrap one variable in dask and try to serialize the resulting graph
data = dsa.from_array(ds['RRQPE'])
_ = cloudpickle.dumps(data)

This raises TypeError: h5py objects cannot be pickled.

However, if I read the file with xarray...

ds = xr.open_dataset(f, chunks={})
data = ds['RRQPE'].data
_ = cloudpickle.dumps(data)

It works just fine. This has come up in several places (e.g. fsspec/s3fs#337, dask/distributed#2787).

It seems like the ability to pickle these arrays is broadly useful, beyond xarray.

  1. How does our magic work?
  2. What would it look like to break this magic out and expose it as public API (or inside another package)?

The secret is our CachingFileManager object:
https://github.com/pydata/xarray/blob/b1c7e315e8a18e86c5751a0aa9024d41a42ca5e8/xarray/backends/file_manager.py

This class is essential to writing a performant / serializable backend for xarray/dask, so we definitely should expose it as public API as part of our backends refactor. I would not object to breaking it out into another package if someone is interested enough to make that happen.
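
As a rough sketch of what makes this work (untested, and the file name demo.h5 is just a placeholder): the manager stores the opener callable and its arguments rather than an open handle, so pickling it works, and the file is simply reopened lazily on first access after deserialization.

import pickle
import h5py
import numpy as np
from xarray.backends import CachingFileManager

# Create a small HDF5 file to demonstrate with (file name is arbitrary)
with h5py.File('demo.h5', 'w') as f:
    f.create_dataset('x', data=np.arange(10))

# The manager records (opener, args, mode) instead of a live handle,
# so it can be pickled; the file is reopened on first access afterwards
manager = CachingFileManager(h5py.File, 'demo.h5', mode='r')
restored = pickle.loads(pickle.dumps(manager))
with restored.acquire_context(needs_lock=False) as f:
    print(f['x'][:])  # -> [0 1 2 ... 9]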

stale commented

In order to maintain a list of currently relevant issues, we mark issues as stale after a period of inactivity.

If this issue remains relevant, please comment here or remove the stale label; otherwise it will be marked as closed automatically.

FWIW, this sounds similar to what h5pickle does. Maybe it is worth improving that package with whatever logic Xarray has?
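
If I understand that package correctly, it offers a drop-in replacement for h5py.File whose objects re-open the file on unpickling; something like this untested sketch (the file name and dataset are placeholders):

import pickle
import h5pickle

# h5pickle.File mirrors the h5py.File call, but (as I understand it) its
# objects pickle by recording the file name and reopening on unpickling
f = h5pickle.File('data.h5', 'r')
dset = f['X']
dset2 = pickle.loads(pickle.dumps(dset))  # works, unlike a raw h5py.Dataset
print(dset2[:5, :5])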

I would be extremely interested in, at a minimum, documenting this publicly. For example, here is what I think should work, based on looking at the code:

import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

X = np.random.randn(100, 100)
manager = CachingFileManager(h5py.File, 'data.h5', mode='w')
manager.acquire().create_dataset("X", (100, 100), 'f', data=X)
with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        with manager.acquire_context(needs_lock=False) as root:  # or True
            X_h5 = root['X']
            da.from_array(X_h5)[...].compute()

Or something along those lines; I'm not really sure how to use CachingFileManager properly. I would like to be able to use lazily loaded HDF5 files with dask distributed.

Thanks!

Update:

import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager

chunksize = 10
numchunks = 10
size = (chunksize * numchunks, chunksize * numchunks)
X = np.random.randn(*size)

# Write the test data with plain h5py and close the file before reading
with h5py.File('data.h5', 'w') as f:
    f.create_dataset("X", size, 'f', data=X)

# get_chunk captures the manager, not an open file handle, so each worker
# reopens the file on demand
manager = CachingFileManager(h5py.File, 'data.h5', mode='r')

def get_chunk(block_id=None):
    with manager.acquire_context(needs_lock=False) as f:
        x = block_id[0] * chunksize
        y = block_id[1] * chunksize
        chunk = f['X'][x:x + chunksize, y:y + chunksize]
    return chunk

with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        res = da.map_blocks(
            get_chunk,
            chunks=((chunksize,) * numchunks, (chunksize,) * numchunks),
            meta=np.array([]),
            dtype=X.dtype,
        )[...].compute()
print(res)

This works.
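
For what it's worth, the key point seems to be that get_chunk closes over the manager rather than an open h5py handle, which is exactly why dask can ship it to workers. A quick sanity check, run after the snippet above:

import cloudpickle

# The closure serializes because the manager pickles as (opener, args, mode)
# instead of a live h5py.File; each worker reopens the file on first access
cloudpickle.dumps(get_chunk)  # no "h5py objects cannot be pickled" error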