Quantum-Accelerators/quacc

Support Prefect 3

Andrew-S-Rosen opened this issue · 5 comments

Details about the quacc environment

  • quacc version: 0.9.2
  • Python version: 3.10.14
  • prefect version: 3.0.0.rc2

What is the issue?

With Prefect 3.x, most dynamic workflows in quacc break.

How can we easily reproduce the issue?

pip install quacc[dev]
pip install prefect==3.0.0rc2
quacc set WORKFLOW_ENGINE prefect

from quacc import flow, job, subflow


@job
def add(a, b):
    return a + b

@job
def make_more(val):
    return [val] * 3

@subflow
def add_distributed(vals, c):
    return [add(val, c) for val in vals]

@flow
def dynamic_workflow(a, b, c):
    result1 = add(a, b)
    result2 = make_more(result1)
    return add_distributed(result2, c)

assert [r.result() for r in dynamic_workflow(1, 2, 3)] == [6, 6, 6]

Alternatively, you can run pytest tests/prefect/test_syntax.py to find more examples.

When you run the minimal example above, it either stalls completely (under pytest) or prints an endless stream of errors (in an IPython console or REPL).
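For reference, here is the same workflow with the quacc decorators removed, so the expected [6, 6, 6] can be traced by hand. This is only a plain-Python baseline sketch. It involves no workflow engine and says nothing about how Prefect schedules the tasks.

```python
# Plain-Python equivalent of the minimal example, with @job/@subflow/@flow
# removed. It only documents the expected result; it does not exercise Prefect.

def add(a, b):
    return a + b


def make_more(val):
    return [val] * 3


def add_distributed(vals, c):
    return [add(val, c) for val in vals]


def dynamic_workflow(a, b, c):
    result1 = add(a, b)                  # 1 + 2 = 3
    result2 = make_more(result1)         # [3, 3, 3]
    return add_distributed(result2, c)   # each 3 + 3 -> [6, 6, 6]


print(dynamic_workflow(1, 2, 3))  # [6, 6, 6]
```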

Any ideas why?

The root cause is the following monkeypatch:

# Monkeypatching for Prefect
if SETTINGS.WORKFLOW_ENGINE == "prefect":
    from prefect.futures import PrefectFuture
    from prefect.states import State

    def _patched_getitem(self, index):
        # Indexing a future schedules a small @job instead of indexing eagerly
        @job
        def _getitem(future, index_):
            return future[index_]

        return _getitem(self, index)

    PrefectFuture.__getitem__ = _patched_getitem
    State.__getitem__ = _patched_getitem

If we comment out PrefectFuture.__getitem__ = _patched_getitem, the minimal example runs to completion, but other tests then fail because we have removed the implicit deferral of __getitem__ calls.

I suspect the problem is related to the several new Prefect future classes in Prefect 3, such as PrefectConcurrentFuture. I experimented with them briefly but didn't make much progress.
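When reasoning about the new future classes, it may help to recall how Python resolves a patched dunder method: assigning __getitem__ on a base class reaches every subclass that does not define its own __getitem__, even subclasses created before the patch. Whether the Prefect 3 future classes inherit from PrefectFuture without overriding __getitem__ would need to be checked against the Prefect source. The classes below are hypothetical stand-ins, not Prefect's.

```python
# Hypothetical stand-ins for a base future class and two subclasses; these
# are NOT Prefect's classes, just a way to check the Python semantics.
class BaseFuture:
    pass


class ConcurrentFuture(BaseFuture):
    pass


class CustomFuture(BaseFuture):
    # A subclass with its own __getitem__ shadows a later base-class patch.
    def __getitem__(self, index):
        return "own implementation"


def _patched_getitem(self, index):
    return f"patched[{index}]"


# Patching the base class after the subclasses exist still works, because
# special-method lookup goes through the type's MRO at call time.
BaseFuture.__getitem__ = _patched_getitem

print(ConcurrentFuture()[0])  # patched[0]
print(CustomFuture()[0])      # own implementation
```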

Tagging @zulissimeta as a heads-up (and in case you're feeling adventurous...).

The entire test suite runs to completion with one small change: replacing @job with @task on line 49 below (the decorator on _getitem).

if SETTINGS.WORKFLOW_ENGINE == "prefect":
    from prefect.futures import PrefectFuture
    from prefect.states import State

    def _patched_getitem(self, index):
        @job  # changing this @job to @task lets the full test suite pass
        def _getitem(future, index_):
            return future[index_]

        return _getitem(self, index)

    PrefectFuture.__getitem__ = _patched_getitem
    State.__getitem__ = _patched_getitem

In other words, everything works if the __getitem__ call is not .submit()-ed to the concurrent task runner, which the @job decorator does automatically. However, this is probably not a mergeable change: with @task, the __getitem__ call runs server-side. That has two potential drawbacks: 1) the server needs the same dependencies as the remote machine; 2) if many __getitem__ calls are made, they may become a bottleneck.

I'm not 100% sure either is a problem in practice. At least with the SLURM-based DaskTaskRunner, I'm fairly sure the login node and the compute nodes both need the dependencies anyway. Running __getitem__ on the login node also isn't a big issue, because a user who submits one Slurm job per task wouldn't want such trivial calls going to Slurm in the first place. The open question is what happens with a remote server setup.
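The difference between calling a task directly and submitting it can be pictured with a plain thread pool: a direct call runs in the caller, while a submitted call runs on the executor's workers. This is only an analogy using concurrent.futures, with the main thread standing in for the process that calls the task directly and the pool standing in for the task runner. The getitem function here is hypothetical.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


# Toy analogy (not Prefect's API): report which thread did the indexing.
def getitem(obj, key):
    return obj[key], threading.current_thread().name


data = {"energy": -1.5}

with ThreadPoolExecutor(max_workers=2, thread_name_prefix="worker") as pool:
    # Direct call: runs synchronously in the calling thread.
    inline_value, inline_thread = getitem(data, "energy")
    # Submitted call: handed to a worker thread owned by the pool.
    submitted_value, submitted_thread = pool.submit(getitem, data, "energy").result()

print(inline_value, inline_thread)        # -1.5 MainThread
print(submitted_value, submitted_thread)  # -1.5 worker_0
```

The value is identical either way; only where the work runs changes, which is exactly the dependency and bottleneck trade-off described above.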

The biggest problem actually seems to be that you lose the DAG if some tasks are submitted and others aren't.
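One simplified way to see how lineage can be lost: an orchestrator can only draw an edge when a downstream call receives an upstream handle (such as a future or state). If a result is resolved to plain data before being passed on, nothing records where it came from. The toy tracker below illustrates that general idea only; Handle and Tracker are hypothetical and do not reflect how Prefect records dependencies internally.

```python
from dataclasses import dataclass, field


# Hypothetical toy tracker (not Prefect's implementation): edges are only
# recorded when a task receives another task's handle.
@dataclass
class Handle:
    name: str
    value: object


@dataclass
class Tracker:
    edges: list = field(default_factory=list)

    def submit(self, name, fn, *args):
        # Record an edge for every upstream handle passed in, then unwrap.
        for arg in args:
            if isinstance(arg, Handle):
                self.edges.append((arg.name, name))
        values = [a.value if isinstance(a, Handle) else a for a in args]
        return Handle(name, fn(*values))


tracker = Tracker()
a = tracker.submit("add", lambda x, y: x + y, 1, 2)
b = tracker.submit("make_more", lambda v: [v] * 3, a)  # handle passed: edge kept
resolved = b.value                                     # plain list, no lineage
c = tracker.submit("sum_all", sum, resolved)           # value passed: edge lost

print(tracker.edges)  # [('add', 'make_more')]
```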

This might be related (if so, it's worth rerunning the tests with the latest Prefect 3 release): PrefectHQ/prefect#14036

Note to self: this does not seem related to PrefectHQ/prefect#14036 since the errors persist in 3.0.0rc15.

We know that the following always works:

from prefect import task, flow

@task
def my_task():
    return 1

@flow
def my_subflow(dummy):
    return 1

@flow
def my_flow():
    result = my_task.submit()
    return my_subflow(result)

assert my_flow() == 1

But for some reason in Prefect 3, the following doesn't:

from prefect.futures import PrefectFuture
from prefect import flow, task

def _patched_getitem(self, index):
    @task
    def get_item(obj, key):
        return obj[key]

    return get_item.submit(self, index)

PrefectFuture.__getitem__ = _patched_getitem

@task
def my_task():
    return 1

@flow
def my_subflow(dummy):
    return 1

@flow
def my_flow():
    result = my_task.submit()
    return my_subflow(result)

assert my_flow() == 1

Linking to PrefectHQ/prefect#14881.

Prefect 3 support will be fixed by the upstream Prefect PR I opened: PrefectHQ/prefect#14900.