PrefectHQ/prefect

Prefect Retry Task - Not retrying from point of failure

Bug summary

Introduction

Hello! We recently ran into an issue in production where a flow does not resume from the point of failure when we retry a failed deployment run. While debugging and testing, I found two different retry behaviors.

Set Up

  • Using Prefect Server 2.18.3
  • We have two machines running Prefect. Machine A hosts the server and some process workers. Machine B hosts more process workers for redundancy.
  • Deployments are run by pulling code from an Azure DevOps repository
  • We have set PREFECT_LOCAL_STORAGE_PATH=\\common\network\location on both machines, so that both workers can access the same location
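
One way to sanity-check the shared-storage setup described above is to have each machine write a small marker file into the configured location and then list the markers it can see. This is a minimal sketch using only the standard library. The helper name `check_shared_storage` and the `.marker` file naming are hypothetical and not part of Prefect, and the temporary directory in the demo only stands in for the real network path.

```python
import socket
import tempfile
from pathlib import Path


def check_shared_storage(storage_path, host=None):
    """Write a marker file for this host and return the hosts whose markers are visible.

    Hypothetical helper, not a Prefect API. Run it on each machine with the
    same value that is used for PREFECT_LOCAL_STORAGE_PATH.
    """
    root = Path(storage_path)
    host = host or socket.gethostname()
    marker = root / f"{host}.marker"
    marker.write_text(host, encoding="utf-8")
    # Every machine that has already run the check leaves one marker behind.
    return sorted(p.stem for p in root.glob("*.marker"))


if __name__ == "__main__":
    # A temporary directory stands in for the shared network location.
    with tempfile.TemporaryDirectory() as shared:
        print(check_shared_storage(shared, host="machine-a"))
        print(check_shared_storage(shared, host="machine-b"))
```

If the second machine does not list the first machine's marker, the two workers are probably not resolving the path to the same place.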

Code

Here is some code to reproduce the bug:

Behavior 1 - With persist_result=True, the retry restarts from task_A regardless

import asyncio

from prefect import flow, get_run_logger, task


@task(persist_result=True)
async def task_A():
    return 1


@task(persist_result=True)
async def task_B():
    return 2


@task(persist_result=True)
async def task_C():
    # Always fails, so the flow run can be retried from this task
    raise ValueError("This is an error")


@task(persist_result=True)
async def task_D():
    return 4


@flow(persist_result=True)
async def test_retry_python(var_a, var_b):
    logger = get_run_logger()
    logger.info(f"Now Running: {var_a} and {var_b}")

    a = await task_A.submit()
    b = await task_B.submit(wait_for=[a])
    c = await task_C.submit(wait_for=[b])
    d = await task_D.submit(wait_for=[c])

    logger.info(f"Result: {d}")

    return d


if __name__ == "__main__":
    asyncio.run(test_retry_python(var_a=2, var_b=3))

Result - Behavior 1

  • With persist_result=True, we expect the retry to resume at task_C after we press retry on the deployment run, since retrying from the point of failure is the default.
  • However, it retries from the beginning, even though both workers can access the shared storage location.
    image
  • Note the storage location of the state - it is stored in the same location for both run attempts

Behavior 2 - Without persist_result, results are still persisted by default and the retry resumes from the point of failure, but only if the retrying worker is on the same machine as the first attempt

  • This one is a little harder to replicate, as we are using KDB wrappers to run our async task queries, but in theory, it is the same as the one with native python. I am filing this for observation only to see if you have any leads.
  • The flow essentially farms out jobs to KDB (another language) for processing.

Code - Behavior 2

import asyncio
import os
import time
from typing import Literal

from prefect import flow, get_run_logger
from prefect.runtime import flow_run
from prefect_libs import ccl_handling, ccl_kdb, ccl_kdb_setup

DROP_DOWN = Literal["1", "2", "3", "4"]

@flow(
    flow_run_name="test_retry" + time.strftime("%Y-%m-%d"),
    on_failure=[ccl_handling.hook],  # Replace with actual error hook
    log_prints=True,
)
async def task_retry(
    env,
    start: DROP_DOWN = "1",
    stop: DROP_DOWN = "4",
    email_recipients=None,
):
    logger = get_run_logger()
    current_time = time.ctime()

    logger.info(f"Now running process: {flow_run.name}")
    logger.info(f"Current time: {current_time!s}")
    logger.info(f"Environment: {env}")
    logger.info(f"User: {os.getlogin()}")

    hmaster_port_number = await ccl_kdb_setup.grab_hmaster_port(env, "access")
    conn = await ccl_kdb_setup.grab_port_handle.submit(env, hmaster_port_number)

    # Task order

    tasks_order = ["1", "2", "3", "4"]

    # Get the subset of tasks to run using the helper function
    tasks_to_run = ccl_kdb.get_tasks_to_run(tasks_order, start, stop)

    # Initialize an empty dictionary to store task results
    task_functions = {}

    # Use the helper function to run the tasks
    # Task 1: Wait for handles
    await ccl_kdb.run_task_with_start_stop(
        task_name="1",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="1",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
    )

    # Task 2: Runs after task 1
    await ccl_kdb.run_task_with_start_stop(
        task_name="2",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="2",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        wait_for_tasks=["1"],
        tags=["test"],
    )

    await ccl_kdb.run_task_with_start_stop(
        task_name="3",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="'I am an error",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        retries=5,
        retry_delay_seconds=10,
        tags=["test"],
        wait_for_tasks=["2"],
    )

    await ccl_kdb.run_task_with_start_stop(
        task_name="4",
        ccl_function=ccl_kdb.async_query_access,
        conn=conn,
        script="4",
        tasks_to_run=tasks_to_run,
        task_functions=task_functions,
        tags=["test"],
        wait_for_tasks=["3"],
    )


if __name__ == "__main__":
    asyncio.run(
        task_retry(
            env="qprod",
            start="1",
            stop="4",
            email_recipients=["cleung@cclgroup.com", "aliceroberts@cclgroup.com"],
        )
    )

Result - Behavior 2

  • Even without persist_result=True, results still appear to be persisted.
  • The retry also resumes from the point of failure, which is what we want. However, it does so only if the worker assigned to the retry is on the same machine as the worker from the first attempt, even though both machines can access the shared storage location. Otherwise, it restarts from the beginning, as shown in the image.
  • Note the storage location of the state - it is stored in the same location for both run attempts
  • In this example, the first attempt runs on a worker on machine A. In the second attempt, the worker is on machine B. However, we have set PREFECT_LOCAL_STORAGE_PATH=\\common\network\location on both machines, so that both workers can access the same location, as shown in the image.

image
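
To narrow down whether machine B really sees the same result files that machine A wrote, one option is to snapshot the storage directory from each machine and compare the two snapshots. The sketch below uses only the standard library. The names `snapshot_storage` and `diff_snapshots` are hypothetical, and it assumes that persisted results are ordinary files under the configured storage path.

```python
import hashlib
import json
import tempfile
from pathlib import Path


def snapshot_storage(storage_path):
    """Map each file's relative path to the SHA-256 digest of its contents."""
    root = Path(storage_path)
    snapshot = {}
    for path in sorted(root.rglob("*")):
        if path.is_file():
            rel = path.relative_to(root).as_posix()
            snapshot[rel] = hashlib.sha256(path.read_bytes()).hexdigest()
    return snapshot


def diff_snapshots(first, second):
    """Return relative paths that are missing from one snapshot or differ between them."""
    all_paths = first.keys() | second.keys()
    return sorted(p for p in all_paths if first.get(p) != second.get(p))


if __name__ == "__main__":
    # Two temporary directories stand in for what each machine sees.
    with tempfile.TemporaryDirectory() as view_a, tempfile.TemporaryDirectory() as view_b:
        (Path(view_a) / "result-1").write_text("1", encoding="utf-8")
        (Path(view_a) / "result-2").write_text("2", encoding="utf-8")
        (Path(view_b) / "result-1").write_text("1", encoding="utf-8")
        snap_a = snapshot_storage(view_a)
        snap_b = snapshot_storage(view_b)
        # JSON output makes it easy to copy a snapshot from one machine to the other.
        print(json.dumps(snap_a, indent=2))
        print(diff_snapshots(snap_a, snap_b))
```

An empty diff would suggest both machines see identical files, which would point away from the storage location and toward how the retry locates earlier results.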

Conclusion

It seems like retry behavior is inconsistent. Am I using this incorrectly, or is something else going on? I asked about this several times on Slack but got no response. Thanks!

Version info

(C:\prefect-venv) C:\Users\qtask>prefect version
Version:             2.18.3
API version:         0.8.4
Python version:      3.9.19
Git commit:          c449aee8
Built:               Thu, May 2, 2024 5:47 PM
OS/Arch:             win32/AMD64
Profile:             default
Server type:         server

Additional context

No response

Hey team, any updates on this? This affects our ability to scale across different machines.