tensorflow/tfx

Long running custom component cancelled on Vertex AI after exactly 7 days

IzakMaraisTAL opened this issue · 10 comments

I have a long running custom TFX component. The TFX pipeline is run on Vertex AI. After 7 days it gets cancelled.


I have tried this twice and found the same result. From the component logs, there is no error happening inside the component.

If you have the patience, you can reproduce this with a minimal custom component that just sleeps, waking every couple of minutes to log a message (this is what the component was doing when it was cancelled).

from tfx.dsl.component.experimental.decorators import component
import tensorflow as tf
import datetime
import time

@component
def Sleeper():
    while True:
        tf.print(f"{datetime.datetime.now()}. Sleeping for 5 minutes.")
        time.sleep(5 * 60)

TFX version: 1.12
Python version: 3.7

Is there a way to configure this apparent 7 day job timeout?

@IzakMaraisTAL,

By default, the maximum job running time in Vertex AI is 7 days. It can be changed by setting "timeout" in the scheduling section of CustomJobSpec and passing that configuration to the TFX pipeline as shown here.
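
For illustration, a CustomJobSpec with a longer timeout could look roughly like this (a sketch only; the worker pool values are placeholders, the field names follow the Vertex AI CustomJobSpec/Scheduling messages, and "1209600s" is 14 days versus the 7-day default):

vertex_job_spec = {
    "worker_pool_specs": [{
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "container_spec": {"image_uri": "<PIPELINE_IMAGE>"},
    }],
    # Scheduling section of CustomJobSpec; the timeout is a duration string.
    "scheduling": {"timeout": "1209600s"},  # 14 days instead of the 7-day default
}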

Thank you!

That looks very promising, thanks for the feedback @singhniraj08.

I am not sure how to apply the CustomJobSpec to my custom component. In your example TFX pipeline, the vertex_job_spec value (the CustomJobSpec) is passed into the tfx.extensions.google_cloud_ai_platform.Trainer component by assigning it to the custom_config[tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY] parameter.
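
For reference, the wiring in that example looks roughly like this (a sketch; the module file, upstream outputs and region are placeholders, and the key names are the ones exposed by the google_cloud_ai_platform extension):

from tfx import v1 as tfx

trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
    module_file="<MODULE_FILE>",
    examples=transform.outputs["transformed_examples"],  # upstream component output
    train_args=tfx.proto.TrainArgs(num_steps=1000),
    eval_args=tfx.proto.EvalArgs(num_steps=100),
    custom_config={
        tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY: True,
        tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY: "<GOOGLE_CLOUD_REGION>",
        # The CustomJobSpec (including scheduling.timeout) goes in here.
        tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY: vertex_job_spec,
    },
)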

What is the equivalent when working with a custom Python function based component?

@IzakMaraisTAL,

I couldn't find anything under the google_cloud_ai_platform extensions that can run a custom component on Vertex AI. I think we would need to add logic to the custom component to create a custom job, but I am not sure whether this method will work.

@AnuarTB, can we pass a CustomJobSpec to a custom Python-based component and run it on Vertex AI? @IzakMaraisTAL wants to set the "timeout" scheduling field of CustomJobSpec on a custom component so that it doesn't get cancelled after 7 days (the default value). Thanks!

For reference, here is the minimal, but complete pipeline definition file (vertex_runner.py):

from tfx import v1 as tfx  # public v1 API (provides tfx.dsl.Pipeline, tfx.extensions, ...)
from tfx.dsl.component.experimental.decorators import component
import tensorflow as tf
import datetime
import time
from tfx.orchestration.kubeflow.v2 import kubeflow_v2_dag_runner

@component
def Sleeper():
    while True:
        tf.print(f"{datetime.datetime.now()}. Sleeping for 5 minutes.")
        time.sleep(5 * 60)


def create_pipeline():
    sleeper = Sleeper()

    # Many other components are also added e.g. 
    # tfx.extensions.google_cloud_big_query.BigQueryExampleGen
    # tfx.components.StatisticsGen
    # tfx.components.ImportSchemaGen
    # tfx.components.Transform
    # tfx.extensions.google_cloud_ai_platform.Trainer
    # tfx.components.BulkInferrer
    components = [sleeper]
    
    return tfx.dsl.Pipeline(
        pipeline_name="pipeline_name",
        pipeline_root="<PIPELINE_ROOT>",
        components=components,
        enable_cache=False,
        metadata_connection_config=None,
        beam_pipeline_args=["--project=<GOOGLE_CLOUD_PROJECT>","--temp_location=<TEMP_LOCATION>"],
        ai_platform_training_args={
            "project": "<GOOGLE_CLOUD_PROJECT>",
            "region": "<GOOGLE_CLOUD_REGION>",
            "masterConfig": {
                "imageUri": "<PIPELINE_IMAGE>",
                "acceleratorConfig": {
                    "count": "1",
                    "type": "<GPU_TYPE>",
                },
            },
            "scaleTier": "CUSTOM",
            "masterType": "<MASTERTYPE>",
        }
    )



runner_config = kubeflow_v2_dag_runner.KubeflowV2DagRunnerConfig(
    default_image="<PIPELINE_IMAGE>"
)

PIPELINE_DEFINITION_FILE = "pipeline_name.json"
runner = kubeflow_v2_dag_runner.KubeflowV2DagRunner(
    config=runner_config, output_filename=PIPELINE_DEFINITION_FILE
)

runner.run(create_pipeline())

The above is compiled using tfx pipeline compile --pipeline-path=vertex_runner.py --engine=vertex.

It is submitted to Vertex AI using:

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from vertex_runner import PIPELINE_DEFINITION_FILE

aiplatform.init(project="<GOOGLE_CLOUD_PROJECT>", location="<GOOGLE_CLOUD_REGION>")

job = pipeline_jobs.PipelineJob(
    template_path=PIPELINE_DEFINITION_FILE,
    display_name="pipeline_name",
    enable_caching=False,
)

job.run()

@singhniraj08 from what I have found in the code, unfortunately it seems like you can pass a CustomJobSpec only to the Trainer and Tuner components. Extending the timeout for other components seems like a feature request that will require some time. I will try to raise this issue during our Wednesday meeting and report back. Meanwhile, I can try running the component as a custom job, but I am not sure about it.

@IzakMaraisTAL could you please give context on what you are trying to do with the long-running component? TFX components were not designed to be long running, and we also fear that long-running components might cause billing problems.

What do you think of an idea to setup a custom job outside of TFX pipeline which will do the same work?
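
Such a standalone job could look roughly like this (a sketch using the google-cloud-aiplatform SDK; the project, bucket, images, machine type, and timeout value are placeholders, not a verified setup):

from google.cloud import aiplatform

aiplatform.init(
    project="<GOOGLE_CLOUD_PROJECT>",
    location="<GOOGLE_CLOUD_REGION>",
    staging_bucket="<GCS_STAGING_BUCKET>",
)

# Standalone custom job that runs the downloader image outside the TFX pipeline.
job = aiplatform.CustomJob(
    display_name="image-downloader",
    worker_pool_specs=[{
        "machine_spec": {"machine_type": "n1-standard-4"},
        "replica_count": 1,
        "container_spec": {"image_uri": "<DOWNLOADER_IMAGE>"},
    }],
)
job.run(timeout=14 * 24 * 3600)  # timeout in seconds; overrides the 7-day default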

Thanks for the feedback.

For each example, the custom component downloads an image from an image service given the URL in the example.

To avoid a heavy load on the image service, the downloading is limited to early morning hours. Outside of morning hours, the component sleeps. Due to the large number of images it takes multiple days to download them all.

We realise this is not a great long-term architecture; the goal was to do the simplest implementation for the first iteration of this machine learning project. If the project proves successful, we will refine the image-downloading architecture and move it outside of TFX.

We would still appreciate a feature that supports a longer timeout via configuration.

From what I can see in the description, would it be possible to have an outside job download the images and, when it finishes downloading, trigger the pipeline with all the gathered data?

Regarding the feature, unfortunately it is unlikely to be implemented in the near future.

From what I can see in the description, would it be possible to have an outside job download the images and, when it finishes downloading, trigger the pipeline with all the gathered data?

Yes, that would be possible. We had hoped that we could simplify it by doing everything in TFX for the first iteration.
