xarray-contrib/cf-xarray

`cf-xarray` conflict with `xpublish` and dask?

kthyng opened this issue · 3 comments

I think I am hitting a problem with cf-xarray in combination with dask, though my usage is through model output served through xpublish.

In the following example, the `.compute()` call raises an error, but if you remove the `.cf` accessor it does not. Note that identifying "temp" in the dataset would normally require a custom vocabulary, but the variable is already named "temp", so that shouldn't be an issue. Regardless, it also fails when used with a custom vocabulary (I can add some code for that if it would help).

Any ideas? Thanks.

import intake
import cf_xarray

dsm = intake.open_zarr("http://xpublish-ciofs.srv.axds.co/datasets/ciofs_hindcast/zarr/", consolidated=True).to_dask()

Targ = slice('2000-1-1', '2000-1-2', None)
ixi, ieta = 187, 14

dsm.cf["temp"].sel(ocean_time=Targ).isel(xi_rho=ixi, eta_rho=ieta, s_rho=-1).compute()

Error:

<details>

---------------------------------------------------------------------------
ClientResponseError                       Traceback (most recent call last)
Cell In[17], line 9
      6 Targ = slice('2000-1-1', '2000-1-2', None)
      7 ixi, ieta = 187, 14
----> 9 dsm.cf["temp"].sel(ocean_time=Targ).isel(xi_rho=ixi, eta_rho=ieta, s_rho=-1).compute()

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/dataarray.py:1089, in DataArray.compute(self, **kwargs)
   1070 """Manually trigger loading of this array's data from disk or a
   1071 remote source into memory and return a new array. The original is
   1072 left unaltered.
   (...)
   1086 dask.compute
   1087 """
   1088 new = self.copy(deep=False)
-> 1089 return new.load(**kwargs)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/dataarray.py:1063, in DataArray.load(self, **kwargs)
   1045 def load(self: T_DataArray, **kwargs) -> T_DataArray:
   1046     """Manually trigger loading of this array's data from disk or a
   1047     remote source into memory and return this array.
   1048 
   (...)
   1061     dask.compute
   1062     """
-> 1063     ds = self._to_temp_dataset().load(**kwargs)
   1064     new = self._from_temp_dataset(ds)
   1065     self._variable = new._variable

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/dataset.py:754, in Dataset.load(self, **kwargs)
    752 for k, v in self.variables.items():
    753     if k not in lazy_data:
--> 754         v.load()
    756 return self

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/variable.py:532, in Variable.load(self, **kwargs)
    530     self._data = as_compatible_data(self._data.compute(**kwargs))
    531 elif not is_duck_array(self._data):
--> 532     self._data = np.asarray(self._data)
    533 return self

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/indexing.py:623, in CopyOnWriteArray.__array__(self, dtype)
    622 def __array__(self, dtype=None):
--> 623     return np.asarray(self.array, dtype=dtype)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/core/indexing.py:524, in LazilyIndexedArray.__array__(self, dtype)
    522 def __array__(self, dtype=None):
    523     array = as_indexable(self.array)
--> 524     return np.asarray(array[self.key], dtype=None)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/xarray/backends/zarr.py:73, in ZarrArrayWrapper.__getitem__(self, key)
     71 array = self.get_array()
     72 if isinstance(key, indexing.BasicIndexer):
---> 73     return array[key.tuple]
     74 elif isinstance(key, indexing.VectorizedIndexer):
     75     return array.vindex[
     76         indexing._arrayize_vectorized_indexer(key, self.shape).tuple
     77     ]

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/zarr/core.py:821, in Array.__getitem__(self, selection)
    819     result = self.vindex[selection]
    820 else:
--> 821     result = self.get_basic_selection(pure_selection, fields=fields)
    822 return result

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/zarr/core.py:944, in Array.get_basic_selection(self, selection, out, fields)
    942 # handle zero-dimensional arrays
    943 if self._shape == ():
--> 944     return self._get_basic_selection_zd(selection=selection, out=out,
    945                                         fields=fields)
    946 else:
    947     return self._get_basic_selection_nd(selection=selection, out=out,
    948                                         fields=fields)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/zarr/core.py:961, in Array._get_basic_selection_zd(self, selection, out, fields)
    958 try:
    959     # obtain encoded data for chunk
    960     ckey = self._chunk_key((0,))
--> 961     cdata = self.chunk_store[ckey]
    963 except KeyError:
    964     # chunk not initialized
    965     chunk = np.zeros_like(self._meta_array, shape=(), dtype=self._dtype)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/zarr/storage.py:1393, in FSStore.__getitem__(self, key)
   1391 key = self._normalize_key(key)
   1392 try:
-> 1393     return self.map[key]
   1394 except self.exceptions as e:
   1395     raise KeyError(key) from e

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/mapping.py:143, in FSMap.__getitem__(self, key, default)
    141 k = self._key_to_str(key)
    142 try:
--> 143     result = self.fs.cat(k)
    144 except self.missing_exceptions:
    145     if default is not None:

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/asyn.py:115, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    112 @functools.wraps(func)
    113 def wrapper(*args, **kwargs):
    114     self = obj or args[0]
--> 115     return sync(self.loop, func, *args, **kwargs)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/asyn.py:100, in sync(loop, func, timeout, *args, **kwargs)
     98     raise FSTimeoutError from return_result
     99 elif isinstance(return_result, BaseException):
--> 100     raise return_result
    101 else:
    102     return return_result

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/asyn.py:55, in _runner(event, coro, result, timeout)
     53     coro = asyncio.wait_for(coro, timeout=timeout)
     54 try:
---> 55     result[0] = await coro
     56 except Exception as ex:
     57     result[0] = ex

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/asyn.py:414, in AsyncFileSystem._cat(self, path, recursive, on_error, batch_size, **kwargs)
    412     ex = next(filter(is_exception, out), False)
    413     if ex:
--> 414         raise ex
    415 if (
    416     len(paths) > 1
    417     or isinstance(path, list)
    418     or paths[0] != self._strip_protocol(path)
    419 ):
    420     return {
    421         k: v
    422         for k, v in zip(paths, out)
    423         if on_error != "omit" or not is_exception(v)
    424     }

File ~/miniconda3/envs/ciofs/lib/python3.10/asyncio/tasks.py:408, in wait_for(fut, timeout)
    405 loop = events.get_running_loop()
    407 if timeout is None:
--> 408     return await fut
    410 if timeout <= 0:
    411     fut = ensure_future(fut, loop=loop)

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/implementations/http.py:231, in HTTPFileSystem._cat_file(self, url, start, end, **kwargs)
    229 async with session.get(self.encode_url(url), **kw) as r:
    230     out = await r.read()
--> 231     self._raise_not_found_for_status(r, url)
    232 return out

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/fsspec/implementations/http.py:214, in HTTPFileSystem._raise_not_found_for_status(self, response, url)
    212 if response.status == 404:
    213     raise FileNotFoundError(url)
--> 214 response.raise_for_status()

File ~/miniconda3/envs/ciofs/lib/python3.10/site-packages/aiohttp/client_reqrep.py:1005, in ClientResponse.raise_for_status(self)
   1003 assert self.reason is not None
   1004 self.release()
-> 1005 raise ClientResponseError(
   1006     self.request_info,
   1007     self.history,
   1008     status=self.status,
   1009     message=self.reason,
   1010     headers=self.headers,
   1011 )

ClientResponseError: 500, message='Internal Server Error', url=URL('http://xpublish-ciofs.srv.axds.co/datasets/ciofs_hindcast/zarr/grid/0')
</details>

Works for me. This may just be a flaky server

Closing since this doesn't seem like a cf-xarray bug; we just rewrite this to a different xarray call.

@dcherian I don't know how this worked for you, but we were able to get a fix by changing the server setup somewhat, and there is also a change submitted to xpublish that will help. But yes, I agree this is not a cf-xarray issue per se. Thanks!