ncar-xdev/xpersist

Support for multiple outputs in prefect tasks


Prefect tasks can return multiple outputs, and it would be nice if the FunnelResult supported this as well.

```python
from typing import Tuple

import xarray as xr

from funnel import CacheStore
from funnel.prefect.result import FunnelResult
from prefect import task, Flow

store = CacheStore()


@task(result=FunnelResult(store, serializer='xarray.zarr'), target='foo')
def my_task() -> Tuple[xr.Dataset, xr.Dataset]:
    a = xr.DataArray([1, 2, 3], dims='x', name='foo-1').to_dataset()
    b = xr.DataArray([1, 2, 3], dims='x', name='foo-2').to_dataset()
    return a, b


with Flow('foo-flow') as flow:
    ds_a, ds_b = my_task()

flow.run()
```

This currently fails with the following error:

```
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1398, in _construct_dataarray
    variable = self._variables[name]
KeyError: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1502, in __getitem__
    return self._construct_dataarray(key)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1400, in _construct_dataarray
    _, name, variable = _get_virtual_variable(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 158, in _get_virtual_variable
    raise KeyError(key)
KeyError: 0
```

But this has me thinking that the FunnelResult may need some slight modifications to handle multiple outputs. As a starting point, I'm curious whether a modified artifact schema is necessary. In the example below, I turn the JSON object into a list of artifacts:

```
# /tmp/funnel_metadata_store/foo.artifact.json
[
  {
    "key": "foo[0]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  },
  {
    "key": "foo[1]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  }
]
```
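To make the proposed list-of-artifacts schema concrete, here is a minimal sketch that builds one metadata entry per returned value, keyed as `foo[0]`, `foo[1]`, and so on. The helper `build_artifact_entries` is hypothetical (not an existing funnel API); it assumes every output shares the same serializer, as in the example above, and it only builds the metadata, so the datasets are stood in for by strings.

```python
import json
from datetime import datetime
from typing import Any, Dict, List, Sequence


def build_artifact_entries(
    key: str,
    values: Sequence[Any],
    serializer: str,
) -> List[Dict[str, Any]]:
    """Build one artifact entry per returned value, keyed as key[i].

    Hypothetical helper mirroring the proposed schema; not part of funnel.
    """
    # One timestamp shared by all outputs, matching the example JSON above.
    created_at = datetime.now().isoformat()
    return [
        {
            "key": f"{key}[{i}]",
            "serializer": serializer,
            "load_kwargs": {},
            "dump_kwargs": {},
            "created_at": created_at,
        }
        for i in range(len(values))
    ]


if __name__ == "__main__":
    # Strings stand in for the two xarray Datasets returned by the task.
    entries = build_artifact_entries("foo", ("dataset-a", "dataset-b"), "xarray.zarr")
    print(json.dumps(entries, indent=2))
```

On load, a result class following this schema would read the list back and deserialize each `foo[i]` entry in order to rebuild the tuple.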

I believe we could support this by

I have one question, though. In your example, the returned values are all of the same type (which makes it easy to serialize them with the same serializer). What should happen when the function returns multiple outputs with different types, say a tuple with a Pandas DataFrame and an xarray Dataset? Should we even support this use case?

> What should happen when the function returns multiple outputs with different types, say a tuple with a Pandas DataFrame and an xarray Dataset? Should we even support this use case?

Honestly, I'm not sure how to handle this. We may want to share this use case (and the funnel project) with the Prefect developers and see what they say.
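One possible direction for the mixed-type case is to pick a serializer per element rather than one per task, so each entry in the artifact list records its own `serializer`. The sketch below is hypothetical: `SERIALIZER_BY_PACKAGE`, `pick_serializer`, and the `'pandas.parquet'` name are assumptions for illustration (only `'xarray.zarr'` appears in this thread), and it dispatches on the top-level package that defines each value's type.

```python
from typing import Any, Dict, List, Optional, Sequence

# Hypothetical registry: top-level package of a value's type -> serializer name.
# 'xarray.zarr' comes from this issue; 'pandas.parquet' is an assumed name.
SERIALIZER_BY_PACKAGE: Dict[str, str] = {
    "xarray": "xarray.zarr",
    "pandas": "pandas.parquet",
}


def pick_serializer(value: Any, default: Optional[str] = None) -> str:
    """Return a serializer name based on the package that defines type(value)."""
    package = type(value).__module__.split(".")[0]
    serializer = SERIALIZER_BY_PACKAGE.get(package, default)
    if serializer is None:
        # Unregistered types are rejected unless a fallback is supplied.
        raise TypeError(f"no serializer registered for {type(value).__qualname__!r}")
    return serializer


def pick_serializers(values: Sequence[Any], default: Optional[str] = None) -> List[str]:
    """Pick a serializer independently for each element of a tuple output."""
    return [pick_serializer(v, default) for v in values]
```

With this approach, a `(pandas.DataFrame, xarray.Dataset)` tuple would map to `["pandas.parquet", "xarray.zarr"]`. Raising on unregistered types (instead of silently pickling) keeps unsupported combinations explicit, which leaves the "should we support this" question open rather than answering it implicitly.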

It appears that Prefect itself doesn't support caching for tasks that return multiple values. This conclusion is based purely on my attempts at tinkering with the caching and going over the docs, so if I missed something, please let me know :).

Also, I started a discussion here: PrefectHQ/prefect#5201