Error opening zipped tiff collection via OpenURLWithFSSpec recipe
thodson-usgs opened this issue · 2 comments
I was unable to open a collection of zipped tiffs using the `OpenURLWithFSSpec` recipe. I tested the semantics with fsspec and xarray, and everything worked in my test notebook but failed when I built them into a pangeo-forge recipe:
```
SystemError: <class 'rasterio._err.CPLE_OpenFailedError'> returned a result with an exception set [while running 'Create|OpenURLWithFSSpec|OpenWithXarray|Preprocess|StoreToZarr/OpenWithXarray/Open with Xarray']
```
In the end, I was able to work around this by opening the tiffs directly with rioxarray (recipe.py); however, I believe the recipes should work as intended.

Here's an example that will generate the error. I believe I have isolated the problem to `OpenURLWithFSSpec`: dropping `OpenWithXarray` from the pipeline yields the same error, so the problem seems to lie with `OpenURLWithFSSpec` or with xarray.
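For context, the fsspec semantics I relied on can be exercised locally without the real USGS files. This sketch (the zip and its contents are made up) shows how `fsspec.open(..., compression='zip')` transparently reads the single member of a zip archive, which is what passing `open_kwargs={'compression': 'zip'}` to `OpenURLWithFSSpec` depends on:

```python
import os
import tempfile
import zipfile

import fsspec

# Build a throwaway single-member zip, standing in for one of the USGS archives.
tmpdir = tempfile.mkdtemp()
path = os.path.join(tmpdir, 'det2001001.modisSSEBopETactual.zip')
with zipfile.ZipFile(path, 'w') as z:
    z.writestr('det2001001.modisSSEBopETactual.tif', b'not-a-real-tiff')

# compression='zip' makes fsspec unwrap the archive and hand back the bytes
# of its first member, rather than the raw zip bytes.
with fsspec.open(path, 'rb', compression='zip') as f:
    data = f.read()

print(data)  # b'not-a-real-tiff'
```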
```python
from datetime import date

import apache_beam as beam
import pandas as pd
import rioxarray
import xarray as xr

from pangeo_forge_recipes.patterns import ConcatDim, FilePattern
from pangeo_forge_recipes.transforms import Indexed, OpenURLWithFSSpec, OpenWithXarray, StoreToZarr, T

# note that the file pattern differs from the working example
input_url_pattern = (
    'https://edcintl.cr.usgs.gov/downloads/sciweb1/shared/uswem/web/'
    'conus/eta/modis_eta/daily/downloads/'
    'det{yyyyjjj}.modisSSEBopETactual.zip'
)

start = date(2001, 1, 1)
end = date(2022, 10, 7)
dates = pd.date_range(start, end, freq='1D')


def make_url(time: pd.Timestamp) -> str:
    return input_url_pattern.format(yyyyjjj=time.strftime('%Y%j'))


pattern = FilePattern(make_url,
                      ConcatDim(name='time', keys=dates, nitems_per_file=1))
pattern = pattern.prune()


class Preprocess(beam.PTransform):
    """Preprocessor transform."""

    @staticmethod
    def _preproc(item: Indexed[T]) -> Indexed[xr.Dataset]:
        import numpy as np

        index, f = item
        time_dim = index.find_concat_dim('time')
        time_index = index[time_dim].value
        time = dates[time_index]
        da = rioxarray.open_rasterio(f.open()).drop_vars('band')
        da = da.rename({'x': 'lon', 'y': 'lat'})
        ds = da.to_dataset(name='aet')
        ds = ds.expand_dims(time=np.array([time]))
        return index, ds

    def expand(self, pcoll: beam.PCollection) -> beam.PCollection:
        return pcoll | beam.Map(self._preproc)


recipe = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(open_kwargs={'compression': 'zip'})
    | OpenWithXarray(xarray_open_kwargs={'engine': 'rasterio'})
    | Preprocess()
    | StoreToZarr(
        store_name='us-ssebop.zarr',
        target_root='.',
        combine_dims=pattern.combine_dim_keys,
        target_chunks={'time': 1, 'lat': int(2834 / 2), 'lon': int(6612 / 6)},
    )
)

with beam.Pipeline() as p:
    p | recipe
```
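The day-of-year URL formatting used above can be sanity-checked in isolation (filenames only, no network access needed):

```python
import pandas as pd

dates = pd.date_range('2001-01-01', '2001-01-03', freq='1D')
# %Y%j gives a four-digit year plus a zero-padded day-of-year,
# e.g. 2001-01-01 -> '2001001'
names = [f"det{t.strftime('%Y%j')}.modisSSEBopETactual.zip" for t in dates]
print(names[0])  # det2001001.modisSSEBopETactual.zip
```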
If someone picks this up, I'd also be curious what tricks they use to debug Beam.
Maybe @moradology can help look into this too, @thodson-usgs (as we talked about at today's meeting).