Xarray-to-Zarr recipe runs out of memory
chuckwondo opened this issue ยท 15 comments
I'm following the PangeoForge recipe tutorial Xarray-to-Zarr Sequential Recipe: NOAA OISST to create a recipe for CEDA monthly daytime land surface temperature data, but I'm running into issues using pangeo-forge-recipes version 0.10.0 (obtained via conda-forge).
Here's my code in recipe.py
(I'm using python 3.11):
import os
from tempfile import TemporaryDirectory
import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)
url_pattern = (
"https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
"MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
"ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)
temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
transforms = (
beam.Create(pattern.items())
| OpenURLWithFSSpec()
| OpenWithXarray(file_type=pattern.file_type)
| StoreToZarr(
target_root=target_root,
store_name=store_name,
combine_dims=pattern.combine_dim_keys,
target_chunks={"time": 1, "lat": 5, "lon": 5},
)
)
print(f"{pattern=}")
print(f"{target_store=}")
print(f"{transforms=}")
with beam.Pipeline() as p:
p | transforms # type: ignore[reportUnusedExpression]
with xr.open_zarr(target_store) as ds:
print(ds)
When I run this, it is eventually killed because it consumes an obscene amount of memory. I saw the python process exceed 40G of memory (on my 16G machine), but it may very well have gone beyond that while I wasn't watching it -- it ran for about 3.5 hours!:
$ time python recipe.py
pattern=<FilePattern {'time': 1}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpozgcr3ng/output.zarr'
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr] at 0x162819b90>
...
.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
Killed: 9
real 216m31.108s
user 76m14.794s
sys 90m21.965s
.../python3.11/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown
warnings.warn('resource_tracker: There appear to be %d '
I'm going to downgrade pangeo-forge-recipes to a version prior to the recently introduced breaking API changes to see if I encounter the same problem with the old API, but in the meantime, is there anything glaringly wrong with what I've written above that would cause the memory issue?
As an example of one of the files:
>>> import xarray as xr
>>> from datetime import date
>>> url_pattern = (
... "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
... "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
... "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
... )
>>> url = url_pattern.format(time=date(1995, 8, 1))
>>> url
'https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/1995/08/ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-19950801000000-fv2.00.nc'
>>> ds = xr.open_dataset(f"{url}#mode=bytes", chunks="auto")
>>> ds
<xarray.Dataset>
Dimensions: (time: 1, lat: 18000, lon: 36000, length_scale: 1,
channel: 2)
Coordinates:
* time (time) datetime64[ns] 1995-08-01
* lat (lat) float32 -90.0 -89.99 -89.98 ... 89.97 89.98 89.99
* lon (lon) float32 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
* channel (channel) float32 11.0 12.0
Dimensions without coordinates: length_scale
Data variables: (12/14)
dtime (time, lat, lon) timedelta64[ns] dask.array<chunksize=(1, 2895, 5794), meta=np.ndarray>
satze (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
sataz (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
solze (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
solaz (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
... ...
lst_unc_loc_atm (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_loc_sfc (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_sys (length_scale) float32 dask.array<chunksize=(1,), meta=np.ndarray>
lcc (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
n (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
lst_unc_loc_cor (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
Attributes: (12/41)
source: ESA LST CCI IRCDR L3S V2.00
title: ESA LST CCI land surface temperature time ser...
institution: University of Leicester
history: Created using software developed at Universit...
references: https://climate.esa.int/en/projects/land-surf...
Conventions: CF-1.8
... ...
geospatial_lon_resolution: 0.01
geospatial_lat_resolution: 0.01
key_variables: land_surface_temperature
format_version: CCI Data Standards v2.2
spatial_resolution: 0.01 degree
doi: 10.5285/785ef9d3965442669bff899540747e28
>>>
To possibly bypass the memory issue, I've attempted to use kerchunk instead, based upon the HDF Reference Recipe for CMIP6, but also to no avail:
import os
from tempfile import TemporaryDirectory
import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
CombineReferences,
OpenWithKerchunk,
WriteCombinedReference,
)
url_pattern = (
"https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
"MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
"ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(2)
temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "ceda-monthly-daytime-lst"
target_store = os.path.join(target_root, store_name)
os.mkdir(target_store)
transforms = (
beam.Create(pattern.items())
| OpenWithKerchunk(file_type=pattern.file_type)
| CombineReferences(
# Same error, regardless of inclusion/exclusion of "length_scale"
# concat_dims=["time"], identical_dims=["lat", "lon", "channel", "length_scale"]
concat_dims=["time"], identical_dims=["lat", "lon", "channel"]
)
| WriteCombinedReference(target_root=target_root, store_name=store_name)
)
print(f"{pattern=}")
print(f"{target_store=} (exists: {os.path.isdir(target_store)})")
print(f"{transforms=}")
with beam.Pipeline() as p:
p | transforms # type: ignore[reportUnusedExpression]
with xr.open_zarr(target_store) as ds:
print(ds)
Running this version results in the following, so I don't know if there's a way to eliminate the problematic length_scale
coordinate that lst_unc_sys
uses. Also, I don't understand why I'm seeing the UserWarning about the time coord:
$ time python recipe-kerchunk.py
pattern=<FilePattern {'time': 2}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpdt_pu6g5/ceda-monthly-daytime-lst' (exists: True)
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenWithKerchunk|CombineReferences|WriteCombinedReference] at 0x10e458510>
.../python3.11/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [4.6008e+08]
...
.../python3.11/site-packages/kerchunk/combine.py", line 403, in second_pass
raise ValueError(
ValueError: Found chunk size mismatch:
at prefix lst_unc_sys in iteration 1 (file None)
new chunk: [1, 1]
chunks so far: [1]
real 12m39.490s
user 0m53.177s
sys 0m15.788s
@chuckwondo thanks for using Pangeo Forge and reporting back on your experience. There are definitely still some unpolished edges and these types of reports are invaluable for surfacing areas we need to work on more.
On first glance, what comes to mind for me is that when you call
with beam.Pipeline() as p:
p | transforms
Beam is presumably selecting the default local multithreaded runner (because no other runner is identified) which then attempts to fetch and open as many of your source files as possible in concurrent threads, which could easily end up being (far) in excess of 40GB depending on how big each source file is and how many of them there are. (Even if you have a modest number of threads, it's possible Beam is also attempting to dump opened input files into a serialized in-memory cache as it waits for the rest of the inputs to be opened.)
So a question for you: what is the aggregate size you expect the final zarr store to be? If < 40GB, then this does sound like a memory leak of some sort. If => 40GB, then this might be somewhat expected behavior for beam. In the latter case, we should figure out what distributed runner (Flink, GCP Dataflow, etc.) may make sense for this job. Generally speaking, for any large "production" build, we would recommend using a distributed runner, as the local runners are not built for reliably moving large amounts of data. Apologies that this is not currently clearer in the docs. I'm currently working on a docs revision which makes that more apparent.
@cisaacstern, thanks for the reply.
In the version of the code in my initial description, I've pruned the pattern to only 1 file:
# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)
This is confirmed by the initial line of output:
pattern=<FilePattern {'time': 1}>
While each file is rather larger (~1.5G), I wouldn't expect operating on only 1 file to consume such an incredible amount of memory (40G+, perhaps even far more, while I wasn't looking).
Regarding your question about how large I expect the final zarr file to be, I'm not particularly familiar with the format to know how the size of the input files (1 file in this case) would translate into the size of the final zarr file.
However, I wouldn't expect processing a single 1.5G file to cause things to collapse even on a local runner, but I don't know what's happening under the covers.
I specifically pruned the pattern to 1 file (I originally used 2, which caused my original memory issue, so I pruned to only 1 to see if that would help, to no avail) to limit memory consumption as much as possible, just to confirm that I could produce sensible output before potentially submitting the recipe to the staged-recipes repo (if that's even the correct course of action), where I imagine it would be executed on a sufficiently large platform to process all of the files.
Do you have any further insights, or any suggestions?
I have reproduced this issue. Now investigating.
This is the problem
target_chunks={"time": 1, "lat": 5, "lon": 5},
You are trying to decimate this very large dataset into a tiny, tiny number of chunks. This is overwhelming beam with a vast number of tasks.
The original dataset dimensions are {'time': 1, 'lat': 18000, 'lon': 36000, 'length_scale': 1, 'channel': 2}
. So this pipeline is creating 25920000 different tasks (one for each chunk).
Beyond the inefficient pipeline, this Zarr with these tiny chunks would be extremely hard to use. We generally aim for chunks from 1-100 MB. The following chunks seemed to work for me.
target_chunks={"lat": 1800, "lon": 1800},
However, then I hit a new error
IndexError: too many indices for array; expected 1, got 4
which I swear I have seen before but can't remember where.
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:908: RuntimeWarning: invalid value encountered in cast
base = data.astype(np.int64)
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:912: RuntimeWarning: invalid value encountered in cast
data = (base * m + (frac * m).astype(np.int64)).view("timedelta64[ns]")
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
int_num = np.asarray(num, dtype=np.int64)
Unexpected exception formatting exception. Falling back to standard exception
Traceback (most recent call last):
File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
_store_data(vname, da.variable, index, zgroup)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
zarr_array[region] = data
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
self.set_basic_selection(pure_selection, value, fields=fields)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
return self._set_basic_selection_nd(selection, value, fields=fields)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
indexer = BasicIndexer(selection, self)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
selection = replace_ellipsis(selection, array._shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
check_selection_length(selection, shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
err_too_many_indices(selection, shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
raise IndexError(
IndexError: too many indices for array; expected 1, got 4
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "/tmp/ipykernel_1951/4238254113.py", line 1, in <module>
with beam.Pipeline() as p:
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
self.result = self.run()
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
return self.runner.run_pipeline(self, self._options)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
return runner.run_pipeline(pipeline, options)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
self._latest_run_result = self.run_via_runner_api(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
return self.run_stages(stage_context, stages)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
bundle_results = self._execute_bundle(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
self._run_bundle(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle
result, splits = bundle_manager.process_bundle(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1356, in process_bundle
result_future = self._worker_handler.control_conn.push(process_bundle_req)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
response = self.worker.do_instruction(request)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
return getattr(self, request_type)(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
input_op_by_transform_id[element.transform_id].process_encoded(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
_store_data(vname, da.variable, index, zgroup)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
zarr_array[region] = data
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
self.set_basic_selection(pure_selection, value, fields=fields)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
return self._set_basic_selection_nd(selection, value, fields=fields)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
indexer = BasicIndexer(selection, self)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
selection = replace_ellipsis(selection, array._shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
check_selection_length(selection, shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
err_too_many_indices(selection, shape)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
raise IndexError(
IndexError: too many indices for array; expected 1, got 4 [while running '[23]: Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 2105, in showtraceback
stb = self.InteractiveTB.structured_traceback(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1428, in structured_traceback
return FormattedTB.structured_traceback(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1319, in structured_traceback
return VerboseTB.structured_traceback(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1172, in structured_traceback
formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1087, in format_exception_as_a_whole
frames.append(self.format_record(record))
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 969, in format_record
frame_info.lines, Colors, self.has_colors, lvals
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 792, in lines
return self._sd.lines
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 734, in lines
pieces = self.included_pieces
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 681, in included_pieces
pos = scope_pieces.index(self.executing_piece)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
value = obj.__dict__[self.func.__name__] = self.func(obj)
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 660, in executing_piece
return only(
File "/srv/conda/envs/notebook/lib/python3.10/site-packages/executing/executing.py", line 190, in only
raise NotOneValueFound('Expected one value, found 0')
executing.executing.NotOneValueFound: Expected one value, found 0
Ok, I got this pipeline working. Annotated changes below.
# set up a cache location
cache_dir = TemporaryDirectory()
cache_root = cache_dir.name
transforms = (
beam.Create(pattern.items())
# Caching the file means that we don't have to load each of the 200 chunks over the network
# (which in this case is a transatlantic hop)
# Instead we cache it to local file storage
# (or could use an in-region object store for a distributed pipeline)
| OpenURLWithFSSpec(cache=cache_root)
| OpenWithXarray(
file_type=pattern.file_type,
# This variable is problematic because it doesn't have the time dimension,
# so Pangeo Forge gets confused about how to deal with it.
# The easiest thing is to drop it. Could also promote it to a coordinate variable.
xarray_open_kwargs={"drop_variables": ["lst_unc_sys"]},
)
| StoreToZarr(
target_root=target_root,
store_name=store_name,
combine_dims=pattern.combine_dim_keys,
# approx 25 MB chunks
target_chunks={"lat": 1800, "lon": 1800},
)
)
This works. It's not particularly fast in this configuration, but it should parallelize well.
Fantastic! Thank you!
Yes, at one point I had also run into that IndexError
you mentioned. I don't recall how.
I had also tried larger values for lat
and lon
chunks, but I ran into buffer overflow errors. However, I was also including a time
chunk of 1
. I hadn't tried dropping it because I had read somewhere in the docs that the library would otherwise assume enough available memory to hold all files being processed.
Are we able to drop the time
chunk because time
is the concat dim?
Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?
Are we able to drop the
time
chunk becausetime
is the concat dim?
Correct.
Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?
1800 * 1800 * 8 bytes = 25.9 MB
FWIW, I would reduce the precision of this data to float 32. You almost never need 64 bits of precision for data analytics.
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
I'm going to try different chunk settings (to get me nearer to ~100MB -- isn't that the "magic" number, give or take?), particularly since I can't seem to get your code suggestion to work. I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
or fsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. ๐
I keep getting either
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
orfsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.
Do you know if this timeout is coming from the upstream CEDA server?
Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?
Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. ๐
I finally got a successful run. I used target_chunks={"lat": 3600, "lon": 7200}
and it took ~30 min. to complete.
I now see that the result is float64 that you mentioned, even though the original is float32. Is there any way for us to prevent this unwanted conversion?
I keep getting either
aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed
orfsspec.exceptions.FSTimeoutError
. Perhaps fiddling with the chunk size will get me past them.Do you know if this timeout is coming from the upstream CEDA server?
I suspect the problem may be on the CEDA server side. While using the CEDA OPeNDAP server during the course of some experimental work (which is what led me to look at writing this recipe, on the advice of @sharkinsspatial), we've experienced a great deal of flakiness.
I also see a few dozen of these 2 warnings during execution of the recipe:
.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
return to_zarr( # type: ignore[call-overload,misc]
and
.../python3.11/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
int_num = np.asarray(num, dtype=np.int64)
Is there any way (or need) to address these?
Here's the current, successfully running code (running locally, pruned to 2 files), when the server isn't flaking out. I also managed to get a successful local run pruned to 10 files (just barely, as I saw memory spike to over 70G!), which led me to discover that there are a couple of gaps in the monthly files, so the date range is adjusted for those gaps.
import os
from pathlib import Path
from tempfile import TemporaryDirectory
import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
OpenURLWithFSSpec,
OpenWithXarray,
StoreToZarr,
)
# Monthly data spans from 1995-08 through 2020-12, with the exception of two
# gaps: (1) 1996-01 through 1996-06 and (2) 2001-02 through 2001-06.
months = (
pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
.difference(pd.date_range("1996-01", "1996-06", freq=pd.offsets.MonthBegin()))
.difference(pd.date_range("2001-02", "2001-06", freq=pd.offsets.MonthBegin()))
)
url_pattern = (
"https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
"MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
"ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
"#mode=bytes"
)
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(2)
target_root = str(Path().absolute())
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)
cache_dir = TemporaryDirectory()
cache_root = cache_dir.name
transforms = (
beam.Create(pattern.items())
| OpenURLWithFSSpec(cache=cache_root)
| OpenWithXarray(
file_type=pattern.file_type,
# The lst_unc_sys var doesn't have the time dimension, which causes a
# "chunk size mismatch" error, so we must drop it.
xarray_open_kwargs={"drop_variables": ["lst_unc_sys"], "chunks": "auto"},
)
| StoreToZarr(
target_root=target_root,
store_name=store_name,
combine_dims=pattern.combine_dim_keys,
target_chunks={"lat": 1800, "lon": 3600},
)
)
with beam.Pipeline() as p:
p | transforms # type: ignore[reportUnusedExpression]
with xr.open_zarr(target_store) as ds:
print(ds)
Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?
In particular, I'm wondering about the following things (and I suspect there are other considerations that I'm not aware of):
- How should I specify the target store location? I suspect this should be a cloud location, no?
- Same question for the cache directory.
- Is there any way to eliminate the warnings mentioned in my previous comment? Should I even bother?
- Is there any way to avoid the conversion from float32 to float64 mentioned in an earlier comment?
@chuckwondo, way to push this to a working state! Responses inline:
Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?
The deployment of recipes from staged-recipes
is not currently maintained. Do you have access to a GCP Dataflow and/or AWS account to deploy this to? The following is a current example of using our GitHub Action to deploy to GCP Dataflow (but could be adapted for AWS): https://github.com/pangeo-forge/aqua-modis-feedstock/blob/8cb545d20d4c08e49ec176a27da1dc48d4c2191d/.github/workflows/deploy.yaml#L31-L69
Since it appears you're at DevSeed, perhaps coordinating with @sharkinsspatial on deployment infra makes sense. Sean, where have you been deploying to lately?
How should I specify the target store location? I suspect this should be a cloud location, no?
Same question for the cache directory.
As indicated in the GitHub Workflow example above, we recommend configuring this (and deploying) with the pangeo-forge-runner
CLI. The docs are lagging behind on this subject, and are currently my top priority, apologies for the lack of clarity here. One of the better reference points I can point to today is the integration tests on pangeo-forge-recipes
which (though a bit opaque, due to fixturing), do represent a fully tested use of the CLI to deploy to a local bakery:
I will aim to have better docs on this within a week or so, and will link them to you here ASAP.
Is there any way to eliminate the warnings mentioned in my previous comment? Should I even bother?
TBH, I'm not entirely clear what these mean so I'd ignore for now, assuming it doesn't appear to be affecting your output.
Is there any way to avoid the conversion from float32 to float64 mentioned in an earlier comment?
I'm not sure how to do this, if you're able to find a solution yourself, I'd be curious to know! Alternatively, could you open a dedicated Issue for this?