Silent failure with --runner=DataflowRunner
feelingsonice opened this issue · 2 comments
feelingsonice commented
System information
- Environment: Google Colab, Vertex AI (`KubeflowV2DagRunner`)
- TensorFlow version: 2.8.0
- TFX version: 1.7.0
- Python version: 3.7.12
Here's my `preprocessing_fn` (redacted for clarity):
```python
_FEATURES = [
    # list of str (redacted)
]

_SPECIAL_IMPUTE = {
    'special_foo': 1,
}

HOURS = [1, 2, 3, 4]

TABLE_KEYS = {
    'XXX': ['XXX_1', 'XXX_2', 'XXX_3'],
    'YYY': ['YYY_1', 'YYY_2', 'YYY_3'],
}


@tf.function
def _divide(a, b):
    return tf.math.divide_no_nan(tf.cast(a, tf.float32), tf.cast(b, tf.float32))


def preprocessing_fn(inputs):
    x = {}
    for name, tensor in sorted(inputs.items()):
        if tensor.dtype == tf.bool:
            tensor = tf.cast(tensor, tf.int64)
        if isinstance(tensor, tf.sparse.SparseTensor):
            default_value = '' if tensor.dtype == tf.string else 0
            tensor = tft.sparse_tensor_to_dense_with_shape(tensor, [None, 1], default_value)
        x[name] = tensor

    x['foo'] = _divide((x['foo1'] - x['foo2']), x['foo_denom'])
    x['bar'] = tf.cast(x['bar'] > 0, tf.int64)

    for hour in HOURS:
        total = tf.constant(0, dtype=tf.int64)
        for device_type in DEVICE_TYPES.keys():  # DEVICE_TYPES definition redacted
            total = total + x[f'some_device_{device_type}_{hour}h']

    # one-hot encode categorical values
    for name, keys in TABLE_KEYS.items():
        with tf.init_scope():
            initializer = tf.lookup.KeyValueTensorInitializer(
                tf.constant(keys),
                tf.constant([i for i in range(len(keys))]))
            table = tf.lookup.StaticHashTable(initializer, default_value=-1)
        indices = table.lookup(tf.squeeze(x[name], axis=1))
        one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)
        for i, _tensor in enumerate(tf.split(one_hot, num_or_size_splits=len(keys), axis=1)):
            x[f'{name}_{keys[i]}'] = _tensor

    return {name: tft.scale_to_0_1(x[name]) for name in _FEATURES}
```
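For reference, the lookup-based one-hot encoding in the loop above can be reproduced in isolation. This is a minimal sketch in plain TensorFlow (outside of tf.Transform); the key list is a stand-in for the redacted `TABLE_KEYS` values:

```python
import tensorflow as tf

# Stand-in for one entry of TABLE_KEYS (real values are redacted above).
keys = ['XXX_1', 'XXX_2', 'XXX_3']

# Static vocabulary -> index table; unknown keys map to -1.
initializer = tf.lookup.KeyValueTensorInitializer(
    tf.constant(keys),
    tf.constant(list(range(len(keys)))))
table = tf.lookup.StaticHashTable(initializer, default_value=-1)

# Look up a batch of string values and one-hot encode the indices.
# An index of -1 produces an all-zero row, i.e. out-of-vocabulary
# values get no one-hot column.
values = tf.constant(['XXX_2', 'not_in_vocab'])
indices = table.lookup(values)
one_hot = tf.one_hot(indices, len(keys), dtype=tf.int64)

print(indices.numpy().tolist())   # [1, -1]
print(one_hot.numpy().tolist())   # [[0, 1, 0], [0, 0, 0]]
```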
Here's the `beam_pipeline_args`:

```python
BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS = [
    '--project=' + GOOGLE_CLOUD_PROJECT,
    '--temp_location=' + os.path.join('gs://', GCS_BUCKET_NAME, 'tmp'),
    '--runner=DataflowRunner',
    '--region=us-central1',
    '--experiments=upload_graph',  # must be enabled, otherwise fails with 413
    '--dataflow_service_options=enable_prime',
    '--autoscaling_algorithm=THROUGHPUT_BASED',
]
```
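For context, these args reach Beam by being passed to the TFX pipeline definition. A rough sketch of that wiring (the pipeline name, root, and component list here are hypothetical placeholders, not code from this report; only `beam_pipeline_args` mirrors the list above):

```python
from tfx.orchestration import pipeline

# All names below except beam_pipeline_args are illustrative assumptions.
my_pipeline = pipeline.Pipeline(
    pipeline_name='my_pipeline',          # hypothetical
    pipeline_root='gs://my-bucket/root',  # hypothetical
    components=components,                # ExampleGen, Transform, etc.
    beam_pipeline_args=BIG_QUERY_WITH_DIRECT_RUNNER_BEAM_PIPELINE_ARGS,
)
```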
Not sure if it's related, but with the above `preprocessing_fn`, my transform first failed with this error:
```
RuntimeError: The order of analyzers in your `preprocessing_fn` appears to be non-deterministic. This can be fixed either by changing your `preprocessing_fn` such that tf.Transform analyzers are encountered in a deterministic order or by passing a unique name to each analyzer API call.
```
I then added names to the `tft.scale_to_0_1` analyzers:

```python
return {name: tft.scale_to_0_1(x[name], name=f'{name}_scale_to_0_1') for name in _FEATURES}
```
After that, the transform just silently failed without logs (see first screenshot). I checked the worker logs, but there's nothing substantial, only warnings (see second screenshot).
It's worth noting that I have the `enable_prime` flag set.
pindinagesh commented
bli00 commented
@pindinagesh After I removed the `--dataflow_service_options=enable_prime` flag, it worked fine. Are you planning to support Dataflow Prime?