tensorflow/tfx

Retry transient errors in Google Service clients

adriangay opened this issue · 5 comments

System information

  • TFX Version (you are using):
    1.9.1

  • Environment in which you plan to use the feature (e.g., Local/Cloud):
    Google Cloud

  • Are you willing to contribute it (Yes/No):
    Yes

Describe the feature and the current behavior/state.

We are running an increasing number of model and variant TFX training pipelines in Google Vertex AI. As a result, we are experiencing an increasing number of pipeline run failures at the Trainer step due to this error:

ERROR 2023-04-08T17:37:32.681434100Z [resource.labels.taskName: workerpool0-0] status = StatusCode.UNAVAILABLE

ERROR 2023-04-08T17:37:32.681441481Z [resource.labels.taskName: workerpool0-0] details = "The service is currently unavailable."

We raised a Google Support Case and the response from the Vertex AI team was:

Google already offers retry logic in the client in aiplatform_v1.services.job_service.client.py, as can be seen here:

https://github.com/googleapis/python-aiplatform/blob/main/google/cloud/aiplatform_v1/services/job_service/client.py#L878

which is perfectly reasonable. The arguments to get_custom_job are:

    def get_custom_job(
        self,
        request: Optional[Union[job_service.GetCustomJobRequest, dict]] = None,
        *,
        name: Optional[str] = None,
        retry: OptionalRetry = gapic_v1.method.DEFAULT,
        timeout: Union[float, object] = gapic_v1.method.DEFAULT,
        metadata: Sequence[Tuple[str, str]] = (),
    ) -> custom_job.CustomJob:

In tfx/extensions/google_cloud_ai_platform/training_clients.py, both VertexJobClient and CAIPJobClient create custom jobs and get job status using get_job. These methods do not provide a retry argument when calling get_custom_job:

    return self._client.get_custom_job(name=self._job_name)

Looking into the retry: OptionalRetry argument of get_custom_job:

https://github.com/googleapis/python-api-core/blob/5dfb6afda81bcc13e3432b020e144f7873fe1f4e/google/api_core/retry.py#L318

we find the Retry class supports:

    def __init__(
        self,
        predicate=if_transient_error,
        initial=_DEFAULT_INITIAL_DELAY,
        maximum=_DEFAULT_MAXIMUM_DELAY,
        multiplier=_DEFAULT_DELAY_MULTIPLIER,
        timeout=_DEFAULT_DEADLINE,
        on_error=None,
        **kwargs
    ):

Some useful links to the Retry class:
https://googleapis.dev/python/google-api-core/latest/retry.html
https://github.com/googleapis/python-api-core/blob/main/google/api_core/retry.py
https://cloud.google.com/python/docs/reference/storage/1.39.0/retry_timeout

The default if_transient_error predicate will retry on the following transient errors:

  • google.api_core.exceptions.InternalServerError - HTTP 500, gRPC INTERNAL(13) and its subclasses.
  • google.api_core.exceptions.TooManyRequests - HTTP 429
  • google.api_core.exceptions.ServiceUnavailable - HTTP 503
  • requests.exceptions.ConnectionError
  • requests.exceptions.ChunkedEncodingError - The server declared chunked encoding but sent an invalid chunk.
  • google.auth.exceptions.TransportError - Used to indicate an error occurred during an HTTP request.
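
To make this concrete, if_transient_error is just a predicate over exceptions, so the UNAVAILABLE error shown above would be retried (a small illustrative check, not TFX code):

    from google.api_core import exceptions
    from google.api_core.retry import if_transient_error

    # ServiceUnavailable (HTTP 503 / gRPC UNAVAILABLE) is in the transient set,
    # so the default predicate returns True for the error we are seeing.
    err = exceptions.ServiceUnavailable("The service is currently unavailable.")
    print(if_transient_error(err))  # True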

So, adding a default retry=Retry() argument would seem to handle all transient errors with a default exponential backoff. This would mitigate the problem we are seeing in the TFX Trainer, which uses VertexJobClient. The same retry behaviour should also be added to CAIPJobClient. A simplistic 'fix' would be:

    def get_job(self) -> CustomJob:
      """Gets the long-running job."""
      return self._client.get_custom_job(name=self._job_name, retry=Retry())

The predicate could be made more selective than the default; for example, we might not want to retry google.api_core.exceptions.InternalServerError. To solve our immediate problem, though, we might simply use:

self._client.get_custom_job(name=self._job_name, retry=Retry())
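
If the more selective behaviour were wanted later, a narrower policy could look roughly like this (a sketch only; if_exception_type and ServiceUnavailable are existing google.api_core helpers, and the call mirrors the get_job snippet above):

    from google.api_core import exceptions
    from google.api_core.retry import Retry, if_exception_type

    # Retry only the 503 / gRPC UNAVAILABLE errors we are observing, keeping the
    # library's default exponential backoff settings.
    unavailable_only = Retry(predicate=if_exception_type(exceptions.ServiceUnavailable))

    # Inside get_job() this would replace the plain Retry():
    #   self._client.get_custom_job(name=self._job_name, retry=unavailable_only)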

Our understanding of the TFX implementation is limited, and we may not be aware of side-effects that this simple fix might trigger. A complete solution might surface the retry argument/options up to the TFX component level, so that users can decide per pipeline component whether retry is wanted or appropriate.

Examples of configuring retry 'policies' can be found here:

https://cloud.google.com/python/docs/reference/storage/1.39.0/retry_timeout

Will this change the current API? How?

No

Who will benefit with this feature?

Every TFX user that runs pipelines in Google Cloud.

Do you have a workaround or are you completely blocked by this?

Can patch a workaround, but would prefer a production-quality solution as soon as possible

Name of your Organization (Optional)

Sky UK and NBCU (part of Comcast Group)

Any Other info.

@lego0901,

Can we implement a retry parameter when creating custom jobs and getting job status via get_custom_job in VertexJobClient and CAIPJobClient in tfx/extensions/google_cloud_ai_platform/training_clients.py, to avoid google_cloud_ai_platform.Trainer failures? Thanks!

@singhniraj08 @lego0901 I am happy to work on this. I have signed the CLA. Before I start, as I mentioned in the issue, I don't have enough experience with the TFX implementation to anticipate side-effects across the entire suite of TFX components when adding Retry as described, so any advice/guidance is appreciated 😸

Wow, this is a good feature request for enhancing robustness! Let me look into it and bring it into our discussion.

Sorry for my late response, and I sincerely appreciate your thorough research!

Overall

I don't think it will cause any side effects for any form of implementation. It will simply make the component executions that use training_clients more robust, bringing only positive effects (though perhaps with the "appropriate" predicates?).

Implementation

Update the runner and executor

Let's trace the call path down to get_custom_job (thanks to your research!!).

A retry: Retry parameter should be added to all of the functions along that path.

Update the component facade

Also, so that it can be used from the user-facing component facade, you have to update tfx/extensions/google_cloud_ai_platform/trainer/component.py by adding retry: Optional[Retry] = None as a new parameter.

You might wonder how this retry parameter from the facade (component.py) will be passed into the executor (executor.py) so that it is accessible from the .Do() function. In short, the retry parameter should be added to exec_properties so that the executor can read it via exec_properties['retry'].
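
For illustration only (the key name and helper below are assumptions, not the actual TFX executor code), the executor side could look roughly like:

    from typing import Any, Dict, Optional
    from google.api_core.retry import Retry

    _RETRY_KEY = 'retry'  # assumed key, mirroring the other *_KEY constants

    def _resolve_retry(exec_properties: Dict[str, Any]) -> Optional[Retry]:
      """Reads the optional retry setting; None keeps today's no-retry behaviour."""
      return exec_properties.get(_RETRY_KEY)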

The retry value is wired from the component facade down to the executor through the component spec. If you add a new optional entry to the spec's PARAMETERS (optional, so that there is no side effect), you can achieve the goal. For example:

  PARAMETERS = {
      TRAIN_ARGS_KEY: ExecutionParameter(type=trainer_pb2.TrainArgs),
      EVAL_ARGS_KEY: ExecutionParameter(type=trainer_pb2.EvalArgs),
      MODULE_FILE_KEY: ExecutionParameter(type=str, optional=True),
      MODULE_PATH_KEY: ExecutionParameter(type=str, optional=True),
      RUN_FN_KEY: ExecutionParameter(type=str, optional=True),
      TRAINER_FN_KEY: ExecutionParameter(type=str, optional=True),
      CUSTOM_CONFIG_KEY: ExecutionParameter(type=str, optional=True),
      RETRY_KEY: ExecutionParameter(type=Retry, optional=True),
  }

Wire the component and executor to pass the retry execution properties

Now, the remaining things are simple.

  • You should add a retry argument to the super().__init__() call
  • Of course, in the superclass, you have to add the new argument to __init__()
  • And then update the SPEC declaration, and it will update the execution properties! (A rough sketch follows this list.)
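
A rough, illustrative sketch of the facade change (the class names here are simplified stand-ins, not the actual TFX classes):

    from typing import Any, Optional
    from google.api_core.retry import Retry

    class _BaseTrainer:
      """Stand-in for the existing Trainer facade base class."""

      def __init__(self, **kwargs: Any):
        self._spec_params = kwargs  # in TFX this would populate the component SPEC

    class Trainer(_BaseTrainer):
      """Illustrative facade exposing the new optional retry parameter."""

      def __init__(self, retry: Optional[Retry] = None, **kwargs: Any):
        # Forward retry so it lands in the SPEC's execution parameters and,
        # from there, in exec_properties['retry'] for the executor.
        super().__init__(retry=retry, **kwargs)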

User journey

Simple. Users can use

tfx.extensions.google_cloud_ai_platform.trainer.component.Trainer(..., retry=retry)

whenever they want to use the retry parameter.

Please ask me if you have any additional questions, and huge thanks for paying attention to our product!

Hi @adriangay, we have received multiple feature requests originating from the same pain point you described.

So, from here, I added the default retries.Retry() argument to the custom job creation call for the VAI runner.
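
For reference, the change presumably amounts to something like the following (a sketch with placeholder values; the exact call site and variable names in training_clients.py may differ):

    from google.cloud import aiplatform_v1
    from google.api_core import retry as retries

    client = aiplatform_v1.JobServiceClient()

    # Pass a default Retry so transient errors (including UNAVAILABLE) are
    # retried with exponential backoff when creating the custom job.
    job = client.create_custom_job(
        parent="projects/my-project/locations/us-central1",              # placeholder
        custom_job={"display_name": "my-training-job", "job_spec": {}},  # placeholder
        retry=retries.Retry(),
    )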

Thanks for your suggestion and sorry for my late response.