google/xarray-beam

Help with opening netcdf4 files via HTTP

rabernat opened this issue · 19 comments

This is not necessarily an xarray-beam specific question; however, it relates to issues here (e.g. #37, #32) as well as in Pangeo Forge, so I am asking it here. I hope people here will be able to help me. Ultimately I hope this will help us resolve pangeo-forge/pangeo-forge-recipes#373 and move forward with merging Pangeo Forge and xarray-beam.

Goal: open xarray datasets from HTTP endpoints lazily and pass them around a beam pipeline. Delay loading of data variables until later in the pipeline.

What I have tried

Here is the basic pipeline I am working with. It is a simplified, self-contained version of what we will want to do in Pangeo Forge. (Note: this probably requires installing the latest version of fsspec from master, in order to get fsspec/filesystem_spec#973.)

import xarray as xr
import fsspec

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException, assert_that


def open_http_url_with_fsspec(url):
    return fsspec.open(url, mode='rb')


def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            return ds


def load_xarray_ds(ds):
    return ds.load()


def is_xr_dataset():
    def _is_xr_dataset(actual):
        for ds in actual:
            if not isinstance(ds, xr.Dataset):
                raise BeamAssertException(f"Object {ds} has type {type(ds)}, expected xr.Dataset.")
            if not ds.AerosolOpticalThickness.variable._in_memory:
                raise BeamAssertException("Variable not in memory")

    return _is_xr_dataset


URL = 'https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc'


with TestPipeline() as p:
    inputs = p | beam.Create([URL])
    open_files = inputs | beam.Map(open_http_url_with_fsspec)
    dsets = open_files | beam.Map(open_fsspec_openfile_with_xarray)
    loaded_dsets = dsets | beam.Map(load_xarray_ds)
    
    assert_that(loaded_dsets, is_xr_dataset())

When I run this I get ValueError: I/O operation on closed file. [while running '[1]: Map(load_xarray_ds)']

Full Traceback
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.9 interpreter.
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    198             try:
--> 199                 file = self._cache[self._key]
    200             except KeyError:

~/Code/xarray/xarray/backends/lru_cache.py in __getitem__(self, key)
     52         with self._lock:
---> 53             value = self._cache[key]
     54             self._cache.move_to_end(key)

KeyError: [<class 'h5netcdf.core.File'>, (<File-like object HTTPFileSystem, https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc>,), 'r', (('decode_vlen_strings', True), ('invalid_netcdf', None))]

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/transforms/core.py in <lambda>(x)
   1638   else:
-> 1639     wrapper = lambda x: [fn(x)]
   1640 

<ipython-input-1-53601ea18ea3> in load_xarray_ds(ds)
     19 def load_xarray_ds(ds):
---> 20     return ds.load()
     21 

~/Code/xarray/xarray/core/dataset.py in load(self, **kwargs)
    688             if k not in lazy_data:
--> 689                 v.load()
    690 

~/Code/xarray/xarray/core/variable.py in load(self, **kwargs)
    443         elif not is_duck_array(self._data):
--> 444             self._data = np.asarray(self._data)
    445         return self

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    655     def __array__(self, dtype=None):
--> 656         self._ensure_cached()
    657         return np.asarray(self.array, dtype=dtype)

~/Code/xarray/xarray/core/indexing.py in _ensure_cached(self)
    652         if not isinstance(self.array, NumpyIndexingAdapter):
--> 653             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    654 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    625     def __array__(self, dtype=None):
--> 626         return np.asarray(self.array, dtype=dtype)
    627 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    526         array = as_indexable(self.array)
--> 527         return np.asarray(array[self.key], dtype=None)
    528 

~/Code/xarray/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     50     def __getitem__(self, key):
---> 51         return indexing.explicit_indexing_adapter(
     52             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem

~/Code/xarray/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    815     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 816     result = raw_indexing_method(raw_key.tuple)
    817     if numpy_indices.tuple:

~/Code/xarray/xarray/backends/h5netcdf_.py in _getitem(self, key)
     59         with self.datastore.lock:
---> 60             array = self.get_array(needs_lock=False)
     61             return array[key]

~/Code/xarray/xarray/backends/h5netcdf_.py in get_array(self, needs_lock)
     46     def get_array(self, needs_lock=True):
---> 47         ds = self.datastore._acquire(needs_lock)
     48         return ds.variables[self.variable_name]

~/Code/xarray/xarray/backends/h5netcdf_.py in _acquire(self, needs_lock)
    181     def _acquire(self, needs_lock=True):
--> 182         with self._manager.acquire_context(needs_lock) as root:
    183             ds = _nc4_require_group(

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/contextlib.py in __enter__(self)
    118         try:
--> 119             return next(self.gen)
    120         except StopIteration:

~/Code/xarray/xarray/backends/file_manager.py in acquire_context(self, needs_lock)
    186         """Context manager for acquiring a file."""
--> 187         file, cached = self._acquire_with_cache_info(needs_lock)
    188         try:

~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    204                     kwargs["mode"] = self._mode
--> 205                 file = self._opener(*self._args, **kwargs)
    206                 if self._mode == "w":

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5netcdf/core.py in __init__(self, path, mode, invalid_netcdf, phony_dims, **kwargs)
    720                     self._preexisting_file = mode in {"r", "r+", "a"}
--> 721                     self._h5file = h5py.File(path, mode, **kwargs)
    722         except Exception:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in __init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, fs_strategy, fs_persist, fs_threshold, fs_page_size, page_buf_size, min_meta_keep, min_raw_keep, locking, **kwds)
    506                                  fs_page_size=fs_page_size)
--> 507                 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    508 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
    219             flags |= h5f.ACC_SWMR_READ
--> 220         fid = h5f.open(name, flags, fapl=fapl)
    221     elif mode == 'r+':

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1572         if self.closed:
-> 1573             raise ValueError("I/O operation on closed file.")
   1574         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))

ValueError: I/O operation on closed file.

During handling of the above exception, another exception occurred:

ValueError                                Traceback (most recent call last)
<ipython-input-1-53601ea18ea3> in <module>
     41     loaded_dsets = dsets | beam.Map(load_xarray_ds)
     42 
---> 43     assert_that(loaded_dsets, is_xr_dataset())

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in __exit__(self, exc_type, exc_val, exc_tb)
    594     try:
    595       if not exc_type:
--> 596         self.result = self.run()
    597         self.result.wait_until_finish()
    598     finally:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/testing/test_pipeline.py in run(self, test_runner_api)
    110 
    111   def run(self, test_runner_api=True):
--> 112     result = super().run(
    113         test_runner_api=(
    114             False if self.not_use_test_runner_api else test_runner_api))

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    544       # When possible, invoke a round trip through the runner API.
    545       if test_runner_api and self._verify_runner_api_compatible():
--> 546         return Pipeline.from_runner_api(
    547             self.to_runner_api(use_fake_coders=True),
    548             self.runner,

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/pipeline.py in run(self, test_runner_api)
    571         finally:
    572           shutil.rmtree(tmpdir)
--> 573       return self.runner.run_pipeline(self, self._options)
    574     finally:
    575       if not is_in_ipython():

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/direct/direct_runner.py in run_pipeline(self, pipeline, options)
    129       runner = BundleBasedDirectRunner()
    130 
--> 131     return runner.run_pipeline(pipeline, options)
    132 
    133 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_pipeline(self, pipeline, options)
    197         options.view_as(pipeline_options.ProfilingOptions))
    198 
--> 199     self._latest_run_result = self.run_via_runner_api(
    200         pipeline.to_runner_api(default_environment=self._default_environment))
    201     return self._latest_run_result

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_via_runner_api(self, pipeline_proto)
    208     # TODO(pabloem, BEAM-7514): Create a watermark manager (that has access to
    209     #   the teststream (if any), and all the stages).
--> 210     return self.run_stages(stage_context, stages)
    211 
    212   @contextlib.contextmanager

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in run_stages(self, stage_context, stages)
    393           )
    394 
--> 395           stage_results = self._run_stage(
    396               runner_execution_context, bundle_context_manager)
    397 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_stage(self, runner_execution_context, bundle_context_manager)
    658     while True:
    659       last_result, deferred_inputs, fired_timers, watermark_updates = (
--> 660           self._run_bundle(
    661               runner_execution_context,
    662               bundle_context_manager,

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in _run_bundle(self, runner_execution_context, bundle_context_manager, data_input, data_output, input_timers, expected_timer_output, bundle_manager)
    781         expected_timer_output)
    782 
--> 783     result, splits = bundle_manager.process_bundle(
    784         data_input, data_output, input_timers, expected_timer_output)
    785     # Now we collect all the deferred inputs remaining from bundle execution.

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/fn_runner.py in process_bundle(self, inputs, expected_outputs, fired_timers, expected_output_timers, dry_run)
   1092             process_bundle_descriptor.id,
   1093             cache_tokens=[next(self._cache_token_generator)]))
-> 1094     result_future = self._worker_handler.control_conn.push(process_bundle_req)
   1095 
   1096     split_results = []  # type: List[beam_fn_api_pb2.ProcessBundleSplitResponse]

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/portability/fn_api_runner/worker_handlers.py in push(self, request)
    376       self._uid_counter += 1
    377       request.instruction_id = 'control_%s' % self._uid_counter
--> 378     response = self.worker.do_instruction(request)
    379     return ControlFuture(request.instruction_id, response)
    380 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py in do_instruction(self, request)
    578     if request_type:
    579       # E.g. if register is set, this will call self.register(request.register))
--> 580       return getattr(self, request_type)(
    581           getattr(request, request_type), request.instruction_id)
    582     else:

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/sdk_worker.py in process_bundle(self, request, instruction_id)
    616         with self.maybe_profile(instruction_id):
    617           delayed_applications, requests_finalization = (
--> 618               bundle_processor.process_bundle(instruction_id))
    619           monitoring_infos = bundle_processor.monitoring_infos()
    620           monitoring_infos.extend(self.state_cache_metrics_fn())

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py in process_bundle(self, instruction_id)
    993                   element.timer_family_id, timer_data)
    994           elif isinstance(element, beam_fn_api_pb2.Elements.Data):
--> 995             input_op_by_transform_id[element.transform_id].process_encoded(
    996                 element.data)
    997 

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py in process_encoded(self, encoded_windowed_values)
    219       decoded_value = self.windowed_coder_impl.decode_from_stream(
    220           input_stream, True)
--> 221       self.output(decoded_value)
    222 
    223   def monitoring_infos(self, transform_id, tag_to_pcollection_id):

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.Operation.output()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.Operation.output()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common._OutputProcessor.process_outputs()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.SingletonConsumerSet.receive()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/worker/operations.cpython-39-darwin.so in apache_beam.runners.worker.operations.DoOperation.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner._reraise_augmented()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.DoFnRunner.process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/runners/common.cpython-39-darwin.so in apache_beam.runners.common.SimpleInvoker.invoke_process()

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/apache_beam/transforms/core.py in <lambda>(x)
   1637     wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
   1638   else:
-> 1639     wrapper = lambda x: [fn(x)]
   1640 
   1641   label = 'Map(%s)' % ptransform.label_from_callable(fn)

<ipython-input-1-53601ea18ea3> in load_xarray_ds(ds)
     18 
     19 def load_xarray_ds(ds):
---> 20     return ds.load()
     21 
     22 

~/Code/xarray/xarray/core/dataset.py in load(self, **kwargs)
    687         for k, v in self.variables.items():
    688             if k not in lazy_data:
--> 689                 v.load()
    690 
    691         return self

~/Code/xarray/xarray/core/variable.py in load(self, **kwargs)
    442             self._data = as_compatible_data(self._data.compute(**kwargs))
    443         elif not is_duck_array(self._data):
--> 444             self._data = np.asarray(self._data)
    445         return self
    446 

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    654 
    655     def __array__(self, dtype=None):
--> 656         self._ensure_cached()
    657         return np.asarray(self.array, dtype=dtype)
    658 

~/Code/xarray/xarray/core/indexing.py in _ensure_cached(self)
    651     def _ensure_cached(self):
    652         if not isinstance(self.array, NumpyIndexingAdapter):
--> 653             self.array = NumpyIndexingAdapter(np.asarray(self.array))
    654 
    655     def __array__(self, dtype=None):

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    624 
    625     def __array__(self, dtype=None):
--> 626         return np.asarray(self.array, dtype=dtype)
    627 
    628     def __getitem__(self, key):

~/Code/xarray/xarray/core/indexing.py in __array__(self, dtype)
    525     def __array__(self, dtype=None):
    526         array = as_indexable(self.array)
--> 527         return np.asarray(array[self.key], dtype=None)
    528 
    529     def transpose(self, order):

~/Code/xarray/xarray/backends/h5netcdf_.py in __getitem__(self, key)
     49 
     50     def __getitem__(self, key):
---> 51         return indexing.explicit_indexing_adapter(
     52             key, self.shape, indexing.IndexingSupport.OUTER_1VECTOR, self._getitem
     53         )

~/Code/xarray/xarray/core/indexing.py in explicit_indexing_adapter(key, shape, indexing_support, raw_indexing_method)
    814     """
    815     raw_key, numpy_indices = decompose_indexer(key, shape, indexing_support)
--> 816     result = raw_indexing_method(raw_key.tuple)
    817     if numpy_indices.tuple:
    818         # index the loaded np.ndarray

~/Code/xarray/xarray/backends/h5netcdf_.py in _getitem(self, key)
     58         key = tuple(list(k) if isinstance(k, np.ndarray) else k for k in key)
     59         with self.datastore.lock:
---> 60             array = self.get_array(needs_lock=False)
     61             return array[key]
     62 

~/Code/xarray/xarray/backends/h5netcdf_.py in get_array(self, needs_lock)
     45 class H5NetCDFArrayWrapper(BaseNetCDF4Array):
     46     def get_array(self, needs_lock=True):
---> 47         ds = self.datastore._acquire(needs_lock)
     48         return ds.variables[self.variable_name]
     49 

~/Code/xarray/xarray/backends/h5netcdf_.py in _acquire(self, needs_lock)
    180 
    181     def _acquire(self, needs_lock=True):
--> 182         with self._manager.acquire_context(needs_lock) as root:
    183             ds = _nc4_require_group(
    184                 root, self._group, self._mode, create_group=_h5netcdf_create_group

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/contextlib.py in __enter__(self)
    117         del self.args, self.kwds, self.func
    118         try:
--> 119             return next(self.gen)
    120         except StopIteration:
    121             raise RuntimeError("generator didn't yield") from None

~/Code/xarray/xarray/backends/file_manager.py in acquire_context(self, needs_lock)
    185     def acquire_context(self, needs_lock=True):
    186         """Context manager for acquiring a file."""
--> 187         file, cached = self._acquire_with_cache_info(needs_lock)
    188         try:
    189             yield file

~/Code/xarray/xarray/backends/file_manager.py in _acquire_with_cache_info(self, needs_lock)
    203                     kwargs = kwargs.copy()
    204                     kwargs["mode"] = self._mode
--> 205                 file = self._opener(*self._args, **kwargs)
    206                 if self._mode == "w":
    207                     # ensure file doesn't get overridden when opened again

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5netcdf/core.py in __init__(self, path, mode, invalid_netcdf, phony_dims, **kwargs)
    719                 else:
    720                     self._preexisting_file = mode in {"r", "r+", "a"}
--> 721                     self._h5file = h5py.File(path, mode, **kwargs)
    722         except Exception:
    723             self._closed = True

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in __init__(self, name, mode, driver, libver, userblock_size, swmr, rdcc_nslots, rdcc_nbytes, rdcc_w0, track_order, fs_strategy, fs_persist, fs_threshold, fs_page_size, page_buf_size, min_meta_keep, min_raw_keep, locking, **kwds)
    505                                  fs_persist=fs_persist, fs_threshold=fs_threshold,
    506                                  fs_page_size=fs_page_size)
--> 507                 fid = make_fid(name, mode, userblock_size, fapl, fcpl, swmr=swmr)
    508 
    509             if isinstance(libver, tuple):

/opt/miniconda3/envs/pangeo-forge-recipes/lib/python3.9/site-packages/h5py/_hl/files.py in make_fid(name, mode, userblock_size, fapl, fcpl, swmr)
    218         if swmr and swmr_support:
    219             flags |= h5f.ACC_SWMR_READ
--> 220         fid = h5f.open(name, flags, fapl=fapl)
    221     elif mode == 'r+':
    222         fid = h5f.open(name, h5f.ACC_RDWR, fapl=fapl)

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/_objects.pyx in h5py._objects.with_phil.wrapper()

h5py/h5f.pyx in h5py.h5f.open()

h5py/h5fd.pyx in h5py.h5fd.H5FD_fileobj_read()

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1586         """
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data
   1590         return len(data)

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    572         else:
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 
    576     async def async_fetch_all(self):

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1571             length = self.size - self.loc
   1572         if self.closed:
-> 1573             raise ValueError("I/O operation on closed file.")
   1574         logger.debug("%s read: %i - %i" % (self, self.loc, self.loc + length))
   1575         if length == 0:

ValueError: I/O operation on closed file. [while running '[1]: Map(load_xarray_ds)']

This is not quite the same error as I am getting in pangeo-forge/pangeo-forge-recipes#373; there it is instead OSError: Unable to open file (incorrect metadata checksum after all read attempts). I have not been able to reproduce that error outside of pytest. However, my example here fails at the same point: when calling ds.load() on an h5netcdf-backed xarray dataset pointing at an fsspec HTTPFile object.

What is wrong

Overall my concern is with this pattern:

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            return ds

It feels wrong. I should either be yielding or else not using context managers. The first context manager is necessary. The second may be optional. But overall my understanding is that the outputs of the Map function need to be pickled, in which case the context manager pattern doesn't make sense at all. I have tried various other flavors, like

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            pass
    return ds

or

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        ds = xr.open_dataset(fp, engine='h5netcdf')
    return ds

but nothing seems to work. The fundamental issue seems to be simply this

with fsspec.open(URL) as fp:
    ds = xr.open_dataset(fp, engine='h5netcdf')
ds.load()  # -> ValueError: I/O operation on closed file.

Has anyone here managed to make something like this work? I feel like I'm missing something obvious.
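For anyone who wants to see the failure mode without network access, here is a minimal standard-library analogy. It is only a sketch: `LazyReader`, `open_then_return` and `open_and_yield` are hypothetical names, and `io.BytesIO` stands in for the fsspec file object. It shows that a lazy object holding a reference to a handle fails once the `with` block has closed that handle, and that a generator which yields inside the `with` block keeps the handle open while the consumer uses the item. Whether a Beam runner would keep such a generator suspended across pipeline steps is not something this sketch establishes.

```python
import io


class LazyReader:
    """Hypothetical stand-in for a lazily loaded dataset."""

    def __init__(self, fp):
        self._fp = fp  # keeps a reference only; nothing is read yet

    def load(self):
        self._fp.seek(0)
        return self._fp.read()


def open_then_return():
    # Mirrors the failing pattern: the handle closes when `with` exits.
    with io.BytesIO(b"payload") as fp:
        return LazyReader(fp)


def open_and_yield():
    # Generator variant: the handle stays open while the consumer holds the item.
    with io.BytesIO(b"payload") as fp:
        yield LazyReader(fp)


try:
    open_then_return().load()
    error = None
except ValueError as exc:
    error = str(exc)

loaded = [lazy.load() for lazy in open_and_yield()]
```

Here `error` holds the closed-file message from the first pattern, while `loaded` contains the bytes read through the generator variant.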

cc @martindurant

One other weird datapoint that I have discovered is that I can pickle the un-loadable dataset, load it from the pickle, and then call .load() and it works!

import pickle

url = 'https://www.unidata.ucar.edu/software/netcdf/examples/OMI-Aura_L2-example.nc' # netcdf

open_file = fsspec.open(url, mode='rb')
with open_file as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds:
        pass

ds_pk = pickle.dumps(ds)
ds1 = pickle.loads(ds_pk)
ds1.load()

This suggests that the dataset is serializable in some way. But something is happening in beam that is preventing that path from being taken.

Ok the mystery deepens even further. After running the code just above, you can then call ds.load() on the original dataset, and it works. This suggests that the two datasets are somehow sharing some state, perhaps via Xarray's CachingFileManager. @shoyer does that seem plausible?

I'm enjoying the conversation with myself, so I'll just keep going... 🙃

I figured out a bizarre workaround that makes it work

def open_fsspec_openfile_with_xarray(of):
    with of as fp:
        with xr.open_dataset(fp, engine='h5netcdf') as ds:
            pass
    ds1 = pickle.loads(pickle.dumps(ds))
    return ds1

Wait, what!?

I would have thought that pickle.dumps() is exactly what was being done anyway, so roundtripping with pickle a second time really shouldn't be making any difference. Can you see any difference between the __dict__ of ds and ds1 or their binary pickled representation? What about if you pickle with protocol=1?

By the way, copy.deepcopy should do about the same thing, without first making the bytestring.

I have not tried to introspect the pickle objects. I don't know much about pickle details and internals. #49 (comment) is a fully isolated, copy-pasteable reproducer for this, so if you wanted to dig in, that would be the place to start.

My best guess is that there is an is_closed parameter somewhere deep in the h5py.File object, and that pickling / unpickling triggers this to be reset. For http, it doesn't really mean anything for a file to be "open", so this notion of open / closed files can only cause problems.

This could also be interacting with Xarray in some unexpected ways. See pydata/xarray#4242 for some discussion of that.

I got some weird conversion error

~/conda/envs/py38/lib/python3.8/site-packages/xarray/conventions.py in _update_bounds_attributes(variables)
    399     for v in variables.values():
    400         attrs = v.attrs
--> 401         has_date_units = "units" in attrs and "since" in attrs["units"]
    402         if has_date_units and "bounds" in attrs:
    403             if attrs["bounds"] in variables:

TypeError: argument of type 'Empty' is not iterable

Perhaps I need later versions of things.

Just try with a different remote file, e.g

url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'

Just a guess, but have you tried asking Beam to use cloudpickle rather than dill? I believe this is a recently added option. Cloudpickle is used for Dask, so this might work a bit more consistently.

Thanks for the suggestion Stephan. I tried as follows

from apache_beam.options.pipeline_options import PipelineOptions
options = PipelineOptions(pickle_library="cloudpickle")
with TestPipeline(options=options) as p:

However, it appears to have no effect. I can specify pickle_library="foobar" with no consequences. I am on Beam 2.38.0, so it should support the cloudpickle option, which was introduced in 2.36.0. Maybe I am not passing the option correctly...

I can specify pickle_library="foobar" with no consequences.

This is probably a red herring. The code to set the pickler doesn't raise any errors if you pass an invalid option. So I am going to assume that that option does work, and that it doesn't solve the problem.

Some more deep introspecting into these objects.

import xarray as xr
import fsspec
from cloudpickle import dumps, loads
from pprint import pprint as print

url = 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'
with fsspec.open(url) as fp:
    with xr.open_dataset(fp, engine='h5netcdf') as ds0:
        pass
    
ds_pk = dumps(ds0)
ds1 = loads(ds_pk)

# go deep inside Xarray's array wrappers to get out the `xarray.backends.h5netcdf_.H5NetCDFArrayWrapper` objects
wrapper0 = ds0.T_ZONES.variable._data.array.array.array.array.array
wrapper1 = ds1.T_ZONES.variable._data.array.array.array.array.array

# now go inside those and get the actual `fsspec.implementations.http.HTTPFile` objects 
fobj0 = wrapper0.datastore._manager._args[0]
fobj1 = wrapper1.datastore._manager._args[0]

print(fobj0.__dict__)
print(fobj1.__dict__)
{'_closed': True,
 '_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
              'size': 1183063,
              'type': 'file'},
 'asynchronous': False,
 'autocommit': True,
 'blocksize': 5242880,
 'cache': None,
 'end': None,
 'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
 'kwargs': {},
 'loc': 1183063,
 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
 'mode': 'rb',
 'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
 'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
 'size': 1183063,
 'start': None,
 'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}
{'_closed': False,
 '_details': {'name': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
              'size': 1183063,
              'type': 'file'},
 'asynchronous': False,
 'autocommit': True,
 'blocksize': 5242880,
 'cache': <fsspec.caching.BaseCache object at 0x18d4a32e0>,
 'end': None,
 'fs': <fsspec.implementations.http.HTTPFileSystem object at 0x187cf1c10>,
 'kwargs': {},
 'loc': 0,
 'loop': <_UnixSelectorEventLoop running=True closed=False debug=False>,
 'mode': 'rb',
 'path': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc',
 'session': <aiohttp.client.ClientSession object at 0x187cf1e20>,
 'size': 1183063,
 'start': None,
 'url': 'https://power-datastore.s3.amazonaws.com/v9/climatology/power_901_rolling_zones_utc.nc'}

I tried taking fobj0 and manually setting

fobj0._closed = False
fobj0.loc = 0

ds0.load()

However, this led to the error

~/Code/filesystem_spec/fsspec/spec.py in readinto(self, b)
   1586         """
   1587         out = memoryview(b).cast("B")
-> 1588         data = self.read(out.nbytes)
   1589         out[: len(data)] = data
   1590         return len(data)

~/Code/filesystem_spec/fsspec/implementations/http.py in read(self, length)
    572         else:
    573             length = min(self.size - self.loc, length)
--> 574         return super().read(length)
    575 
    576     async def async_fetch_all(self):

~/Code/filesystem_spec/fsspec/spec.py in read(self, length)
   1576             # don't even bother calling fetch
   1577             return b""
-> 1578         out = self.cache._fetch(self.loc, self.loc + length)
   1579         self.loc += len(out)
   1580         return out

AttributeError: 'NoneType' object has no attribute '_fetch'

So we need a file-like where close() is a no-op? It seems like a natural thing to discard read buffers if close gets called, and I suppose that is expected to happen on context end, whether or not explicitly done by xarray.
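
For what it's worth, here is a minimal sketch of what a file-like with a no-op close() could look like. It uses only the standard library; `KeepOpenBytesIO` is a hypothetical name and an in-memory stand-in, not anything fsspec provides, so treat it as an illustration of the idea rather than a proposed fix.

```python
import io


class KeepOpenBytesIO(io.BytesIO):
    """Hypothetical in-memory stand-in for a remote file whose close() does nothing."""

    def close(self):
        # Deliberately a no-op: the buffer survives the end of a `with` block.
        pass


with KeepOpenBytesIO(b"netcdf bytes") as fp:
    header = fp.read(6)

# The context manager has exited, but the object can still be read.
rest = fp.read()
```

The trade-off is that nothing releases the buffer until the object is garbage collected.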

Perhaps we are barking up the wrong tree. Once the dataset is passed through pickle or cloudpickle, it becomes loadable again. In #49 (comment) @shoyer suggested we should be able to force beam to use cloudpickle to serialize things. So it should be working without any changes to our libraries. I am currently trying to dig deeper into the dill vs. cloudpickle issue.

The fundamental issue seems to be simply this

with fsspec.open(URL) as fp:
    ds = xr.open_dataset(fp, engine='h5netcdf')
ds.load()  # -> ValueError: I/O operation on closed file.

I'm finally looking into this a little in detail.

Why do you use context managers in the functions that you're passing into beam.Map? I would generally not expect something like this to work -- the context manager is explicitly closing the file object.

Objects passed between transforms in a Beam pipeline are not necessarily serialized via pickle (which as I understand would fix this by reopening the file), because it's unnecessary overhead if the separate map stages are evaluated on the same machine.

So anyways, if I were to do this I would not use a context manager in the opener function.
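
To make the point about serialization concrete, here is a toy sketch of how a round trip can revive a closed file-like. It assumes the object defines `__reduce__` so that it is rebuilt from its constructor arguments rather than from its current state. `ToyRemoteFile` is a hypothetical class written for illustration, not fsspec's actual implementation, and it uses copy.deepcopy (which goes through the same reduce protocol as pickle) to stay self-contained.

```python
import copy


class ToyRemoteFile:
    """Hypothetical stand-in for a buffered remote file."""

    def __init__(self, payload):
        self.payload = payload  # pretend this lives on a server
        self.cache = payload    # pretend read buffer
        self.loc = 0
        self.closed = False

    def read(self):
        if self.closed:
            raise ValueError("I/O operation on closed file.")
        out = self.cache[self.loc:]
        self.loc = len(self.cache)
        return out

    def close(self):
        # Closing drops the buffer, as a context manager exit would.
        self.cache = None
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()

    def __reduce__(self):
        # Serialize only what is needed to reopen, not the closed state.
        return (ToyRemoteFile, (self.payload,))


original = ToyRemoteFile(b"data")
with original:
    pass  # leaving the block closes the file

# A copy made through the reduce protocol is a freshly opened object.
revived = copy.deepcopy(original)
```

Under these assumptions, `original.read()` raises the closed-file error while `revived.read()` succeeds, which mirrors the behavior seen when the dataset is passed through pickle.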

Thanks a lot Stephan! I appreciate your time.

Why do you use context managers in the functions that you're passing into beam.Map?

Because that's what fsspec seems to require! I went on an extremely deep dive on this in fsspec/filesystem_spec#579. In the end, the recommendation from @martindurant was to always use the context manager when opening a file-like object (see fsspec/filesystem_spec#579 (comment)). However, that requirement seems incompatible with serialization, as you noted.

I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.

I would love to see an example of opening a NetCDF4 file remotely over HTTP using the h5netcdf engine without a context manager.

Ok, so I actually did test this case in fsspec/filesystem_spec#579 (comment). The following works with HTTP

def open_fsspec_openfile_with_xarray(of):
    return xr.open_dataset(of.open(), engine='h5netcdf')

@martindurant, is that kosher?

Yes, it's fine - the file will "close" (meaning dropping the buffer) when garbage collected. Local files instances made this way also pickle.

But if I do that, I again hit the problem (from fsspec/filesystem_spec#579 (comment)) that fs.open and fsspec.open have different behavior! 😫

This works

open_file = fsspec.open(url)
ds = xr.open_dataset(open_file.open())

# but not
ds = xr.open_dataset(open_file)
# -> AttributeError: 'HTTPFile' object has no attribute '__fspath__'

or this works

from fsspec.implementations.http import HTTPFileSystem
fs = HTTPFileSystem()
open_file = fs.open(url)
ds = xr.open_dataset(open_file)

# but not
ds = xr.open_dataset(open_file.open())
# -> AttributeError: 'HTTPFile' object has no attribute 'open'

Ok I am satisfied with my workaround in pangeo-forge/pangeo-forge-recipes@c20f3fd, which is basically

if hasattr(open_file, "open"):
    open_file = open_file.open()
ds = xr.open_dataset(open_file)

This seems to reliably serialize with whatever fsspec can throw at us.
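
For anyone who wants to reuse this, the check can be wrapped in a small helper. This is only a sketch: `ensure_file_like` is a hypothetical name, not something from fsspec or pangeo-forge-recipes, and it assumes that the only difference between the two cases is whether the object has an `open` method.

```python
def ensure_file_like(open_file):
    """Return a file-like object from the result of fsspec.open(...) or fs.open(...).

    Objects that expose an `open` method are opened without a context
    manager; anything else is assumed to be file-like already and is
    returned unchanged.
    """
    if hasattr(open_file, "open"):
        return open_file.open()
    return open_file


# Intended usage (needs xarray, h5netcdf and network access, so left as a comment):
# ds = xr.open_dataset(ensure_file_like(open_file), engine="h5netcdf")
```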

Sorry for the noise here. I appreciate the help.