Prefect Retry Task - Not retrying from point of failure
cleung625 commented
Bug summary
Introduction
Hello! We recently ran into a strange issue in production where code is not rerun from the point of failure when we retry a failed deployment run. While debugging and testing, I found two different retry behaviors.
Set Up
- Using Prefect Server 2.18.3
- We have two machines using Prefect. Machine A hosts the server and some process workers; Machine B hosts more process workers for redundancy.
- Deployments run by pulling from an Azure DevOps repository.
- We have set
PREFECT_LOCAL_STORAGE_PATH=\\common\network\location
on both machines, so that both workers can access the same storage location (see the quick check below).
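As a sanity check (just the standard Prefect settings API, nothing custom), the resolved value can be printed under each worker's environment; both machines should show the same UNC path:

from prefect.settings import PREFECT_LOCAL_STORAGE_PATH

# Run this under each worker's environment on both machines;
# both should print \\common\network\location.
print(PREFECT_LOCAL_STORAGE_PATH.value())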
Code
Here is some minimal code to reproduce the bug:
Behavior 1 - Using persist_result=True, restarts from task_A regardless
import asyncio

from prefect import flow, get_run_logger, task


@task(persist_result=True)
async def task_A():
    return 1


@task(persist_result=True)
async def task_B():
    return 2


@task(persist_result=True)
async def task_C():
    # Deliberately fail so we can test retrying the flow run from this point.
    raise ValueError("This is an error")


@task(persist_result=True)
async def task_D():
    return 4


@flow(persist_result=True)
async def test_retry_python(var_a, var_b):
    logger = get_run_logger()
    logger.info(f"Now Running: {var_a} and {var_b}")
    # Chain the tasks so each waits for the previous one to finish.
    a = await task_A.submit()
    b = await task_B.submit(wait_for=[a])
    c = await task_C.submit(wait_for=[b])
    d = await task_D.submit(wait_for=[c])
    logger.info(f"Result: {d}")
    return d


if __name__ == "__main__":
    asyncio.run(test_retry_python(var_a=2, var_b=3))
Result - Behavior 1
- With persist_result=True, we expect task_C to be retried after we press Retry on the deployment run, since retries are supposed to default to resuming from the point of failure.
- However, the flow retries from the beginning, even though both workers can access the shared location.
- Note the storage location of the state: it is the same location for both run attempts.
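For completeness, here is a sketch of pinning result storage to an explicit filesystem block rather than relying on PREFECT_LOCAL_STORAGE_PATH; the block name my-shared-results is hypothetical, and we have not verified whether this changes the retry behavior:

from prefect import task
from prefect.filesystems import LocalFileSystem

# One-time setup: save a block pointing at the shared UNC path
# (the block name is hypothetical).
shared = LocalFileSystem(basepath=r"\\common\network\location")
shared.save("my-shared-results", overwrite=True)

@task(persist_result=True, result_storage=LocalFileSystem.load("my-shared-results"))
async def task_A():
    return 1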
Behavior 2 - Not using persist_result, yet results persist by default and the retry resumes from the failure point, but only when the retry worker is on the same machine as the first attempt
- This one is harder to replicate because we use KDB wrappers to run our async task queries, but in principle it is the same as the native Python example above. I am filing this for observation only, in case you have any leads.
- The flow essentially farms jobs out to KDB (another language) for processing; see the sketch of the helper below.
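Since prefect_libs / ccl_kdb is an internal library, here is a rough, hypothetical sketch of what run_task_with_start_stop does, inferred from the call sites in the code that follows; the names and logic here are assumptions, not the real implementation:

from prefect import task

async def run_task_with_start_stop(
    task_name,
    ccl_function,
    conn,
    script,
    tasks_to_run,
    task_functions,
    wait_for_tasks=None,
    retries=0,
    retry_delay_seconds=0,
    tags=None,
):
    # Skip tasks outside the requested start/stop window.
    if task_name not in tasks_to_run:
        return
    # Wrap the KDB query function in a Prefect task with per-call options.
    wrapped = task(
        ccl_function,
        name=task_name,
        retries=retries,
        retry_delay_seconds=retry_delay_seconds,
        tags=tags or [],
    )
    # Resolve upstream futures so Prefect enforces ordering.
    upstream = [task_functions[t] for t in (wait_for_tasks or [])]
    task_functions[task_name] = await wrapped.submit(conn, script, wait_for=upstream)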
Code - Behavior 2
import asyncio
import os
import time
from typing import Literal

from prefect import flow, get_run_logger
from prefect.runtime import flow_run

from prefect_libs import ccl_handling, ccl_kdb, ccl_kdb_setup

DROP_DOWN = Literal["1", "2", "3", "4"]


@flow(
    # Note: the date suffix is computed once, at import time.
    flow_run_name="test_retry" + time.strftime("%Y-%m-%d"),
    on_failure=[ccl_handling.hook],  # Replace with actual error hook
    log_prints=True,
)
async def task_retry(
    env,
    start: DROP_DOWN = "1",
    stop: DROP_DOWN = "4",
    email_recipients=None,
):
    logger = get_run_logger()
    current_time = time.ctime()
    logger.info(f"Now running process: {flow_run.name}")
    logger.info(f"Current time: {current_time!s}")
    logger.info(f"Environment: {env}")
    logger.info(f"User: {os.getlogin()}")

    hmaster_port_number = await ccl_kdb_setup.grab_hmaster_port(env, "access")
    conn = await ccl_kdb_setup.grab_port_handle.submit(env, hmaster_port_number)

    # Task order
    tasks_order = ["1", "2", "3", "4"]
    # Get the subset of tasks to run using the helper function
    tasks_to_run = ccl_kdb.get_tasks_to_run(tasks_order, start, stop)
    # Initialize an empty dictionary to store task results
    task_functions = {}

    # Use the helper function to run the tasks
    # Task 1: wait for handles
    await ccl_kdb.run_task_with_start_stop(
        task_name="1",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="1",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
    )

    # Task 2: runs after task 1
    await ccl_kdb.run_task_with_start_stop(
        task_name="2",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="2",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        wait_for_tasks=["1"],
        tags=["test"],
    )

    # Task 3: this q script deliberately signals an error; retried 5 times
    await ccl_kdb.run_task_with_start_stop(
        task_name="3",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="'I am an error",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        retries=5,
        retry_delay_seconds=10,
        tags=["test"],
        wait_for_tasks=["2"],
    )

    # Task 4: runs after task 3
    await ccl_kdb.run_task_with_start_stop(
        task_name="4",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="4",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
        wait_for_tasks=["3"],
    )


if __name__ == "__main__":
    asyncio.run(
        task_retry(
            env="qprod",
            start="1",
            stop="4",
            email_recipients=["cleung@cclgroup.com", "aliceroberts@cclgroup.com"],
        )
    )
Result - Behavior 2
- Even though we are not setting persist_result=True, we still get result-persistence behavior.
- The retry also resumes from the point of failure, which is what we want. HOWEVER, it resumes from the point of failure only when the worker assigned to the retry is on the same machine as the worker that ran the first attempt, even though both workers can access the shared location. Otherwise, it restarts from the beginning, as shown in the screenshot.
- Note the storage location of the state: it is the same location for both run attempts.
- In this example, the first attempt ran on a worker on machine A and the second attempt on a worker on machine B. However, we have set
PREFECT_LOCAL_STORAGE_PATH=\\common\network\location
on both machines, so both workers can access the same location, as shown in the screenshot.
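As a further sanity check (nothing Prefect-specific, just verifying share permissions), something like this can be run under each worker's service account on both machines:

from pathlib import Path

# Confirm the worker user can read and write the shared UNC location.
test_file = Path(r"\\common\network\location") / "worker_write_test.txt"
test_file.write_text("hello from this worker")
print(test_file.read_text())
test_file.unlink()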
Conclusion
It seems like retry behavior varies. Am I using this incorrectly, or is something else going on? I asked about this multiple times on Slack but got no response. Thanks!
Version info
(C:\prefect-venv) C:\Users\qtask>prefect version
Version: 2.18.3
API version: 0.8.4
Python version: 3.9.19
Git commit: c449aee8
Built: Thu, May 2, 2024 5:47 PM
OS/Arch: win32/AMD64
Profile: default
Server type: server
Additional context
No response
cleung625 commented
Hey team, any updates on this? This affects our ability to scale across different machines.