Support for multiple outputs in prefect tasks
Prefect tasks can return multiple outputs, and it would be nice if `FunnelResult` supported this as well. For example:
```python
from typing import Tuple

import xarray as xr
from funnel import CacheStore
from funnel.prefect.result import FunnelResult
from prefect import Flow, task

store = CacheStore()


# Cache the task's return value in the funnel store under the target "foo"
@task(result=FunnelResult(store, serializer='xarray.zarr'), target='foo')
def my_task() -> Tuple[xr.Dataset, xr.Dataset]:
    a = xr.DataArray([1, 2, 3], dims='x', name='foo-1').to_dataset()
    b = xr.DataArray([1, 2, 3], dims='x', name='foo-2').to_dataset()
    return a, b  # two outputs from a single task


with Flow('foo-flow') as flow:
    # Prefect infers two outputs from the Tuple return annotation,
    # so the result can be unpacked into two downstream results
    ds_a, ds_b = my_task()

flow.run()
```
This currently fails with the following error:
```
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1398, in _construct_dataarray
    variable = self._variables[name]
KeyError: 0

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/engine/task_runner.py", line 876, in get_task_run_state
    value = prefect.utilities.executors.run_task_with_timeout(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/executors.py", line 454, in run_task_with_timeout
    return task.run(*args, **kwargs)  # type: ignore
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/utilities/tasks.py", line 456, in method
    return run_method(self, *args, **kwargs)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/prefect/tasks/core/operators.py", line 38, in run
    return task_result[key]
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1502, in __getitem__
    return self._construct_dataarray(key)
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 1400, in _construct_dataarray
    _, name, variable = _get_virtual_variable(
  File "/srv/conda/envs/notebook/lib/python3.9/site-packages/xarray/core/dataset.py", line 158, in _get_virtual_variable
    raise KeyError(key)
KeyError: 0
```
But this has me thinking that `FunnelResult` may need some slight modifications to handle multiple outputs. As a starting point, I'm curious whether a modified artifact schema is necessary. In the example below, I turn the JSON object into a list of artifacts:
`/tmp/funnel_metadata_store/foo.artifact.json`:

```json
[
  {
    "key": "foo[0]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  },
  {
    "key": "foo[1]",
    "serializer": "xarray.zarr",
    "load_kwargs": {},
    "dump_kwargs": {},
    "created_at": "2021-12-03T23:19:11.186053"
  }
]
```
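A minimal sketch of how the list-based sidecar could be written and read back, assuming the field names from the JSON example above. `ArtifactEntry`, `entries_for_outputs`, `dump_sidecar` and `load_sidecar` are hypothetical names, not part of funnel. One design choice worth considering is backward compatibility: since the current sidecar is a single JSON object, the loader here treats an old-style object as a one-element list.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime
from typing import Any, Dict, List


@dataclass
class ArtifactEntry:
    # Hypothetical mirror of one entry in the proposed sidecar list;
    # field names follow the JSON example above.
    key: str
    serializer: str
    load_kwargs: Dict[str, Any] = field(default_factory=dict)
    dump_kwargs: Dict[str, Any] = field(default_factory=dict)
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())


def entries_for_outputs(key: str, serializers: List[str]) -> List[ArtifactEntry]:
    # One entry per output, keyed "<key>[<index>]"; all entries written
    # together share one timestamp, as in the example.
    created_at = datetime.now().isoformat()
    return [
        ArtifactEntry(key=f"{key}[{i}]", serializer=s, created_at=created_at)
        for i, s in enumerate(serializers)
    ]


def dump_sidecar(entries: List[ArtifactEntry]) -> str:
    return json.dumps([asdict(e) for e in entries], indent=2)


def load_sidecar(text: str) -> List[ArtifactEntry]:
    data = json.loads(text)
    # Assumed backward compatibility: an existing single-object sidecar
    # is read as a one-element list.
    if isinstance(data, dict):
        data = [data]
    return [ArtifactEntry(**item) for item in data]
```

For example, `entries_for_outputs("foo", ["xarray.zarr", "xarray.zarr"])` produces entries keyed `foo[0]` and `foo[1]`, matching the layout of the file above.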
I believe we could support this by:

- Allowing the `put()` method to receive positional (non-keyword) arguments as values: https://github.com/NCAR/esds-funnel/blob/82c4b984e2b97a1bbe07435367e7606e80bf9c8f/funnel/cache.py#L182-L186
  I'm imagining a signature that starts with `def put(self, key: str, *value: typing.Any,` (an annotation on `*value` applies to each positional value, so `typing.Any` rather than a `Tuple`).
- Extending the sidecar file schema to consist of a list of `Artifact` entries, as you suggested above.
- Modifying the `get()` method to return a single value or a tuple of values, depending on the number of artifacts recorded in the sidecar JSON file.
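The three steps above can be sketched with a toy in-memory store. `ToyStore` is a hypothetical stand-in, not funnel's `CacheStore`; it keeps values in a dict instead of serializing them, and the keyword-only `serializer` parameter is an assumption. It only illustrates the proposed behaviour: a variadic `put()`, a list-valued sidecar, and a `get()` that returns a bare value or a tuple.

```python
import json
from typing import Any, Dict, Tuple, Union


class ToyStore:
    """Hypothetical in-memory stand-in for a cache store (not funnel's API)."""

    def __init__(self) -> None:
        self._blobs: Dict[str, Any] = {}     # artifact key -> stored value
        self._sidecars: Dict[str, str] = {}  # key -> sidecar JSON text (a list)

    def put(self, key: str, *value: Any, serializer: str = "pickle") -> None:
        if not value:
            raise ValueError("put() needs at least one value")
        entries = []
        for i, item in enumerate(value):
            # A single value keeps the plain key; several get "<key>[<i>]".
            artifact_key = key if len(value) == 1 else f"{key}[{i}]"
            self._blobs[artifact_key] = item
            entries.append({"key": artifact_key, "serializer": serializer})
        self._sidecars[key] = json.dumps(entries)

    def get(self, key: str) -> Union[Any, Tuple[Any, ...]]:
        entries = json.loads(self._sidecars[key])
        values = tuple(self._blobs[e["key"]] for e in entries)
        # One recorded artifact -> bare value; several -> tuple.
        return values[0] if len(values) == 1 else values
```

One consequence of this design: `store.put("foo", result)` with a tuple `result` records a single artifact, while `store.put("foo", *result)` records one artifact per element. The caller (e.g. the Prefect result class) would have to decide which form to use.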
I have one question though. In your example, the type of returned values is homogeneous (which makes it easy to serialize using the same serializer). What should happen when the function returns multiple outputs with different types, say a tuple with a Pandas DataFrame and an xarray Dataset? Should we even support this use case?
> What should happen when the function returns multiple outputs with different types, say a tuple with a Pandas DataFrame and an xarray Dataset? Should we even support this use case?
Honestly, I'm not sure how to handle this. We may want to share this use case (and the funnel project) with the Prefect developers and see what they say.
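Because each entry in the proposed list schema already carries its own `serializer` field, one possible approach is to pick a serializer per output. Below is a minimal sketch under stated assumptions: `pick_serializer` and `serializers_for` are hypothetical helpers, `"pandas.parquet"` and `"pickle"` are placeholder serializer names (only `"xarray.zarr"` appears in this thread), and the class paths `xarray.core.dataset.Dataset` and `pandas.core.frame.DataFrame` may differ between library versions. Matching on class names avoids importing pandas or xarray just to dispatch.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical mapping from fully qualified class names to serializer names.
SERIALIZERS: Dict[str, str] = {
    "xarray.core.dataset.Dataset": "xarray.zarr",
    "pandas.core.frame.DataFrame": "pandas.parquet",  # placeholder name
}


def pick_serializer(value: Any, default: str = "pickle") -> str:
    # Walk the MRO so subclasses fall back to their parent's serializer.
    for cls in type(value).__mro__:
        name = f"{cls.__module__}.{cls.__qualname__}"
        if name in SERIALIZERS:
            return SERIALIZERS[name]
    return default  # placeholder fallback for unrecognised types


def serializers_for(outputs: Tuple[Any, ...]) -> List[str]:
    # One serializer per output, so a mixed tuple yields mixed entries.
    return [pick_serializer(v) for v in outputs]
```

The resulting list could feed straight into the per-entry `serializer` field, so a tuple of a DataFrame and a Dataset would produce two sidecar entries with different serializers.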
It appears that Prefect itself doesn't support caching for tasks that return multiple values. This conclusion is based purely on my attempts at tinkering with the caching and on going over the docs, so if I missed something, please let me know :).
Also, I started a discussion here: PrefectHQ/prefect#5201