[backend] `set_retry` for pipelines does not work
Environment
- How did you deploy Kubeflow Pipelines (KFP)? Python --> VertexAI Pipeline.
- KFP version: KFP version 2.7.0.
- KFP SDK version:
kfp 2.7.0
kfp-pipeline-spec 0.3.0
kfp-server-api 2.0.0
Steps to reproduce
The docs here say: "Pipelines can themselves be used as components in other pipelines, just as you would use any other single-step component in a pipeline".
I was testing this to see whether a pipeline within a pipeline can be retried, but I can't get it to work. Here is what I've tried (based on this):
from kfp import compiler
from kfp import dsl

@dsl.component
def print_op1(msg: str) -> str:
    print(msg)
    return msg

@dsl.container_component
def print_op2(msg: str):
    return dsl.ContainerSpec(
        image='alpine',
        command=['echo', msg],
    )

@dsl.component
def fail_job():
    raise ValueError('This job failed')

@dsl.pipeline
def inner_pipeline(msg: str):
    task = print_op1(msg=msg)
    fail_job().after(task).set_retry(num_retries=2)
    print_op2(msg=task.output)

@dsl.pipeline(name='pipeline-in-pipeline')
def my_pipeline():
    op1_out = print_op1(msg='Hello')
    inner_out = inner_pipeline(msg='world').set_retry(num_retries=10).after(op1_out)
    print_op1(msg='bye').after(inner_out)

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline,
        package_path=__file__.replace('.py', '.yaml'))
The fail_job task retries, but the pipeline-in-pipeline does not. Am I wrong in my thinking?
Expected result
The pipeline-in-pipeline should retry as well.
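One way to narrow this down is to check whether the retry setting on the nested pipeline task reaches the compiled spec at all, and whether that task points at a sub-DAG rather than a single step. The following is a minimal sketch, assuming the compiled YAML has already been loaded into a dict and that it follows the `root.dag.tasks` / `components` layout. The helper name `classify_retry_targets` and the `example_spec` dict are hypothetical and hand-written for illustration; they are not actual compiler output.

```python
from typing import Any, Dict, List, Tuple


def classify_retry_targets(spec: Dict[str, Any]) -> List[Tuple[str, int, str]]:
    """Return (task name, maxRetryCount, target kind) for root tasks with a retryPolicy."""
    components = spec.get("components", {})
    root_tasks = spec.get("root", {}).get("dag", {}).get("tasks", {})
    results = []
    for task_name, task in root_tasks.items():
        policy = task.get("retryPolicy")
        if not policy:
            continue  # no retry configured on this task
        component_name = task.get("componentRef", {}).get("name", "")
        component = components.get(component_name, {})
        # Assumption: a component with a `dag` key is itself a pipeline,
        # while one with `executorLabel` is a single-step component.
        if "dag" in component:
            kind = "nested pipeline"
        elif "executorLabel" in component:
            kind = "single step"
        else:
            kind = "unknown"
        results.append((task_name, policy.get("maxRetryCount", 0), kind))
    return results


# Hand-written, illustrative spec (NOT real compiler output).
example_spec = {
    "components": {
        "comp-print-op1": {"executorLabel": "exec-print-op1"},
        "comp-inner-pipeline": {"dag": {"tasks": {}}},
    },
    "root": {
        "dag": {
            "tasks": {
                "print-op1": {"componentRef": {"name": "comp-print-op1"}},
                "inner-pipeline": {
                    "componentRef": {"name": "comp-inner-pipeline"},
                    "retryPolicy": {"maxRetryCount": 10},
                },
            }
        }
    },
}

for name, retries, kind in classify_retry_targets(example_spec):
    print(f"{name}: maxRetryCount={retries}, target={kind}")
```

If the nested pipeline task shows up with a `retryPolicy` in the real compiled spec, the setting survives compilation and the question becomes whether the backend honors it for sub-DAGs. If it is missing, the setting is lost at compile time.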
This is related to my discussion here, but I'm opening an issue for visibility.
Impacted by this bug? Give it a 👍.
@ianbenlolo `set_retry` is not working even without nested pipelines, see #9950. I've also tried it without nested pipelines, and it doesn't retry even once with this spec:
tasks:
  fail-job:
    cachingOptions:
      enableCache: true
    componentRef:
      name: comp-fail-job
    dependentTasks:
    - print-op1
    retryPolicy:
      backoffDuration: 0s
      backoffFactor: 2.0
      backoffMaxDuration: 3600s
      maxRetryCount: 2
    taskInfo:
      name: fail-job
I didn't try with Vertex AI, but I guess `set_retry` is also not supported by Vertex AI yet, see https://issuetracker.google.com/issues/226569351
@Faakhir30 Please see my comment in the original thread. It works for me on Vertex AI.
The issue is that pipelines-in-pipelines do not retry.
I am also encountering the same issue where retries are not functioning as expected.
The environment details are as follows:
- How did you deploy Kubeflow Pipelines (KFP)? Using a custom deployment on AWS EKS.
- KFP version: Kubeflow version 1.9.0.
- KFP SDK version:
  - kfp: 2.9.0
  - kfp-pipeline-spec: 0.4.0
  - kfp-server-api: 2.3.0
Here is the code:
import os
import kfp
from kfp.v2 import dsl
from kfp import kubernetes

@dsl.component(base_image="python:3.8")
def test1() -> None:
    import sys
    print("complete")
    sys.exit(1)

@dsl.pipeline(
    name="TestRetry",
    description="test retry"
)
def test_pipeline():
    test_task = test1()
    test_task.set_retry(
        num_retries=3,
        backoff_duration="60s",
        backoff_factor=2,
        backoff_max_duration="3600s"
    )
    test_task.set_caching_options(enable_caching=False)

print("Compiling pipeline for cloud execution...")
kfp.v2.compiler.Compiler().compile(test_pipeline, "./test_retry.yaml")
The compiled result is as follows:
# PIPELINE DEFINITION
# Name: testretry
# Description: test retry
components:
  comp-test1:
    executorLabel: exec-test1
deploymentSpec:
  executors:
    exec-test1:
      container:
        args:
        - --executor_input
        - '{{$}}'
        - --function_to_execute
        - test1
        command:
        - sh
        - -c
        - "\nif ! [ -x \"$(command -v pip)\" ]; then\n    python3 -m ensurepip ||\
          \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
          \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\
          \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
          $0\" \"$@\"\n"
        - sh
        - -ec
        - 'program_path=$(mktemp -d)


          printf "%s" "$0" > "$program_path/ephemeral_component.py"

          _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

          '
        - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
          \ *\n\ndef test1() -> None:\n    import sys\n    print(\"complete\")\n  \
          \  sys.exit(1)\n\n"
        image: python:3.8
pipelineInfo:
  description: test retry
  name: testretry
root:
  dag:
    tasks:
      test1:
        cachingOptions: {}
        componentRef:
          name: comp-test1
        retryPolicy:
          backoffDuration: 60s
          backoffFactor: 2.0
          backoffMaxDuration: 3600s
          maxRetryCount: 3
        taskInfo:
          name: test1
schemaVersion: 2.1.0
sdkVersion: kfp-2.9.0
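For reference, here is a rough sketch of the wait times the `retryPolicy` above would imply if the backend honored it. It assumes plain exponential backoff capped at `backoffMaxDuration`, which is what the field names suggest; the actual backend behavior may differ. The helpers `parse_seconds` and `backoff_schedule` are hypothetical names, not part of the KFP SDK, and only durations with an `s` suffix are handled.

```python
from typing import List


def parse_seconds(duration: str) -> float:
    """Parse a duration such as '60s' into seconds (only the 's' suffix is handled)."""
    if not duration.endswith("s"):
        raise ValueError(f"unsupported duration format: {duration!r}")
    return float(duration[:-1])


def backoff_schedule(
    backoff_duration: str,
    backoff_factor: float,
    backoff_max_duration: str,
    max_retry_count: int,
) -> List[float]:
    """Return the assumed wait (in seconds) before each retry attempt."""
    base = parse_seconds(backoff_duration)
    cap = parse_seconds(backoff_max_duration)
    # Assumption: the wait before retry n is base * factor**(n - 1), capped at `cap`.
    return [min(base * backoff_factor ** attempt, cap) for attempt in range(max_retry_count)]


# Values taken from the retryPolicy in the compiled spec above.
print(backoff_schedule("60s", 2.0, "3600s", 3))  # prints [60.0, 120.0, 240.0]
```

Under these assumptions the task should be retried three times, roughly 60, 120, and 240 seconds apart, so a run that fails immediately with no further attempts suggests the policy is not being applied at all.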