pangeo-forge/pangeo-forge-recipes

Xarray-to-Zarr recipe runs out of memory

chuckwondo opened this issue ยท 15 comments

I'm following the PangeoForge recipe tutorial Xarray-to-Zarr Sequential Recipe: NOAA OISST to create a recipe for CEDA monthly daytime land surface temperature data, but I'm running into issues using pangeo-forge-recipes version 0.10.0 (obtained via conda-forge).

Here's my code in recipe.py (I'm using python 3.11):

import os
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)

temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)

transforms = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec()
    | OpenWithXarray(file_type=pattern.file_type)
    | StoreToZarr(
        target_root=target_root,
        store_name=store_name,
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"time": 1, "lat": 5, "lon": 5},
    )
)

print(f"{pattern=}")
print(f"{target_store=}")
print(f"{transforms=}")

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)

When I run this, it is eventually killed because it consumes an obscene amount of memory. I saw the python process exceed 40G of memory (on my 16G machine), but it may very well have gone beyond that while I wasn't watching it -- it ran for about 3.5 hours!:

$ time python recipe.py
pattern=<FilePattern {'time': 1}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpozgcr3ng/output.zarr'
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr] at 0x162819b90>
...
.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
Killed: 9

real    216m31.108s
user    76m14.794s
sys     90m21.965s
.../python3.11/multiprocessing/resource_tracker.py:224: UserWarning: resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

I'm going to downgrade pangeo-forge-recipes to a version prior to the recently introduced breaking API changes to see if I encounter the same problem with the old API, but in the meantime, is there anything glaringly wrong with what I've written above that would cause the memory issue?

As an example of one of the files:

>>> import xarray as xr
>>> from datetime import date
>>> url_pattern = (
...     "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
...     "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
...     "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
... )
>>> url = url_pattern.format(time=date(1995, 8, 1))
>>> url
'https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/1995/08/ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-19950801000000-fv2.00.nc'
>>> ds = xr.open_dataset(f"{url}#mode=bytes", chunks="auto")
>>> ds
<xarray.Dataset>
Dimensions:          (time: 1, lat: 18000, lon: 36000, length_scale: 1,
                      channel: 2)
Coordinates:
  * time             (time) datetime64[ns] 1995-08-01
  * lat              (lat) float32 -90.0 -89.99 -89.98 ... 89.97 89.98 89.99
  * lon              (lon) float32 -180.0 -180.0 -180.0 ... 180.0 180.0 180.0
  * channel          (channel) float32 11.0 12.0
Dimensions without coordinates: length_scale
Data variables: (12/14)
    dtime            (time, lat, lon) timedelta64[ns] dask.array<chunksize=(1, 2895, 5794), meta=np.ndarray>
    satze            (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    sataz            (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    solze            (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    solaz            (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    lst              (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    ...               ...
    lst_unc_loc_atm  (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    lst_unc_loc_sfc  (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    lst_unc_sys      (length_scale) float32 dask.array<chunksize=(1,), meta=np.ndarray>
    lcc              (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    n                (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
    lst_unc_loc_cor  (time, lat, lon) float32 dask.array<chunksize=(1, 4094, 8195), meta=np.ndarray>
Attributes: (12/41)
    source:                     ESA LST CCI IRCDR L3S V2.00
    title:                      ESA LST CCI land surface temperature time ser...
    institution:                University of Leicester
    history:                    Created using software developed at Universit...
    references:                 https://climate.esa.int/en/projects/land-surf...
    Conventions:                CF-1.8
    ...                         ...
    geospatial_lon_resolution:  0.01
    geospatial_lat_resolution:  0.01
    key_variables:              land_surface_temperature
    format_version:             CCI Data Standards v2.2
    spatial_resolution:         0.01 degree
    doi:                        10.5285/785ef9d3965442669bff899540747e28
>>> 

To possibly bypass the memory issue, I've attempted to use kerchunk instead, based upon the HDF Reference Recipe for CMIP6, but also to no avail:

import os
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    CombineReferences,
    OpenWithKerchunk,
    WriteCombinedReference,
)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
)
months = pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(2)

temp_dir = TemporaryDirectory()
target_root = temp_dir.name
store_name = "ceda-monthly-daytime-lst"
target_store = os.path.join(target_root, store_name)
os.mkdir(target_store)

transforms = (
    beam.Create(pattern.items())
    | OpenWithKerchunk(file_type=pattern.file_type)
    | CombineReferences(
        # Same error, regardless of inclusion/exclusion of "length_scale"
        # concat_dims=["time"], identical_dims=["lat", "lon", "channel", "length_scale"]
        concat_dims=["time"], identical_dims=["lat", "lon", "channel"]
    )
    | WriteCombinedReference(target_root=target_root, store_name=store_name)
)

print(f"{pattern=}")
print(f"{target_store=} (exists: {os.path.isdir(target_store)})")
print(f"{transforms=}")

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)

Running this version results in the following, so I don't know if there's a way to eliminate the problematic length_scale coordinate that lst_unc_sys uses. Also, I don't understand why I'm seeing the UserWarning about the time coord:

$ time python recipe-kerchunk.py
pattern=<FilePattern {'time': 2}>
target_store='/var/folders/v_/q9ql2x2n3dlg2td_b6xkcjzw0000gn/T/tmpdt_pu6g5/ceda-monthly-daytime-lst' (exists: True)
transforms=<_ChainedPTransform(PTransform) label=[Create|OpenWithKerchunk|CombineReferences|WriteCombinedReference] at 0x10e458510>
.../python3.11/site-packages/kerchunk/combine.py:269: UserWarning: Concatenated coordinate 'time' contains less than expectednumber of values across the datasets: [4.6008e+08]
...
.../python3.11/site-packages/kerchunk/combine.py", line 403, in second_pass
    raise ValueError(
ValueError: Found chunk size mismatch:
                        at prefix lst_unc_sys in iteration 1 (file None)
                        new chunk: [1, 1]
                        chunks so far: [1]

real    12m39.490s
user    0m53.177s
sys     0m15.788s

@chuckwondo thanks for using Pangeo Forge and reporting back on your experience. There are definitely still some unpolished edges and these types of reports are invaluable for surfacing areas we need to work on more.

On first glance, what comes to mind for me is that when you call

with beam.Pipeline() as p:
    p | transforms

Beam is presumably selecting the default local multithreaded runner (because no other runner is identified) which then attempts to fetch and open as many of your source files as possible in concurrent threads, which could easily end up being (far) in excess of 40GB depending on how big each source file is and how many of them there are. (Even if you have a modest number of threads, it's possible Beam is also attempting to dump opened input files into a serialized in-memory cache as it waits for the rest of the inputs to be opened.)

So a question for you: what is the aggregate size you expect the final zarr store to be? If < 40GB, then this does sound like a memory leak of some sort. If => 40GB, then this might be somewhat expected behavior for beam. In the latter case, we should figure out what distributed runner (Flink, GCP Dataflow, etc.) may make sense for this job. Generally speaking, for any large "production" build, we would recommend using a distributed runner, as the local runners are not built for reliably moving large amounts of data. Apologies that this is not currently clearer in the docs. I'm currently working on a docs revision which makes that more apparent.

@cisaacstern, thanks for the reply.

In the version of the code in my initial description, I've pruned the pattern to only 1 file:

# Prune to 1 element to minimize memory reqs for now
pattern = pattern_from_file_sequence(urls, "time", nitems_per_file=1).prune(1)

This is confirmed by the initial line of output:

pattern=<FilePattern {'time': 1}>

While each file is rather larger (~1.5G), I wouldn't expect operating on only 1 file to consume such an incredible amount of memory (40G+, perhaps even far more, while I wasn't looking).

Regarding your question about how large I expect the final zarr file to be, I'm not particularly familiar with the format to know how the size of the input files (1 file in this case) would translate into the size of the final zarr file.

However, I wouldn't expect processing a single 1.5G file to cause things to collapse even on a local runner, but I don't know what's happening under the covers.

I specifically pruned the pattern to 1 file (I originally used 2, which caused my original memory issue, so I pruned to only 1 to see if that would help, to no avail) to limit memory consumption as much as possible, just to confirm that I could produce sensible output before potentially submitting the recipe to the staged-recipes repo (if that's even the correct course of action), where I imagine it would be executed on a sufficiently large platform to process all of the files.

Do you have any further insights, or any suggestions?

I have reproduced this issue. Now investigating.

This is the problem

        target_chunks={"time": 1, "lat": 5, "lon": 5},

You are trying to decimate this very large dataset into a tiny, tiny number of chunks. This is overwhelming beam with a vast number of tasks.

The original dataset dimensions are {'time': 1, 'lat': 18000, 'lon': 36000, 'length_scale': 1, 'channel': 2}. So this pipeline is creating 25920000 different tasks (one for each chunk).

Beyond the inefficient pipeline, this Zarr with these tiny chunks would be extremely hard to use. We generally aim for chunks from 1-100 MB. The following chunks seemed to work for me.

        target_chunks={"lat": 1800, "lon": 1800},

However, then I hit a new error

IndexError: too many indices for array; expected 1, got 4

which I swear I have seen before but can't remember where.

/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:908: RuntimeWarning: invalid value encountered in cast
  base = data.astype(np.int64)
/srv/conda/envs/notebook/lib/python3.10/site-packages/pandas/core/arrays/timedeltas.py:912: RuntimeWarning: invalid value encountered in cast
  data = (base * m + (frac * m).astype(np.int64)).view("timedelta64[ns]")
/srv/conda/envs/notebook/lib/python3.10/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
  int_num = np.asarray(num, dtype=np.int64)
Unexpected exception formatting exception. Falling back to standard exception
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
    zarr_array[region] = data
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
    indexer = BasicIndexer(selection, self)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
    selection = replace_ellipsis(selection, array._shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
    check_selection_length(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
    err_too_many_indices(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
    raise IndexError(
IndexError: too many indices for array; expected 1, got 4

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "/tmp/ipykernel_1951/4238254113.py", line 1, in <module>
    with beam.Pipeline() as p:
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 600, in __exit__
    self.result = self.run()
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/pipeline.py", line 577, in run
    return self.runner.run_pipeline(self, self._options)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/direct/direct_runner.py", line 128, in run_pipeline
    return runner.run_pipeline(pipeline, options)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 202, in run_pipeline
    self._latest_run_result = self.run_via_runner_api(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 224, in run_via_runner_api
    return self.run_stages(stage_context, stages)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 455, in run_stages
    bundle_results = self._execute_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 783, in _execute_bundle
    self._run_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1020, in _run_bundle
    result, splits = bundle_manager.process_bundle(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py", line 1356, in process_bundle
    result_future = self._worker_handler.control_conn.push(process_bundle_req)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py", line 379, in push
    response = self.worker.do_instruction(request)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 625, in do_instruction
    return getattr(self, request_type)(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 663, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1051, in process_bundle
    input_op_by_transform_id[element.transform_id].process_encoded(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 232, in process_encoded
    self.output(decoded_value)
  File "apache_beam/runners/worker/operations.py", line 568, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 570, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 261, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1513, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 625, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "apache_beam/runners/common.py", line 1607, in apache_beam.runners.common._OutputHandler.handle_process_outputs
  File "apache_beam/runners/common.py", line 1720, in apache_beam.runners.common._OutputHandler._write_value_to_tag
  File "apache_beam/runners/worker/operations.py", line 264, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 951, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 952, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1425, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1533, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "apache_beam/runners/common.py", line 1423, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 839, in apache_beam.runners.common.PerWindowInvoker.invoke_process
  File "apache_beam/runners/common.py", line 985, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/apache_beam/transforms/core.py", line -1, in <lambda>
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 90, in store_dataset_fragment
    _store_data(vname, da.variable, index, zgroup)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/pangeo_forge_recipes/writers.py", line 50, in _store_data
    zarr_array[region] = data
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1497, in __setitem__
    self.set_basic_selection(pure_selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1593, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/core.py", line 1981, in _set_basic_selection_nd
    indexer = BasicIndexer(selection, self)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 335, in __init__
    selection = replace_ellipsis(selection, array._shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 272, in replace_ellipsis
    check_selection_length(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/indexing.py", line 236, in check_selection_length
    err_too_many_indices(selection, shape)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/zarr/errors.py", line 70, in err_too_many_indices
    raise IndexError(
IndexError: too many indices for array; expected 1, got 4 [while running '[23]: Create|OpenURLWithFSSpec|OpenWithXarray|StoreToZarr/StoreToZarr/StoreDatasetFragments/Map(store_dataset_fragment)']

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/interactiveshell.py", line 2105, in showtraceback
    stb = self.InteractiveTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1428, in structured_traceback
    return FormattedTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1319, in structured_traceback
    return VerboseTB.structured_traceback(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1172, in structured_traceback
    formatted_exception = self.format_exception_as_a_whole(etype, evalue, etb, number_of_lines_of_context,
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 1087, in format_exception_as_a_whole
    frames.append(self.format_record(record))
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 969, in format_record
    frame_info.lines, Colors, self.has_colors, lvals
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/IPython/core/ultratb.py", line 792, in lines
    return self._sd.lines
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 734, in lines
    pieces = self.included_pieces
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 681, in included_pieces
    pos = scope_pieces.index(self.executing_piece)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/utils.py", line 144, in cached_property_wrapper
    value = obj.__dict__[self.func.__name__] = self.func(obj)
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/stack_data/core.py", line 660, in executing_piece
    return only(
  File "/srv/conda/envs/notebook/lib/python3.10/site-packages/executing/executing.py", line 190, in only
    raise NotOneValueFound('Expected one value, found 0')
executing.executing.NotOneValueFound: Expected one value, found 0

Ok, I got this pipeline working. Annotated changes below.

# set up a cache location
cache_dir = TemporaryDirectory()
cache_root = cache_dir.name

transforms = (
    beam.Create(pattern.items())
    # Caching the file means that we don't have to load each of the 200 chunks over the network
    # (which in this case is a transatlantic hop)
    # Instead we cache it to local file storage
    # (or could use an in-region object store for a distributed pipeline)
    | OpenURLWithFSSpec(cache=cache_root)
    | OpenWithXarray(
        file_type=pattern.file_type,
        # This variable is problematic because it doesn't have the time dimension,
        # so Pangeo Forge gets confused about how to deal with it.
        # The easiest thing is to drop it. Could also promote it to a coordinate variable.
        xarray_open_kwargs={"drop_variables": ["lst_unc_sys"]},
    )
    | StoreToZarr(
        target_root=target_root,
        store_name=store_name,
        combine_dims=pattern.combine_dim_keys,
        # approx 25 MB chunks
        target_chunks={"lat": 1800, "lon": 1800},
    )
)

This works. It's not particularly fast in this configuration, but it should parallelize well.

Fantastic! Thank you!

Yes, at one point I had also run into that IndexError you mentioned. I don't recall how.

I had also tried larger values for lat and lon chunks, but I ran into buffer overflow errors. However, I was also including a time chunk of 1. I hadn't tried dropping it because I had read somewhere in the docs that the library would otherwise assume enough available memory to hold all files being processed.

Are we able to drop the time chunk because time is the concat dim?

Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?

Are we able to drop the time chunk because time is the concat dim?

Correct.

Also, how did you determine that your proposed target chunks are ~25MB in size (I'm still a noob at all of this)?

1800 * 1800 * 8 bytes = 25.9 MB

FWIW, I would reduce the precision of this data to float 32. You almost never need 64 bits of precision for data analytics.

Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?

I'm going to try different chunk settings (to get me nearer to ~100MB -- isn't that the "magic" number, give or take?), particularly since I can't seem to get your code suggestion to work. I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed or fsspec.exceptions.FSTimeoutError. Perhaps fiddling with the chunk size will get me past them.

Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?

Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. ๐Ÿ˜•

I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed or fsspec.exceptions.FSTimeoutError. Perhaps fiddling with the chunk size will get me past them.

Do you know if this timeout is coming from the upstream CEDA server?

Everything is float32, apart from the time values, which are datetime64[ns]. Did you use 8 bytes in the chunk size computation because you were thinking the data are float64, not float32?

Ok I see, yes this is definitely the case for the original file. For my recipe however, the target dataset was all float64. That's not good. ๐Ÿ˜•

I finally got a successful run. I used target_chunks={"lat": 3600, "lon": 7200} and it took ~30 min. to complete.

I now see that the result is float64 that you mentioned, even though the original is float32. Is there any way for us to prevent this unwanted conversion?

I keep getting either aiohttp.client_exceptions.ClientPayloadError: Response payload is not completed or fsspec.exceptions.FSTimeoutError. Perhaps fiddling with the chunk size will get me past them.

Do you know if this timeout is coming from the upstream CEDA server?

I suspect the problem may be on the CEDA server side. While using the CEDA OPeNDAP server during the course of some experimental work (which is what led me to look at writing this recipe, on the advice of @sharkinsspatial), we've experienced a great deal of flakiness.

I also see a few dozen of these 2 warnings during execution of the recipe:

.../python3.11/site-packages/xarray/core/dataset.py:2461: SerializationWarning: saving variable None with floating point data as an integer dtype without any _FillValue to use for NaNs
  return to_zarr(  # type: ignore[call-overload,misc]

and

.../python3.11/site-packages/xarray/coding/times.py:618: RuntimeWarning: invalid value encountered in cast
  int_num = np.asarray(num, dtype=np.int64)

Is there any way (or need) to address these?

Here's the current, successfully running code (running locally, pruned to 2 files), when the server isn't flaking out. I also managed to get a successful local run pruned to 10 files (just barely, as I saw memory spike to over 70G!), which led me to discover that there are a couple of gaps in the monthly files, so the date range is adjusted for those gaps.

import os
from pathlib import Path
from tempfile import TemporaryDirectory

import apache_beam as beam
import pandas as pd
import xarray as xr
from pangeo_forge_recipes.patterns import pattern_from_file_sequence
from pangeo_forge_recipes.transforms import (
    OpenURLWithFSSpec,
    OpenWithXarray,
    StoreToZarr,
)

# Monthly data spans from 1995-08 through 2020-12, with the exception of two
# gaps: (1) 1996-01 through 1996-06 and (2) 2001-02 through 2001-06.
months = (
    pd.date_range("1995-08", "2020-12", freq=pd.offsets.MonthBegin())
    .difference(pd.date_range("1996-01", "1996-06", freq=pd.offsets.MonthBegin()))
    .difference(pd.date_range("2001-02", "2001-06", freq=pd.offsets.MonthBegin()))
)

url_pattern = (
    "https://dap.ceda.ac.uk/neodc/esacci/land_surface_temperature/data/"
    "MULTISENSOR_IRCDR/L3S/0.01/v2.00/monthly/{time:%Y}/{time:%m}/"
    "ESACCI-LST-L3S-LST-IRCDR_-0.01deg_1MONTHLY_DAY-{time:%Y%m}01000000-fv2.00.nc"
    "#mode=bytes"
)
urls = tuple(url_pattern.format(time=month) for month in months)
pattern = pattern_from_file_sequence(urls, "time").prune(2)

target_root = str(Path().absolute())
store_name = "output.zarr"
target_store = os.path.join(target_root, store_name)

cache_dir = TemporaryDirectory()
cache_root = cache_dir.name

transforms = (
    beam.Create(pattern.items())
    | OpenURLWithFSSpec(cache=cache_root)
    | OpenWithXarray(
        file_type=pattern.file_type,
        # The lst_unc_sys var doesn't have the time dimension, which causes a
        # "chunk size mismatch" error, so we must drop it.
        xarray_open_kwargs={"drop_variables": ["lst_unc_sys"], "chunks": "auto"},
    )
    | StoreToZarr(
        target_root=target_root,
        store_name=store_name,
        combine_dims=pattern.combine_dim_keys,
        target_chunks={"lat": 1800, "lon": 3600},
    )
)

with beam.Pipeline() as p:
    p | transforms  # type: ignore[reportUnusedExpression]

with xr.open_zarr(target_store) as ds:
    print(ds)

Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?

In particular, I'm wondering about the following things (and I suspect there are other considerations that I'm not aware of):

  1. How should I specify the target store location? I suspect this should be a cloud location, no?
  2. Same question for the cache directory.
  3. Is there any way to eliminate the warnings mentioned in my previous comment? Should I even bother?
  4. Is there any way to avoid the conversion from float32 to float64 mentioned in an earlier comment?

@chuckwondo, way to push this to a working state! Responses inline:

Now my question is, what do I need to do from here to make a good submission for the staged recipes repo?

The deployment of recipes from staged-recipes is not currently maintained. Do you have access to a GCP Dataflow and/or AWS account to deploy this to? The following is a current example of using our GitHub Action to deploy to GCP Dataflow (but could be adapted for AWS): https://github.com/pangeo-forge/aqua-modis-feedstock/blob/8cb545d20d4c08e49ec176a27da1dc48d4c2191d/.github/workflows/deploy.yaml#L31-L69

Since it appears you're at DevSeed, perhaps coordinating with @sharkinsspatial on deployment infra makes sense. Sean, where have you been deploying to lately?

How should I specify the target store location? I suspect this should be a cloud location, no?
Same question for the cache directory.

As indicated in the GitHub Workflow example above, we recommend configuring this (and deploying) with the pangeo-forge-runner CLI. The docs are lagging behind on this subject, and are currently my top priority, apologies for the lack of clarity here. One of the better reference points I can point to today is the integration tests on pangeo-forge-recipes which (though a bit opaque, due to fixturing), do represent a fully tested use of the CLI to deploy to a local bakery:

def test_integration(recipe_id: str, confpath: str):
.

I will aim to have better docs on this within a week or so, and will link them to you here ASAP.

Is there any way to eliminate the warnings mentioned in my previous comment? Should I even bother?

TBH, I'm not entirely clear what these mean so I'd ignore for now, assuming it doesn't appear to be affecting your output.

Is there any way to avoid the conversion from float32 to float64 mentioned in an earlier comment?

I'm not sure how to do this, if you're able to find a solution yourself, I'd be curious to know! Alternatively, could you open a dedicated Issue for this?