Expose xarray's h5py serialization capabilities as public API?
rabernat opened this issue · 5 comments
Xarray has a magic ability to serialize h5py datasets. We should expose this somehow and allow it to be used outside of xarray.
Consider the following example:
import s3fs
import h5py
import dask.array as dsa
import xarray as xr
import cloudpickle
url = 'noaa-goes16/ABI-L2-RRQPEF/2020/001/00/OR_ABI-L2-RRQPEF-M6_G16_s20200010000216_e20200010009524_c20200010010034.nc'
fs = s3fs.S3FileSystem(anon=True)
f = fs.open(url)
ds = h5py.File(f, mode='r')  # open the netCDF4/HDF5 file directly over S3
data = dsa.from_array(ds['RRQPE'])  # wrap one variable in a dask array
_ = cloudpickle.dumps(data)
This raises TypeError: h5py objects cannot be pickled.
However, if I read the file with xarray...
ds = xr.open_dataset(f, chunks={})
data = ds['RRQPE'].data
_ = cloudpickle.dumps(data)
It works just fine. This has come up in several places (e.g. fsspec/s3fs#337, dask/distributed#2787).
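(And, hypothetically continuing the snippet above, the pickled array should also restore and compute, since that is exactly the distributed use case in those issues:)
restored = cloudpickle.loads(cloudpickle.dumps(data))
print(restored.mean().compute())  # still reads from S3 after the round trip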
It seems like the ability to pickle these arrays is broadly useful, beyond xarray.
- How does our magic work?
- What would it look like to break this magic out and expose it as public API (or inside another package)?
The secret is our CachingFileManager object:
https://github.com/pydata/xarray/blob/b1c7e315e8a18e86c5751a0aa9024d41a42ca5e8/xarray/backends/file_manager.py
This class is essential to writing a performant / serializable backend for xarray/dask, so we definitely should expose it as public API as part of our backends refactor. I would not object to breaking it out into another package if someone is interested enough to make that happen.
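To make the mechanism concrete: unlike an open h5py.File, a CachingFileManager instance survives pickling, because it stores the opener and its arguments rather than the open handle, and re-opens the file lazily wherever it is unpickled. A minimal sketch (assuming a local data.h5 already exists):
import pickle
import h5py
from xarray.backends import CachingFileManager

# The manager records *how* to open the file (opener + args + mode),
# not the live handle, so it round-trips through pickle.
manager = CachingFileManager(h5py.File, 'data.h5', mode='r')
restored = pickle.loads(pickle.dumps(manager))  # an h5py.File here would raise

# Each process re-opens (and caches) its own handle on first access.
with restored.acquire_context(needs_lock=False) as f:
    print(list(f.keys()))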
FWIW this sounds similar to what h5pickle does. Maybe it is worth improving that package with whatever logic Xarray has?
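For reference, the core trick in both cases is to pickle the recipe for opening the file rather than the handle itself. A rough sketch of that pattern (not h5pickle's or xarray's actual implementation; the class name is illustrative):
import h5py

class ReopeningFile:
    """Sketch of the reopen-on-unpickle pattern."""
    def __init__(self, path, mode='r'):
        self._path, self._mode = path, mode
        self._file = h5py.File(path, mode)

    def __getitem__(self, key):
        return self._file[key]

    def __reduce__(self):
        # Serialize the constructor arguments, not the open handle;
        # unpickling calls ReopeningFile(path, mode) again on the worker.
        return (type(self), (self._path, self._mode))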
I would be extremely interested in, at the minimum, documenting this publicly. For example, this is what I think should work, based on looking at the code:
import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager
X = np.random.randn(100, 100)
manager = CachingFileManager(h5py.File, 'data.h5', mode='w')
manager.acquire().create_dataset("X", (100, 100), 'f', data=X)

with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        with manager.acquire_context(needs_lock=False) as root:  # or True
            X_h5 = root['X']
            da.from_array(X_h5)[...].compute()
Or something along these lines, but I am not really sure how to use CachingFileManager properly. I would like to be able to use lazily loaded HDF5 files with dask distributed.
Thanks!
Update:
import dask.array as da
import dask.distributed as dd
import h5py
import numpy as np
from xarray.backends import CachingFileManager
chunksize = 10
numchunks = 10
size = (chunksize * numchunks, chunksize * numchunks)
X = np.random.randn(*size)
with h5py.File('data.h5', 'w') as f:
    f.create_dataset("X", size, 'f', data=X)  # close the write handle before reopening read-only
manager = CachingFileManager(h5py.File, 'data.h5', mode='r')
def get_chunk(block_id=None):
    with manager.acquire_context(needs_lock=False) as f:
        x = block_id[0] * chunksize
        y = block_id[1] * chunksize
        chunk = f['X'][x : x + chunksize, y : y + chunksize]
    return chunk
with dd.LocalCluster(n_workers=1, threads_per_worker=1) as cluster:
    with dd.Client(cluster) as client:
        res = da.map_blocks(
            get_chunk,
            chunks=((chunksize,) * numchunks, (chunksize,) * numchunks),
            meta=np.array([]),
            dtype=X.dtype,
        )[...].compute()
        print(res)
It works.
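The same pattern should generalize, since the manager itself pickles cleanly. A hypothetical helper (lazy_h5_array is not an existing xarray or dask API; it just wraps the map_blocks recipe above for an arbitrary dataset and chunking):
import dask.array as da
import h5py
import numpy as np
from xarray.backends import CachingFileManager

def lazy_h5_array(path, name, chunks):
    # A lazy, picklable dask array over one HDF5 dataset.
    manager = CachingFileManager(h5py.File, path, mode='r')

    with manager.acquire_context(needs_lock=False) as f:
        dtype = f[name].dtype

    def load_block(block_info=None):
        # block_info[None] describes the output block being computed;
        # 'array-location' gives a (start, stop) pair per axis.
        loc = block_info[None]['array-location']
        with manager.acquire_context(needs_lock=False) as f:
            return f[name][tuple(slice(lo, hi) for lo, hi in loc)]

    return da.map_blocks(load_block, chunks=chunks, dtype=dtype,
                         meta=np.empty((0,), dtype=dtype))

res = lazy_h5_array('data.h5', 'X',
                    ((chunksize,) * numchunks, (chunksize,) * numchunks))
print(res.sum().compute())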