Support Prefect 3
Andrew-S-Rosen opened this issue · 5 comments
Details about the quacc environment
- quacc version: 0.9.2
- Python version: 3.10.14
- prefect version: 3.0.0.rc2
What is the issue?
When using Prefect 3.x, most dynamic workflows in quacc become broken.
How can we easily reproduce the issue?
pip install quacc[dev]
pip install prefect==3.0.0rc2
quacc set WORKFLOW_ENGINE prefect
from quacc import flow, job, subflow

@job
def add(a, b):
    return a + b

@job
def make_more(val):
    return [val] * 3

@subflow
def add_distributed(vals, c):
    return [add(val, c) for val in vals]

@flow
def dynamic_workflow(a, b, c):
    result1 = add(a, b)
    result2 = make_more(result1)
    return add_distributed(result2, c)

assert [r.result() for r in dynamic_workflow(1, 2, 3)] == [6, 6, 6]
Alternatively, you can run pytest tests/prefect/test_syntax.py to find more examples.
When running the above minimal example, you'll find that it stalls completely (if run via pytest) or prints an endless stream of errors (if run in an IPython console or REPL).
Any Ideas Why?
The root cause is the following monkeypatch:
Lines 47 to 60 in 797602a
If we comment out PrefectFuture.__getitem__ = _patched_getitem, the minimal example will run to completion, albeit at the expense of other tests failing since we have removed the implicit deferral of __getitem__ calls.
I believe the problem is likely related to the fact that there are now several new Prefect future classes, such as PrefectConcurrentFuture among others. However, I tried playing around with them and wasn't able to make much progress from a quick effort.
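For anyone unfamiliar with what "implicit deferral of __getitem__" means here, the following is a minimal sketch of the same pattern using only the standard library's concurrent.futures. It is an analogy, not Prefect's or quacc's actual code: the names executor, _get_item, and _deferred_getitem are made up for illustration, and the stdlib Future stands in for PrefectFuture.

```python
from concurrent.futures import Future, ThreadPoolExecutor

# Hypothetical stand-in for a task runner.
executor = ThreadPoolExecutor(max_workers=2)


def _get_item(parent, key):
    # Runs on a worker thread: wait for the parent future, then index its result.
    return parent.result()[key]


def _deferred_getitem(self, key):
    # Indexing a future schedules a new task instead of blocking the caller.
    return executor.submit(_get_item, self, key)


# Monkeypatch the future class, analogous to patching PrefectFuture.__getitem__.
Future.__getitem__ = _deferred_getitem


def make_more(val):
    return [val] * 3


future = executor.submit(make_more, 3)
item = future[0]       # another Future, not the value itself
first = item.result()  # resolves only when explicitly requested
executor.shutdown()
```

The point of the pattern is that future[0] stays lazy, so downstream tasks can be wired to a single element of an upstream result before that result exists.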
Tagging @zulissimeta as a heads-up (and in case you're feeling adventurous...).
The entire test suite runs to completion if the following small change is made: changing @job to @task on line 49 below.
Lines 44 to 56 in 56a923a
In other words, it works perfectly fine if we don't .submit() the __getitem__ call to the concurrent task runner, which the @job decorator does automatically. However, this is likely not a mergeable change: if we use @task here, the __getitem__ call will run server-side. This has two potential drawbacks: 1) the server needs the same dependencies as the remote machine; 2) if many __getitem__ calls are being made, there may be a bottleneck. However, I'm not 100% sure it's a problem in practice. At least with the Slurm-based DaskTaskRunner, I'm pretty sure the login node and compute node both need the dependencies anyway. Additionally, running __getitem__ on the login node isn't a huge issue because if the user decides to submit one Slurm job per task, they wouldn't want such trivial calls to go to Slurm in the first place. There is still the question of what happens with a remote server setup, though.
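To make the "where does the call run" distinction concrete, here is a rough stdlib-only sketch. It is an analogy under stated assumptions rather than Prefect behavior: a ThreadPoolExecutor stands in for the task runner, the calling thread stands in for the orchestrating process, and get_item is a hypothetical helper that also reports which thread executed it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def get_item(obj, key):
    # Return the item together with the id of the thread that did the work.
    return obj[key], threading.get_ident()


caller_thread = threading.get_ident()

with ThreadPoolExecutor(max_workers=1) as pool:
    # Direct call: executes in the caller, like a task that is not submitted.
    inline_value, inline_thread = get_item([6, 6, 6], 0)

    # Submitted call: executes on a worker, like a task sent to the task runner.
    submitted_value, submitted_thread = pool.submit(get_item, [6, 6, 6], 0).result()
```

Both paths return the same value; the only difference is which side pays for the call, which is the trade-off described above.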
The biggest problem actually seems to be that you lose the DAG if some tasks are submitted and others aren't.
This might be related (and if so, it might be worth trying the tests again with the latest prefect3): PrefectHQ/prefect#14036
Note to self: this does not seem related to PrefectHQ/prefect#14036 since the errors persist in 3.0.0rc15.
We know that the following always works:
from prefect import task, flow

@task
def my_task():
    return 1

@flow
def my_subflow(dummy):
    return 1

@flow
def my_flow():
    result = my_task.submit()
    return my_subflow(result)

assert my_flow() == 1
But for some reason in Prefect 3, the following doesn't:
from prefect.futures import PrefectFuture
from prefect import flow, task

def _patched_getitem(self, index):
    @task
    def get_item(obj, key):
        return obj[key]

    return get_item.submit(self, index)

PrefectFuture.__getitem__ = _patched_getitem

@task
def my_task():
    return 1

@flow
def my_subflow(dummy):
    return 1

@flow
def my_flow():
    result = my_task.submit()
    return my_subflow(result)

assert my_flow() == 1
Linking to PrefectHQ/prefect#14881.
Prefect 3 support will be fixed with the upstream Prefect PR I opened: PrefectHQ/prefect#14900.