tensorflow/transform

Silent failure with --runner=DataflowRunner

feelingsonice opened this issue · 2 comments

System information

  • Environment: Google Colab, Vertex AI (KubeflowV2DagRunner)
  • TensorFlow version: 2.8.0
  • TFX Version: 1.7.0
  • Python version: 3.7.12

Here's my preprocessing_fn (redacted for clarity):

_FEATURES = [# list of str
]
_SPECIAL_IMPUTE = {
    'special_foo': 1,
}
HOURS = [1, 2, 3, 4]
TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}

@tf.function
def _divide(a, b):
  return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))

def preprocessing_fn(inputs):
  x = {}

  for name, tensor in sorted(inputs.items()):
    if tensor.dtype == tf.bool:
      tensor = tf.cast(tensor, tf.int64)

    if isinstance(tensor, tf.sparse.SparseTensor):
      default_value = '' if tensor.dtype == tf.string else 0
      tensor = tft.sparse_tensor_to_dense_with_shape(tensor, [None, 1], default_value)
    
    x[name] = tensor

  x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
  x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

  for hour in HOURS:
      total = tf.constant(0, dtype=tf.int64)
      for device_type in DEVICE_TYPES.keys():
          total = total + x[f'some_device_{device_type}_{hour}h']

  # one hot encode categorical values
  for name, keys in TABLE_KEYS.items():
    with tf.init_scope():
      initializer = tf.lookup.KeyValueTensorInitializer(
          tf.constant(keys), 
          tf.constant([i for i in range(len(keys))]))
      table = tf.lookup.StaticHashTable(initializer, default_value=-1)
      
    indices = table.lookup(tf.squeeze(x[name], axis=1))
    one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

    for i, _tensor in enumerate(tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
      x[f'{name}_{keys[i]}'] = _tensor

  return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}

Here's the beam_pipeline_args:

BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
   '--project=' + GOOGLE_CLOUD_PROJECT,
   '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
   '--runner=DataflowRunner',
   '--region=us-central1',
  '--experiments=upload_graph', # must be enabled, otherwise fails with 413
   '--dataflow_service_options=enable_prime',
   '--autoscaling_algorithm=THROUGHPUT_BASED',
   ]

Not sure if related but with the above preprocessing_fn, my transform first failed with the error:

RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.

I then added names to the tft.scale_to_0_1 analyzers:

return {name: tft.scale_to_0_1(x[name], name=f'{name}_scale_to_0_1') for name in _FEATURES}

After which my transform just silently failed without logs (see first screenshot). I check the worker logs but there's nothing substantial, only warnings (see second screenshot).

It's worth noting that I have the enable_prime flag.

Screen Shot 2022-03-25 at 2 01 59 PM

Screen Shot 2022-03-25 at 2 01 40 PM

Hi @bli00

Could you please take a look at this issue and see if it helps in resolving your issue? Also you can refer to this doc which discusses about the error you mentioned. Thanks!

bli00 commented

@pindinagesh After I removed the --dataflow_service_options=enable_prime flag it worked fine. Are you guys planning to support dataflow prime?