tensorflow/tfx

TFX trainer component running in Kubeflow fails although it was successful in the Interactive Context

Closed this issue · 8 comments

System information

  • Have I specified the code to reproduce the issue (Yes, No):
  • Environment in which the code is executed (e.g., Local(Linux/MacOS/Windows), GCP
    Interactive Notebook, Google Cloud, etc): Notebook
  • TensorFlow version: 2.13.1
  • TFX Version: 1.14.0
  • Python version: 3.8
  • Kubeflow version: 1.8.22
  • Python dependencies (from pip freeze output):
  • tensorflow-metadata==1.14.0

Describe the current behavior

In GCP I run a Kubeflow ML pipeline with TFX components using a custom service account. The pipeline reads data from BigQuery and it has the following components: components = [example_gen, statistics_gen, schema_gen, transform, trainer, pusher]

The main problem is that the pipeline fails at the last "trainer" step, although I tested each step in the Interactive Context and all of them succeeded. The secondary problem is that I cannot display log messages from the trainer module code in the main GCP pipeline dashboard (in the logs area), which complicates debugging. I can only view logs in Logs Explorer, and even there I see only what appear to be framework messages, not the messages from the Python trainer module code. Those messages contain only one type of error (shown below). I identified that this operation uses the default service account rather than the custom one, and the default account might not have all the permissions needed. I tried to set the Trainer component to use the custom SA, but it does not use it. How can I set the custom SA properly?

com.google.cloud.ai.platform.common.errors.AiPlatformException: code=ALREADY_EXISTS, message=Schema with name projects/2385..../locations/..../metadataStores/default/metadataSchemas/f14caf7e-4234-473d-ac31-.... and version 0.0.1 already exists., cause=null

Please view details: https://stackoverflow.com/questions/77652732/tfx-trainer-component-running-in-kubeflow-fails-although-it-was-successful-in-th
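
One possible way to make the training job run under the custom service account, sketched under assumptions: the pipeline code later in this thread passes a `vertex_job_spec` dictionary to the Trainer as a Vertex AI `CustomJobSpec`, and `CustomJobSpec` has a `service_account` field. Whether the TFX 1.14 extension forwards that key unchanged is an assumption, not something confirmed in this thread. The helper name `build_vertex_job_spec` and the example values are hypothetical.

```python
from typing import Any, Dict, Optional


def build_vertex_job_spec(project_id: str,
                          image_uri: str,
                          service_account: Optional[str] = None,
                          machine_type: str = 'n1-standard-4') -> Dict[str, Any]:
    """Builds a job spec dict shaped like the `vertex_job_spec` used in this thread."""
    spec: Dict[str, Any] = {
        'project': project_id,
        'worker_pool_specs': [{
            'machine_spec': {'machine_type': machine_type},
            'replica_count': 1,
            'container_spec': {'image_uri': image_uri},
        }],
    }
    if service_account:
        # Assumption: this key maps to CustomJobSpec.service_account, so the
        # training job runs as this account instead of the default one.
        spec['service_account'] = service_account
    return spec
```

The resulting dictionary would take the place of `vertex_job_spec` under `TRAINING_ARGS_KEY`. Note that `job.submit(service_account=...)` sets the account for the pipeline run, which may not be the same thing as the account of the training job it launches.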

Describe the expected behavior

Being able to successfully run the TFX Trainer component in a Kubeflow pipeline.

@crbl1122,

Can you also share the complete logs from Logs Explorer for this pipeline, or minimal pipeline code that reproduces the issue?

Regarding the error you mentioned: the pipeline is saving the same artifacts to the same location, which causes this error. Complete logs will help us understand which artifact is involved and how we can resolve it. Thanks.

code updated

@singhniraj08 Thank you for the quick answer. I couldn't identify where the schema artifact (?) is written twice. Here is the code:

%%writefile {_trainer_module_file}

# Copied from https://www.tensorflow.org/tfx/tutorials/tfx/penguin_simple and
# slightly modified run_fn() to add distribution_strategy.

from typing import List
from absl import logging
import tensorflow as tf
from tensorflow import keras
from tensorflow_metadata.proto.v0 import schema_pb2
from tensorflow_transform.tf_metadata import schema_utils

from tfx import v1 as tfx
from tfx_bsl.public import tfxio

_FEATURE_KEYS = [
    'F1', 'F2', 'F3'
]
_LABEL_KEY = 'F_ASIG_FLAG'

_TRAIN_BATCH_SIZE = 20
_EVAL_BATCH_SIZE = 10

# Since we're not generating or creating a schema, we will instead create
# a feature spec.  Since there are a fairly small number of features this is
# manageable for this dataset.
_FEATURE_SPEC = {
    **{
        feature: tf.io.FixedLenFeature(shape=[1], dtype=tf.float32)
        for feature in _FEATURE_KEYS
    }, _LABEL_KEY: tf.io.FixedLenFeature(shape=[1], dtype=tf.int64)
}


def _input_fn(file_pattern: List[str],
              data_accessor: tfx.components.DataAccessor,
              schema: schema_pb2.Schema,
              batch_size: int) -> tf.data.Dataset:
  """Generates features and label for training.

  Args:
    file_pattern: List of paths or patterns of input tfrecord files.
    data_accessor: DataAccessor for converting input to RecordBatch.
    schema: schema of the input data.
    batch_size: representing the number of consecutive elements of returned
      dataset to combine in a single batch

  Returns:
    A dataset that contains (features, indices) tuple where features is a
      dictionary of Tensors, and indices is a single Tensor of label indices.
  """
  return data_accessor.tf_dataset_factory(
      file_pattern,
      tfxio.TensorFlowDatasetOptions(
          batch_size=batch_size, label_key=_LABEL_KEY),
      schema=schema).repeat()


def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  # outputs = keras.layers.Dense(3)(d)
  # Single sigmoid unit for binary classification, applied after the hidden layers.
  outputs = keras.layers.Dense(1, activation='sigmoid')(d)

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      # loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      # Pass a loss instance, not the class itself.
      loss=tf.keras.losses.BinaryCrossentropy(),
      # metrics=[keras.metrics.SparseCategoricalAccuracy()]
      metrics=[keras.metrics.BinaryAccuracy()])

  model.summary(print_fn=logging.info)
  return model


# NEW: Read `use_gpu` from the custom_config of the Trainer.
#      if it uses GPU, enable MirroredStrategy.
def _get_distribution_strategy(fn_args: tfx.components.FnArgs):
  if fn_args.custom_config.get('use_gpu', False):
    logging.info('Using MirroredStrategy with one GPU.')
    return tf.distribute.MirroredStrategy(devices=['device:GPU:0'])
  return None


# TFX Trainer will call this function.
def run_fn(fn_args: tfx.components.FnArgs):
  """Train the model based on given args.

  Args:
    fn_args: Holds args used to train the model as name/value pairs.
  """

  # This schema is usually either an output of SchemaGen or a manually-curated
  # version provided by pipeline author. A schema can also derived from TFT
  # graph if a Transform component is used. In the case when either is missing,
  # `schema_from_feature_spec` could be used to generate schema from very simple
  # feature_spec, but the schema returned would be very primitive.
  schema = schema_utils.schema_from_feature_spec(_FEATURE_SPEC)

  train_dataset = _input_fn(
      fn_args.train_files,
      fn_args.data_accessor,
      schema,
      batch_size=_TRAIN_BATCH_SIZE)
  eval_dataset = _input_fn(
      fn_args.eval_files,
      fn_args.data_accessor,
      schema,
      batch_size=_EVAL_BATCH_SIZE)

  # NEW: If we have a distribution strategy, build a model in a strategy scope.
  strategy = _get_distribution_strategy(fn_args)
  if strategy is None:
    model = _make_keras_model()
  else:
    with strategy.scope():
      model = _make_keras_model()

  model.fit(
      train_dataset,
      steps_per_epoch=fn_args.train_steps,
      validation_data=eval_dataset,
      validation_steps=fn_args.eval_steps)

  # The result of the training should be saved in `fn_args.serving_model_dir`
  # directory.
  model.save(fn_args.serving_model_dir, save_format='tf')
######
# upload trainer module to GCS
!gsutil cp {_trainer_module_file_v2} {MODULE_ROOT}/

######
from typing import List, Optional

def _create_pipeline(pipeline_name: str, pipeline_root: str, # data_root: str,
                     query: str,
                     module_file: str, endpoint_name: str, project_id: str,
                     region: str, 
                     use_gpu: bool,
                     beam_pipeline_args: Optional[List[str]]) -> tfx.dsl.Pipeline:
  """Implements the penguin pipeline with TFX."""
  # Brings data into the pipeline or otherwise joins/converts training data.
  # example_gen = tfx.components.CsvExampleGen(input_base=data_root)
  
  # NEW: Query data in BigQuery as a data source.
  example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(query=query)

  # NEW: Configuration for Vertex AI Training.
  # This dictionary will be passed as `CustomJobSpec`.
  vertex_job_spec = {
      'project': project_id,
      'worker_pool_specs': [{
          'machine_spec': {
              'machine_type': 'n1-standard-4',
          },
          'replica_count': 1,
          'container_spec': {
              'image_uri': 'gcr.io/tfx-oss-public/tfx:{}'.format(tfx.__version__),
          },
      }],
  }
  if use_gpu:
    # See https://cloud.google.com/vertex-ai/docs/reference/rest/v1/MachineSpec#acceleratortype
    # for available machine types.
    vertex_job_spec['worker_pool_specs'][0]['machine_spec'].update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })

  # Trains a model using Vertex AI Training.
  # NEW: We need to specify a Trainer for GCP with related configs.
  trainer = tfx.extensions.google_cloud_ai_platform.Trainer(
      module_file=module_file,
      examples=example_gen.outputs['examples'],
      train_args=tfx.proto.TrainArgs(num_steps=100),
      eval_args=tfx.proto.EvalArgs(num_steps=5),
      custom_config={
          tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
              True,
          tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
              region,
          tfx.extensions.google_cloud_ai_platform.TRAINING_ARGS_KEY:
              vertex_job_spec,
          'use_gpu':
              use_gpu,
      })

  # NEW: Configuration for pusher.
  vertex_serving_spec = {
      'project_id': project_id,
      'endpoint_name': endpoint_name,
      # Remaining argument is passed to aiplatform.Model.deploy()
      # See https://cloud.google.com/vertex-ai/docs/predictions/deploy-model-api#deploy_the_model
      # for the detail.
      #
      # Machine type is the compute resource to serve prediction requests.
      # See https://cloud.google.com/vertex-ai/docs/predictions/configure-compute#machine-types
      # for available machine types and acccerators.
      'machine_type': 'n1-standard-4',
  }

  # Vertex AI provides pre-built containers with various configurations for
  # serving.
  # See https://cloud.google.com/vertex-ai/docs/predictions/pre-built-containers
  # for available container images.
  serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-6:latest'
  if use_gpu:
    vertex_serving_spec.update({
        'accelerator_type': 'NVIDIA_TESLA_K80',
        'accelerator_count': 1
    })
    serving_image = 'us-docker.pkg.dev/vertex-ai/prediction/tf2-gpu.2-6:latest'

  # NEW: Pushes the model to Vertex AI.
  # pusher = tfx.extensions.google_cloud_ai_platform.Pusher(
  #     model=trainer.outputs['model'],
  #     custom_config={
  #         tfx.extensions.google_cloud_ai_platform.ENABLE_VERTEX_KEY:
  #             True,
  #         tfx.extensions.google_cloud_ai_platform.VERTEX_REGION_KEY:
  #             region,
  #         tfx.extensions.google_cloud_ai_platform.VERTEX_CONTAINER_IMAGE_URI_KEY:
  #             serving_image,
  #         tfx.extensions.google_cloud_ai_platform.SERVING_ARGS_KEY:
  #           vertex_serving_spec,
  #     })

  components = [
      example_gen,
      trainer,
      # pusher,
  ]

  return tfx.dsl.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      #NEW: `beam_pipeline_args` is required to use BigQueryExampleGen.
      beam_pipeline_args=beam_pipeline_args)


#### setting constants ###
import uuid
import time

# Get the current time in UTC as a str
current_time_utc = str(int(time.time()))

PIPELINE_NAME = 'test-ppline' + '-' + str(uuid.uuid4()) + '-' + current_time_utc
print('PIPELINE_NAME', PIPELINE_NAME)

# Path to various pipeline artifact.
PIPELINE_ROOT = 'gs://{}/pipeline_root/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

# Paths for users' Python module.
MODULE_ROOT = 'gs://{}/pipeline_module/{}'.format(
    GCS_BUCKET_NAME, PIPELINE_NAME)

print('MODULE_ROOT',MODULE_ROOT)

# Paths for users' data.
DATA_ROOT = 'gs://{}/data/{}'.format(GCS_BUCKET_NAME, PIPELINE_NAME)

# This might have been the cause of the error in the first version
# # This is the path where your model will be pushed for serving.
# SERVING_MODEL_DIR = 'gs://{}/serving_model/{}'.format(
#     GCS_BUCKET_NAME, PIPELINE_NAME)

# Name of Vertex AI Endpoint.
ENDPOINT_NAME = 'prediction-' + PIPELINE_NAME

print('PIPELINE_ROOT: {}'.format(PIPELINE_ROOT))

#######
# docs_infra: no_execute
import os

PIPELINE_DEFINITION_FILE = PIPELINE_NAME + '_pipeline.json'

# We need to pass some GCP related configs to BigQuery. This is currently done
# using `beam_pipeline_args` parameter.
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   '--region=europe-west3'
   ]

runner = tfx.orchestration.experimental.KubeflowV2DagRunner(
    config=tfx.orchestration.experimental.KubeflowV2DagRunnerConfig(),
    output_filename=PIPELINE_DEFINITION_FILE)

_ = runner.run(
    _create_pipeline(
        pipeline_name=PIPELINE_NAME,
        pipeline_root=PIPELINE_ROOT,
        # data_root=DATA_ROOT,
        query=QUERY,
        module_file=os.path.join(MODULE_ROOT, _trainer_module_file),
        endpoint_name=ENDPOINT_NAME,
        project_id=GOOGLE_CLOUD_PROJECT,
        region=GOOGLE_CLOUD_REGION,
        beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS, # for BigQuery
        # We will use CPUs only for now.
        use_gpu=False))

######
# docs_infra: no_execute
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
import logging
logging.getLogger().setLevel(logging.INFO)

aiplatform.init(project=GOOGLE_CLOUD_PROJECT, location=GOOGLE_CLOUD_REGION)

job = pipeline_jobs.PipelineJob(template_path=PIPELINE_DEFINITION_FILE,
                                display_name=PIPELINE_NAME,
                                enable_caching=False)
job.submit(service_account=service_account)

@crbl1122,

I tried running the pipeline code, but it contains a lot of errors that need to be fixed, so I was not able to replicate the issue with the example code. I tried running the TFX BigQuery tutorial and it ran without any issues.
Can you try running the example notebook to see if that works for you, and share pipeline code that reproduces the error? Thank you!
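
A cheap check that could be run before uploading the trainer module, offered as a sketch and not as part of the TFX tooling. It only verifies that the file parses and defines a top-level `run_fn` (the function that, per the comment in the module above, the TFX Trainer calls). It does not import TensorFlow and will not catch runtime errors such as a wrong argument to `model.compile`. The helper name `check_trainer_module` is hypothetical.

```python
import ast
from typing import List


def check_trainer_module(source: str, entry_point: str = 'run_fn') -> List[str]:
    """Returns a list of problems found in trainer module source code."""
    try:
        tree = ast.parse(source)
    except SyntaxError as e:
        return [f'syntax error at line {e.lineno}: {e.msg}']

    # Only top-level `def` statements count; nested functions are ignored.
    top_level_functions = {
        node.name for node in tree.body if isinstance(node, ast.FunctionDef)
    }
    if entry_point not in top_level_functions:
        return [f'missing top-level function {entry_point}()']
    return []
```

In the notebook this could be called as `check_trainer_module(open(_trainer_module_file).read())` right before the `gsutil cp` cell, which would also surface a mismatch between the file that was written and the file that is uploaded.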

@singhniraj08 My biggest problem is that I cannot view the logs from the component execution, either in the Vertex interface or in Logs Explorer, so debugging the components is almost impossible. In other tests where I used only Kubeflow components instead of TFX, I was able to view them. Do you have any idea why this happens and what I can do to see the component logs?
I made sure that the service account used to run this pipeline has the log writer/viewer privileges, but it had no effect.

For this TFX pipeline, I followed these two tutorials:

  1. Read data from BigQuery
    [Read from BigQuery](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_bq)
  2. Create and run a TFX pipeline which trains an ML model using Vertex AI Training service and publishes it to Vertex AI for serving
    [Create and run a TFX pipeline which trains an ML model using Vertex AI Training service and publishes it to Vertex AI for serving](https://www.tensorflow.org/tfx/tutorials/tfx/gcp/vertex_pipelines_vertex_training)

I can run both of them successfully on their own. But when I combine them as in the code above, the second component (Trainer) fails. These are the differences between the code from tutorial #2 and my code:

  • Instead of reading from CSV, I read from BigQuery, so instead of example_gen = tfx.components.CsvExampleGen, I use example_gen = tfx.extensions.google_cloud_big_query.BigQueryExampleGen(query=query). This step runs successfully.
  • My trainer is a binary classifier instead of the multiclass classifier in the tutorial, so I made the following adjustments to the model-building function:

def _make_keras_model() -> tf.keras.Model:
  """Creates a DNN Keras model for classifying penguin data.

  Returns:
    A Keras Model.
  """
  # The model below is built with Functional API, please refer to
  # https://www.tensorflow.org/guide/keras/overview for all API options.
  inputs = [keras.layers.Input(shape=(1,), name=f) for f in _FEATURE_KEYS]
  d = keras.layers.concatenate(inputs)
  for _ in range(2):
    d = keras.layers.Dense(8, activation='relu')(d)
  # outputs = keras.layers.Dense(3)(d)
  outputs = keras.layers.Dense(1, activation='sigmoid')(d)  # binary classification #<--new

  model = keras.Model(inputs=inputs, outputs=outputs)
  model.compile(
      optimizer=keras.optimizers.Adam(1e-2),
      # loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
      loss=tf.keras.losses.BinaryCrossentropy(),  #<--new (a loss instance, not the class)
      # metrics=[keras.metrics.SparseCategoricalAccuracy()]
      metrics=[keras.metrics.BinaryAccuracy()])  #<--new

I will update the code in the issue to match the modified code.

@singhniraj08 Hi, even without logging, I managed to identify the coding problems in the trainer module, and it works now.
It is still important to know how to resolve the missing component logs. If it is not possible to answer that here, I will close this issue and open another one.

@crbl1122, Normally the component logs should appear in Logs Explorer, but I will try to verify it with the team and update this issue.
Since this issue has been resolved, I request you to close it. Thank you!
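
A sketch of two things that may help with the missing logs, both under assumptions. First, with `ENABLE_VERTEX_KEY` set, the `run_fn` code presumably executes inside a separate Vertex AI custom training job and not in the pipeline step itself, so its output would be attached to that job and not to the pipeline task. Second, the filter below assumes that custom training jobs log under the `ml_job` resource type with a `job_id` label; this should be verified against the project's own log entries. The helper names are hypothetical.

```python
import logging
import sys


def configure_trainer_logging(level: int = logging.INFO) -> logging.Handler:
    """Sends root-logger records to stdout so the job's log collector can see them."""
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(name)s: %(message)s'))
    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(handler)
    return handler


def custom_job_log_filter(job_id: str, min_severity: str = 'INFO') -> str:
    """Builds a Logs Explorer query for one custom training job.

    Assumption: the job's entries use resource.type "ml_job" and carry the
    job ID in resource.labels.job_id.
    """
    return (f'resource.type="ml_job" '
            f'AND resource.labels.job_id="{job_id}" '
            f'AND severity>={min_severity}')
```

Calling `configure_trainer_logging()` at the top of `run_fn` and pasting the string from `custom_job_log_filter(...)` into Logs Explorer is one way to check whether the trainer messages are being written at all.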
