microsoft/PlanetaryComputerExamples

Issue in accessing elements of data variable using Xarray with Dask Clusters

aftabhunzai opened this issue · 9 comments

I am not sure whether this question should be asked here. I am trying to access elements of the "re" dataset using re.variable[0,0,0].values.item(). It works perfectly without a Dask cluster, but as soon as I create a Dask cluster it stops working. I tried the re.persist() and load() options, but the issue is still not solved. I am using the Planetary Computer environment (Jupyter Notebook). The error details are as follows; I would appreciate any help.

---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Input In [33], in <module>
----> 1 da[0,0,0].values.item()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py:641, in DataArray.values(self)
632 @property
633 def values(self) -> np.ndarray:
634 """
635 The array's data as a numpy.ndarray.
636
(...)
639 type does not support coercion like this (e.g. cupy).
640 """
--> 641 return self.variable.values

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py:510, in Variable.values(self)
507 @property
508 def values(self):
509 """The variable's data as a numpy.ndarray"""
--> 510 return _as_array_or_item(self._data)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py:250, in _as_array_or_item(data)
236 def _as_array_or_item(data):
237 """Return the given values as a numpy array, or as an individual item if
238 it's a 0d datetime64 or timedelta64 array.
239
(...)
248 TODO: remove this (replace with np.asarray) once these issues are fixed
249 """
--> 250 data = np.asarray(data)
251 if data.ndim == 0:
252 if data.dtype.kind == "M":

File /srv/conda/envs/notebook/lib/python3.8/site-packages/numpy/core/_asarray.py:102, in asarray(a, dtype, order, like)
99 if like is not None:
100 return _asarray_with_like(a, dtype=dtype, order=order, like=like)
--> 102 return array(a, dtype, copy=False, order=order)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py:1541, in Array.__array__(self, dtype, **kwargs)
1540 def __array__(self, dtype=None, **kwargs):
-> 1541 x = self.compute()
1542 if dtype and x.dtype != dtype:
1543 x = x.astype(dtype)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.compute(self, **kwargs)
264 def compute(self, **kwargs):
265 """Compute this dask collection
266
267 This turns a lazy Dask collection into its in-memory equivalent.
(...)
286 dask.base.compute
287 """
--> 288 (result,) = compute(self, traverse=False, **kwargs)
289 return result

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py:571, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
568 keys.append(x.__dask_keys__())
569 postcomputes.append(x.__dask_postcompute__())
--> 571 results = schedule(dsk, keys, **kwargs)
572 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/threaded.py:79, in get(dsk, result, cache, num_workers, pool, **kwargs)
76 elif isinstance(pool, multiprocessing.pool.Pool):
77 pool = MultiprocessingPoolExecutor(pool)
---> 79 results = get_async(
80 pool.submit,
81 pool._max_workers,
82 dsk,
83 result,
84 cache=cache,
85 get_id=_thread_get_id,
86 pack_exception=pack_exception,
87 **kwargs,
88 )
90 # Cleanup pools associated to dead threads
91 with pools_lock:

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py:507, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
505 _execute_task(task, data) # Re-execute locally
506 else:
--> 507 raise_exception(exc, tb)
508 res, worker_id = loads(res_info)
509 state["cache"][key] = res

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py:315, in reraise(exc, tb)
313 if exc.__traceback__ is not tb:
314 raise exc.with_traceback(tb)
--> 315 raise exc

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/local.py:220, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
218 try:
219 task, data = loads(task_info)
--> 220 result = _execute_task(task, data)
221 id = get_id()
222 result = dumps((result, id))

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/chunk.py:422, in getitem(obj, index)
401 def getitem(obj, index):
402 """Getitem function
403
404 This function creates a copy of the desired selection for array-like
(...)
420
421 """
--> 422 result = obj[index]
423 try:
424 if not result.flags.owndata and obj.size >= 2 * result.size:

TypeError: 'Future' object is not subscriptable

Do you have a minimal reproducible example you can share? https://matthewrocklin.com/blog/work/2018/02/28/minimal-bug-reports

Thank you for the response. Here is an example: whenever you create a Dask cluster, you can't access elements.

"""
import dask_gateway
cluster = dask_gateway.GatewayCluster()
client = cluster.get_client()
#cluster.adapt(minimum=2, maximum=100)
cluster.scale(8)
print(cluster.dashboard_link)
"""
import xarray as xr
import wget
import rioxarray as rxr
# Minimal reproduction: open one MODIS MYD10A1 granule with chunking, then read
# a single element of the NDSI_Snow_Cover variable.

# Opened rasters are collected here and concatenated along "time" below.
ndsi=[]
# Labels for the "time" dimension, one per raster appended to ndsi.
timedate=['2018-01-01']
# Source granule in blob storage and the local path it is downloaded to.
url="https://modissa.blob.core.windows.net/modis-006/MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf"
filename="/tmp/modis/MYD10A1_23_05_2018001_MYD10A1.A2018001.h23v05.006.2018003025822.hdf"
# NOTE(review): the download lands on the filesystem of the machine running this
# script. Per the reply later in this thread, Dask Gateway workers have a
# separate file system and cannot see this path.
wget.download(url,filename)

# Open the downloaded file with explicit chunk sizes and without a read lock.
aa=rxr.open_rasterio(filename, chunks=(4, 2097152, 2097152,2097152,2097152), lock=False)
ndsi.append(aa)
# Stack the collected rasters along a new "time" dimension labelled by timedate.
geotiffs_da = xr.concat(ndsi,dim=xr.Variable('time',timedate))
# Keep only the NDSI_Snow_Cover variable, then select band 1.
geotiffs_da=geotiffs_da[['NDSI_Snow_Cover']]
images_arry=geotiffs_da.sel(band=1)
# Bare expression: displays the object when run as a notebook cell.
images_arry
persisted=images_arry.persist()
# Pull one scalar out of the persisted data; this is the statement the
# maintainer's traceback in the reply points at.
persisted.NDSI_Snow_Cover[0,0,0].values.item()

Thanks.

I wasn't able to reproduce your exact error. The exception I see makes sense though:

---------------------------------------------------------------------------
RasterioIOError                           Traceback (most recent call last)
Input In [3], in <cell line: 17>()
     15 images_arry
     16 persisted=images_arry.persist()
---> 17 persisted.NDSI_Snow_Cover[0,0,0].values.item()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/dataarray.py:642, in DataArray.values(self)
    633 @property
    634 def values(self) -> np.ndarray:
    635     """
    636     The array's data as a numpy.ndarray.
    637 
   (...)
    640     type does not support coercion like this (e.g. cupy).
    641     """
--> 642     return self.variable.values

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py:512, in Variable.values(self)
    509 @property
    510 def values(self):
    511     """The variable's data as a numpy.ndarray"""
--> 512     return _as_array_or_item(self._data)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/variable.py:252, in _as_array_or_item(data)
    238 def _as_array_or_item(data):
    239     """Return the given values as a numpy array, or as an individual item if
    240     it's a 0d datetime64 or timedelta64 array.
    241 
   (...)
    250     TODO: remove this (replace with np.asarray) once these issues are fixed
    251     """
--> 252     data = np.asarray(data)
    253     if data.ndim == 0:
    254         if data.dtype.kind == "M":

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py:1626, in Array.__array__(self, dtype, **kwargs)
   1625 def __array__(self, dtype=None, **kwargs):
-> 1626     x = self.compute()
   1627     if dtype and x.dtype != dtype:
   1628         x = x.astype(dtype)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py:290, in DaskMethodsMixin.compute(self, **kwargs)
    266 def compute(self, **kwargs):
    267     """Compute this dask collection
    268 
    269     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    288     dask.base.compute
    289     """
--> 290     (result,) = compute(self, traverse=False, **kwargs)
    291     return result

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/base.py:573, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    570     keys.append(x.__dask_keys__())
    571     postcomputes.append(x.__dask_postcompute__())
--> 573 results = schedule(dsk, keys, **kwargs)
    574 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py:3010, in Client.get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   3008         should_rejoin = False
   3009 try:
-> 3010     results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   3011 finally:
   3012     for f in futures.values():

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py:2162, in Client.gather(self, futures, errors, direct, asynchronous)
   2160 else:
   2161     local_worker = None
-> 2162 return self.sync(
   2163     self._gather,
   2164     futures,
   2165     errors=errors,
   2166     direct=direct,
   2167     local_worker=local_worker,
   2168     asynchronous=asynchronous,
   2169 )

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py:311, in SyncMethodMixin.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    309     return future
    310 else:
--> 311     return sync(
    312         self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    313     )

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py:378, in sync(loop, func, callback_timeout, *args, **kwargs)
    376 if error:
    377     typ, exc, tb = error
--> 378     raise exc.with_traceback(tb)
    379 else:
    380     return result

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py:351, in sync.<locals>.f()
    349         future = asyncio.wait_for(future, callback_timeout)
    350     future = asyncio.ensure_future(future)
--> 351     result = yield future
    352 except Exception:
    353     error = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/client.py:2025, in Client._gather(self, futures, errors, direct, local_worker)
   2023         exc = CancelledError(key)
   2024     else:
-> 2025         raise exception.with_traceback(traceback)
   2026     raise exc
   2027 if errors == "skip":

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/optimization.py:969, in __call__()
    967 if not len(args) == len(self.inkeys):
    968     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 969 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py:149, in get()
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/core.py:119, in _execute_task()
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask/array/core.py:119, in getter()
    114     # Below we special-case `np.matrix` to force a conversion to
    115     # `np.ndarray` and preserve original Dask behavior for `getter`,
    116     # as for all purposes `np.matrix` is array-like and thus
    117     # `is_arraylike` evaluates to `True` in that case.
    118     if asarray and (not is_arraylike(c) or isinstance(c, np.matrix)):
--> 119         c = np.asarray(c)
    120 finally:
    121     if lock:

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py:358, in __array__()
    357 def __array__(self, dtype=None):
--> 358     return np.asarray(self.array, dtype=dtype)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py:522, in __array__()
    521 def __array__(self, dtype=None):
--> 522     return np.asarray(self.array, dtype=dtype)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py:423, in __array__()
    421 def __array__(self, dtype=None):
    422     array = as_indexable(self.array)
--> 423     return np.asarray(array[self.key], dtype=None)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:301, in __getitem__()
    300 def __getitem__(self, key):
--> 301     return indexing.explicit_indexing_adapter(
    302         key, self.shape, indexing.IndexingSupport.OUTER, self._getitem
    303     )

File /srv/conda/envs/notebook/lib/python3.8/site-packages/xarray/core/indexing.py:712, in explicit_indexing_adapter()
    690 """Support explicit indexing by delegating to a raw indexing method.
    691 
    692 Outer and/or vectorized indexers are supported by indexing a second time
   (...)
    709 Indexing result, in the form of a duck numpy-array.
    710 """
    711 raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 712 result = raw_indexing_method(raw_key.tuple)
    713 if numpy_indices.tuple:
    714     # index the loaded np.ndarray
    715     result = NumpyIndexingAdapter(np.asarray(result))[numpy_indices]

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:284, in _getitem()
    282 else:
    283     with self.lock:
--> 284         riods = self.manager.acquire(needs_lock=False)
    285         if self.vrt_params is not None:
    286             riods = WarpedVRT(riods, **self.vrt_params)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:117, in acquire()
    113 if self._local.thread_manager is None:
    114     self._local.thread_manager = ThreadURIManager(
    115         self._opener, *self._args, mode=self._mode, kwargs=self._kwargs
    116     )
--> 117 return self._local.thread_manager.file_handle

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:73, in file_handle()
     71 if self._file_handle is not None:
     72     return self._file_handle
---> 73 self._file_handle = self._opener(*self._args, mode=self._mode, **self._kwargs)
     74 return self._file_handle

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rasterio/env.py:437, in wrapper()
    434     session = DummySession()
    436 with env_ctor(session=session):
--> 437     return f(*args, **kwds)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rasterio/__init__.py:220, in open()
    216 # Create dataset instances and pass the given env, which will
    217 # be taken over by the dataset's context manager if it is not
    218 # None.
    219 if mode == 'r':
--> 220     s = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
    221 elif mode == "r+":
    222     s = get_writer_for_path(path, driver=driver)(
    223         path, mode, driver=driver, sharing=sharing, **kwargs
    224     )

File rasterio/_base.pyx:263, in rasterio._base.DatasetBase.__init__()

RasterioIOError: HDF4_EOS:EOS_GRID:/tmp/modis/MYD10A1_23_05_2018001_MYD10A1.A2018001.h23v05.006.2018003025822.hdf:MOD_Grid_Snow_500m:NDSI_Snow_Cover: No such file or directory

Your Dask workers have a separate file system: they won't be able to see the file at your /tmp/modis/ folder. You should load data with dask, making sure that the download happens on the worker. Ideally, you'd load the data directly from blob storage into memory, but I'm not sure if rioxarray supports that for HDF4 files. We will have COGs available for MODIS soon.

Thanks for identifying this issue. I had guessed the same, but it is not mentioned in the PC documentation or examples (https://microsoft.github.io/AIforEarthDataSets/data/modis.html). I tried to load the data directly with Dask using rioxarray, but that is not supported, as I got the error below. Does this mean we can't use the large MODIS dataset until the COGs are available? When are they expected to be released — is there a date? Is there any other workaround or solution?
Thanks a lot.
CPLE_OpenFailedError Traceback (most recent call last)
File rasterio/_base.pyx:261, in rasterio._base.DatasetBase.__init__()

File rasterio/_shim.pyx:78, in rasterio._shim.open_dataset()

File rasterio/_err.pyx:216, in rasterio._err.exc_wrap_pointer()

CPLE_OpenFailedError: '/vsicurl/https://modissa.blob.core.windows.net/modis-006/MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf' not recognized as a supported file format.

The COGs should be available in a couple weeks.

In the meantime, you should be able to use Dask, you'll need to make sure to download the file(s) and load them within a single task. https://docs.dask.org/en/stable/delayed.html might be helpful.

Thank you so much!

Hi Tom, continuing the discussion above, and just to verify again: I used a SAS token to read a MODIS file directly from Azure Blob Storage so that I can use Gateway clusters. However, I got an error, which is shown below after the example code. Could you please point out what is wrong in my code? Since the MODIS dataset is public, I am passing an empty account_key.

import rioxarray as rxr
from azure.storage.blob import BlobServiceClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions, generate_account_sas
from azure.storage.blob import BlobSasPermissions
from datetime import datetime, timedelta
from azure.storage.blob import baseblobservice

def get_sas_url(modis_account_name,account_key,modis_container_name,blob_name,url):
    """Return ``url`` with a newly generated blob SAS token appended as its query string.

    Args:
        modis_account_name: Storage account name passed to ``BaseBlobService``.
        account_key: Storage account key passed to ``BaseBlobService``.
        modis_container_name: Container holding the blob.
        blob_name: Name (path) of the blob inside the container.
        url: URL the token is appended to; it is not derived from the other
            arguments, so the caller must pass the URL of the same blob.

    Returns:
        The string ``f"{url}?{token}"``.
    """
    # NOTE(review): the token is signed with account_key; an empty key, as used
    # by the calling script in this thread, presumably yields a signature the
    # service rejects — confirm against the reported HTTP 403.
    service = baseblobservice.BaseBlobService(account_name=modis_account_name, account_key=account_key)
    # Request a read-only token that expires one hour from the current UTC time.
    token = service.generate_blob_shared_access_signature(modis_container_name, blob_name, 
    permission=BlobSasPermissions(read=True), expiry=datetime.utcnow() + timedelta(hours=1),)
    url_with_sas = f"{url}?{token}"
    return url_with_sas

# Reproduction script: build a SAS URL for one MODIS granule and open it
# directly with rioxarray instead of downloading it first.

# NOTE(review): the key is left empty because the poster treats the dataset as
# public; the SAS token generated below is therefore signed with an empty key.
account_key=""
modis_account_name = 'modissa'
modis_container_name = 'modis-006'
blob_name="MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf"
# Assemble the plain (unsigned) blob URL from account, container and blob name.
modis_account_url = 'https://' + modis_account_name + '.blob.core.windows.net/'
url=modis_account_url+modis_container_name+'/'+blob_name
print("URL is :",url)
# Append a SAS token to the blob URL.
sas_url=get_sas_url(modis_account_name,account_key,modis_container_name,blob_name,url)
print("this is sas url:  ",sas_url)
# Open the remote file via the signed URL; this is the call that raises the
# RasterioIOError (HTTP 403) shown in the traceback that follows in the thread.
aa=rxr.open_rasterio(sas_url, chunks=(4, 2097152, 2097152,2097152,2097152), lock=False)
print(aa)

Error is:


URL is : https://modissa.blob.core.windows.net/modis-006/MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf
https://modissa.blob.core.windows.net/modis-006/MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf?se=2022-04-21T05%3A11%3A41Z&sp=rt&sv=2018-03-28&sr=b&sig=0DknLoYQURphxsoqND6ZqKqSpKLYosrobUoVYEqN3N8%3D
this is sasurl:   https://modissa.blob.core.windows.net/modis-006/MYD10A1/23/05/2018001/MYD10A1.A2018001.h23v05.006.2018003025822.hdf?se=2022-04-21T05%3A11%3A41Z&sp=rt&sv=2018-03-28&sr=b&sig=0DknLoYQURphxsoqND6ZqKqSpKLYosrobUoVYEqN3N8%3D
---------------------------------------------------------------------------
CPLE_HttpResponseError                    Traceback (most recent call last)
File rasterio/_base.pyx:261, in rasterio._base.DatasetBase.__init__()

File rasterio/_shim.pyx:78, in rasterio._shim.open_dataset()

File rasterio/_err.pyx:216, in rasterio._err.exc_wrap_pointer()

CPLE_HttpResponseError: HTTP response code: 403

During handling of the above exception, another exception occurred:

RasterioIOError                           Traceback (most recent call last)
Input In [16], in <module>
     25 sas_url=get_sas_url(modis_account_name,account_key,modis_container_name,blob_name,url)
     27 print("this is sasurl:  ",sas_url)
---> 28 aa=rxr.open_rasterio(sas_url, chunks=(4, 2097152, 2097152,2097152,2097152), lock=False)
     29 print(aa)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:833, in open_rasterio(filename, parse_coordinates, chunks, cache, lock, masked, mask_and_scale, variable, group, default_name, decode_times, decode_timedelta, **open_kwargs)
    831     else:
    832         manager = URIManager(rasterio.open, filename, mode="r", kwargs=open_kwargs)
--> 833     riods = manager.acquire()
    834     captured_warnings = rio_warnings.copy()
    836 # raise the NotGeoreferencedWarning if applicable

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:117, in URIManager.acquire(self, needs_lock)
    113 if self._local.thread_manager is None:
    114     self._local.thread_manager = ThreadURIManager(
    115         self._opener, *self._args, mode=self._mode, kwargs=self._kwargs
    116     )
--> 117 return self._local.thread_manager.file_handle

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rioxarray/_io.py:73, in ThreadURIManager.file_handle(self)
     71 if self._file_handle is not None:
     72     return self._file_handle
---> 73 self._file_handle = self._opener(*self._args, mode=self._mode, **self._kwargs)
     74 return self._file_handle

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rasterio/env.py:437, in ensure_env_with_credentials.<locals>.wrapper(*args, **kwds)
    434     session = DummySession()
    436 with env_ctor(session=session):
--> 437     return f(*args, **kwds)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/rasterio/__init__.py:220, in open(fp, mode, driver, width, height, count, crs, transform, dtype, nodata, sharing, **kwargs)
    216 # Create dataset instances and pass the given env, which will
    217 # be taken over by the dataset's context manager if it is not
    218 # None.
    219 if mode == 'r':
--> 220     s = DatasetReader(path, driver=driver, sharing=sharing, **kwargs)
    221 elif mode == "r+":
    222     s = get_writer_for_path(path, driver=driver)(
    223         path, mode, driver=driver, sharing=sharing, **kwargs
    224     )

File rasterio/_base.pyx:263, in rasterio._base.DatasetBase.__init__()

RasterioIOError: HTTP response code: 403

I haven't had a chance to look into your error, but the example at https://github.com/microsoft/PlanetaryComputerExamples/blob/main/datasets/modis/modis-imagery-example.ipynb does show accessing MODIS data using our STAC API, and would work on a cluster.

That will be released to production sometime this week or next.

https://planetarycomputer.microsoft.com/dataset/modis-10A1-061#Example-Notebook should help get started with this dataset in COG format. We can reopen if you're still having issues.