The sample demo has some bugs.
Opened this issue · 18 comments
It seems that

user_embedding = de.keras.layers.SquashedEmbedding(
    user_embedding_size,
    initializer=embedding_initializer,
    devices=self.devices,
    name='user_embedding')

has some bugs. The embedding cannot identify which parameter server (port) the variables are on.
If I change the embedding demo to

user_embedding = de.keras.layers.SquashedEmbedding(
    user_embedding_size,
    initializer=embedding_initializer,
    distribute_strategy=self.strategy,
    # devices=self.devices,
    name='user_embedding')

a new problem arises. Could anyone give me some advice? I think the embedding variable updates are out of sync.
Any update on this? I am also facing this issue
@alykhantejani Most TFRA users run GPU sync training without PS, so few people are aware of this issue.
If the issue occurs only some of the time, it may be due to a faulty device setting, and we may be able to fix it. Here is the key code that may cause the error.
@sosixyz This could be caused by the ShadowVariable not being created on the right device. There may be a way to fix this bug. Here is the key code.
Thanks for the response, I'll try and take a closer look here. If using sync training with GPUs, you would need GPUs with large on-device memory, correct? I thought that for this reason the PS strategy would be more common.
I should note I'm using an in-memory cluster for testing with 2 PS tasks and passing these device names to BasicEmbedding.
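For reference, the "device names" here are the per-task PS device strings; below is a minimal sketch of how they are typically built and passed to the DE embedding layer (the PS count and embedding size are illustrative, mirroring the reproduction script further down in this thread):

from tensorflow_recommenders_addons import dynamic_embedding as de

# Illustrative sketch: build one CPU device string per PS task and hand them to
# the DE embedding layer so its hash-table shards land on the parameter servers.
num_ps = 2  # assumed PS count, matching the in-memory test cluster above
ps_devices = [
    "/job:ps/replica:0/task:{}/device:CPU:0".format(idx) for idx in range(num_ps)
]

user_embedding = de.keras.layers.SquashedEmbedding(
    32,  # embedding dimension, illustrative
    devices=ps_devices,
    name='user_embedding')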
@alykhantejani Don't worry about memory: the DE all-to-all embedding layer shards the entire embedding across the worker ranks. You can also use a CPU embedding table, but the DE HKV backend would be the best solution, since it can use both GPU memory and host memory for embedding storage. In most situations, 2 TB of host memory is enough.
What's more, if someone has spare time, we would welcome a contribution of a fast-slow KV container that allows users, for example, to evict keys from the faster HKV backend into a slower Redis store to extend storage. All the basic APIs have been developed; we are now waiting for someone to write a Component to realize it. A rough sketch of the idea follows below.
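As a very rough illustration of that idea (this is not an existing TFRA API; all names below are hypothetical), such a container would keep hot keys in a bounded fast tier and spill cold keys into a slower backing store:

from collections import OrderedDict


class FastSlowKV:
  """Hypothetical two-tier KV container: hot keys in a bounded in-memory dict,
  cold keys evicted to a slower store (e.g. a Redis-like client with get/set)."""

  def __init__(self, slow_store, fast_capacity=1024):
    self._fast = OrderedDict()   # fast tier (stands in for HKV)
    self._slow = slow_store      # slow tier
    self._capacity = fast_capacity

  def insert(self, key, value):
    self._fast[key] = value
    self._fast.move_to_end(key)
    if len(self._fast) > self._capacity:
      # Evict the least-recently-used key into the slow tier.
      cold_key, cold_value = self._fast.popitem(last=False)
      self._slow.set(cold_key, cold_value)

  def lookup(self, key):
    if key in self._fast:
      self._fast.move_to_end(key)
      return self._fast[key]
    value = self._slow.get(key)  # fall back to the slow tier
    if value is not None:
      self.insert(key, value)    # promote back into the fast tier
    return value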
@sosixyz Could you please provide a minimal reproducible example?
Thank you for your reply! The reproducible code is copied from https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py.
Thank you for your reply! I will try the sample demo later. I found in the Keras guidance that when using the model.fit() method, the dataset must be a tf.keras.utils.experimental.DatasetCreator, but this sample demo does not use it: https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py line 163
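For reference, a minimal sketch of the pattern that guidance describes (the dataset contents here are dummy data, not the MovieLens pipeline):

import tensorflow as tf

# With ParameterServerStrategy, model.fit expects the input to be wrapped in a
# DatasetCreator so each worker can build its own (sharded) input pipeline.
def dataset_fn(input_context):
  batch_size = input_context.get_per_replica_batch_size(64)  # 64 = illustrative global batch
  dataset = tf.data.Dataset.range(1000).map(lambda x: (x, x % 5))
  dataset = dataset.shard(input_context.num_input_pipelines,
                          input_context.input_pipeline_id)
  return dataset.batch(batch_size).repeat()

dataset_creator = tf.keras.utils.experimental.DatasetCreator(dataset_fn)
# model.fit(dataset_creator, ...) also requires steps_per_epoch to be set.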
Yes, you're right. tf.keras.utils.experimental.DatasetCreator is mostly used to dispatch input data to the different workers. But this is a simple demo after all, so I got lazy. Could you please contribute a more complete demo if it's convenient for you?
@MoFHeka is there any example anywhere that does synchronous multi-worker large dynamic embeddings?
@alykhantejani
Meta: https://github.com/pytorch/torchrec Software-Hardware Co-design for Fast and Scalable Training of Deep Learning Recommendation Models
Tencent: TFRA here with custom backend
Meituan: https://www.nvidia.com/en-us/on-demand/session/gtcspring22-s41370/ https://tech.meituan.com/2022/03/24/tensorflow-gpu-training-optimization-practice-in-meituan-waimai-recommendation-scenarios.html
Nvidia: https://github.com/NVIDIA-Merlin/HugeCTR
Alibaba: https://github.com/DeepRec-AI/DeepRec
@MoFHeka I meant using TFRA specifically, especially using host memory rather than GPU memory (as GPU devices are expensive).
@alykhantejani
Here is the demo: https://github.com/tensorflow/recommenders-addons/tree/master/demo/dynamic_embedding/movielens-1m-keras-with-horovod
If you want to place the embedding in host memory, set the parameter devices=["CPU"] when you create the embedding layer.
If you want to use both host memory and device memory for the embedding, use HKV: replace the code with the HKV creator when you assign the hash table backend.
https://github.com/tensorflow/recommenders-addons/blob/master/docs/api_docs/tfra/dynamic_embedding/HkvHashTableCreator.md
Here is the explanation of how sync distributed training works: #365
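To make that concrete, a rough sketch of both options (the kv_creator argument and HkvHashTableCreator usage follow the linked docs, but the exact configuration options may differ):

import tensorflow as tf
from tensorflow_recommenders_addons import dynamic_embedding as de

embedding_size = 32  # illustrative

# Host-memory embedding table: pin the DE hash table to the CPU.
cpu_embedding = de.keras.layers.Embedding(
    embedding_size,
    devices=["CPU"],
    name='cpu_embedding')

# HKV backend: lets the table use GPU memory first and spill to host memory.
# (HkvHashTableCreator is documented in the link above; wiring it in through
# kv_creator is an assumption based on how other table creators are used.)
hkv_embedding = de.keras.layers.Embedding(
    embedding_size,
    kv_creator=de.HkvHashTableCreator(),
    name='hkv_embedding')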
@MoFHeka Hello, I tried to use tf.keras.utils.experimental.DatasetCreator, but a new bug arose. I only changed the data input function; when I use keras.Embedding instead, the demo runs well. The demo is:
# Reproduction script: MovieLens-1M Keras PS demo, modified to feed model.fit
# through a tf.keras.utils.experimental.DatasetCreator.
import os
import sys

import tensorflow as tf
import tensorflow_datasets as tfds

from tensorflow_recommenders_addons import dynamic_embedding as de

try:
  from tensorflow.keras.optimizers.legacy import Adam
except:
  from tensorflow.keras.optimizers import Adam

# tf.compat.v1.disable_eager_execution()

flags = tf.compat.v1.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string(
    'ps_list', "localhost:2220, localhost:2221",
    'ps_list: to be a comma separated string, '
    'like "localhost:2220, localhost:2221"')
flags.DEFINE_string(
    'worker_list', "localhost:2231",
    'worker_list: to be a comma separated string, '
    'like "localhost:2231, localhost:2232"')
flags.DEFINE_string('chief', "localhost:2230", 'chief: like "localhost:2230"')
flags.DEFINE_string('task_mode', "worker",
                    'running_mode: ps or worker or chief.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')

input_spec = {
    'user_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='user_id'),
    'movie_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='movie_id')
}
class DualChannelsDeepModel(tf.keras.Model):

  def __init__(self,
               devices=[],
               strategy=None,
               user_embedding_size=1,
               movie_embedding_size=1,
               embedding_initializer=None,
               is_training=True):
    if not is_training:
      de.enable_inference_mode()

    super(DualChannelsDeepModel, self).__init__()
    self.user_embedding_size = user_embedding_size
    self.movie_embedding_size = movie_embedding_size
    self.devices = devices
    self.strategy = strategy

    if embedding_initializer is None:
      embedding_initializer = tf.keras.initializers.Zeros()

    self.user_embedding = de.keras.layers.SquashedEmbedding(
        user_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='user_embedding')
    self.movie_embedding = de.keras.layers.SquashedEmbedding(
        movie_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='movie_embedding')

    self.dnn1 = tf.keras.layers.Dense(
        64,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn2 = tf.keras.layers.Dense(
        16,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn3 = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.bias_net = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

  @tf.function
  def call(self, features):
    user_id = tf.reshape(features['user_id'], (-1, 1))
    movie_id = tf.reshape(features['movie_id'], (-1, 1))
    user_latent = self.user_embedding(user_id)
    movie_latent = self.movie_embedding(movie_id)
    latent = tf.concat([user_latent, movie_latent], axis=1)

    x = self.dnn1(latent)
    x = self.dnn2(x)
    x = self.dnn3(x)

    bias = self.bias_net(latent)
    x = 0.2 * x + 0.8 * bias
    return x
class Runner():

  def __init__(self, strategy, train_bs, test_bs, epochs, steps_per_epoch,
               model_dir, export_dir):
    self.strategy = strategy
    self.num_worker = strategy._num_workers
    self.num_ps = strategy._num_ps
    # print('######################', self.num_ps)
    self.ps_devices = [
        "/job:ps/replica:0/task:{}/device:CPU:0".format(idx)
        for idx in range(self.num_ps)
    ]
    print('##############', self.ps_devices)
    self.embedding_size = 32
    self.train_bs = train_bs
    self.test_bs = test_bs
    self.epochs = epochs
    self.steps_per_epoch = steps_per_epoch
    self.model_dir = model_dir
    self.export_dir = export_dir

  def get_dataset(self, batch_size=1):
    dataset = tfds.load('movielens/1m-ratings', split='train')
    features = dataset.map(
        lambda x: {
            "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
            "user_id": tf.strings.to_number(x["user_id"], tf.int64),
        })
    ratings = dataset.map(
        lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
    dataset = dataset.zip((features, ratings))
    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
    if batch_size > 1:
      dataset = dataset.batch(batch_size)
    return dataset
  def train(self):
    # dataset = self.get_dataset(batch_size=self.train_bs)
    # dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size,
          self.embedding_size, tf.keras.initializers.RandomNormal(0.0, 0.5))
      optimizer = Adam(1E-3)
      optimizer = de.DynamicEmbeddingOptimizer(optimizer)

      auc = tf.keras.metrics.AUC(num_thresholds=1000)
      model.compile(optimizer=optimizer,
                    loss=tf.keras.losses.MeanSquaredError(),
                    metrics=[
                        auc,
                    ])

    if self.model_dir:
      if os.path.exists(self.model_dir):
        model.load_weights(self.model_dir)

    batch_size = self.train_bs

    def dataset_fn(input_context):
      # Each input pipeline reads a shard of MovieLens-1M and batches it locally.
      dataset = tfds.load('movielens/1m-ratings', split='train')
      dataset = dataset.shard(input_context.num_input_pipelines,
                              input_context.input_pipeline_id)
      features = dataset.map(
          lambda x: {
              "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
              "user_id": tf.strings.to_number(x["user_id"], tf.int64),
          })
      ratings = dataset.map(
          lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
      dataset = dataset.zip((features, ratings))
      dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
      if batch_size > 1:
        dataset = dataset.batch(batch_size)
      return dataset

    input_options = tf.distribute.InputOptions(experimental_fetch_to_device=True)
    distributed_dataset = tf.keras.utils.experimental.DatasetCreator(
        dataset_fn, input_options=input_options)
    model.fit(distributed_dataset,
              epochs=self.epochs,
              steps_per_epoch=self.steps_per_epoch)

    if self.model_dir:
      save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
      model.save(self.model_dir, options=save_options)
  def export(self):
    with self.strategy.scope():
      # Pass the strategy explicitly, matching the constructor call in train().
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size,
          self.embedding_size, tf.keras.initializers.RandomNormal(0.0, 0.5))

    def save_spec():
      if hasattr(model, 'save_spec'):
        # tf version >= 2.6
        return model.save_spec()
      else:
        arg_specs = list()
        kwarg_specs = dict()
        for i in model.inputs:
          arg_specs.append(i.type_spec)
        return [arg_specs], kwarg_specs

    arg_specs, kwarg_specs = save_spec()

    @tf.function
    def serve(*args, **kwargs):
      return model(*args, **kwargs)

    save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
    # Only save the calculation graph
    from tensorflow.python.saved_model import save as tf_save
    tf.keras.backend.clear_session()
    de.enable_inference_mode()
    # Overwrite saved_model.pb file with save_and_return_nodes function to
    # rewrite the calculation graph
    tf_save.save_and_return_nodes(obj=model,
                                  export_dir=self.export_dir,
                                  signatures={
                                      'serving_default':
                                          serve.get_concrete_function(
                                              *arg_specs, **kwarg_specs)
                                  },
                                  options=save_options,
                                  experimental_skip_checkpoint=True)
  def test(self):
    de.enable_inference_mode()

    dataset = self.get_dataset(batch_size=self.test_bs)
    dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = tf.keras.models.load_model(self.export_dir)
      signature = model.signatures['serving_default']

      def get_close_or_equal_cnt(model, features, ratings):
        preds = model(features)
        preds = tf.math.argmax(preds, axis=1)
        ratings = tf.math.argmax(ratings, axis=1)
        close_cnt = tf.reduce_sum(
            tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32))
        equal_cnt = tf.reduce_sum(
            tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32))
        return close_cnt, equal_cnt

      it = iter(dataset)
      for step in range(self.test_steps):
        features, ratings = it.get_next()
        close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings)
        print(
            f'In batch prediction, step: {step}, {close_cnt}/{self.test_bs} are closely'
            f' accurate, {equal_cnt}/{self.test_bs} are absolutely accurate.')
def start_chief(config):
  # The chief builds the ParameterServerStrategy and drives training.
  print("chief config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_type="chief", task_id=0)
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver)
  runner = Runner(strategy=strategy,
                  train_bs=32,
                  test_bs=1,
                  epochs=2,
                  steps_per_epoch=10,
                  model_dir=None,
                  export_dir=None)
  runner.train()


def start_worker(task_id, config):
  # Workers just run a standard TF server and wait for work from the chief.
  print("worker config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="worker",
                                task_index=task_id)
  server.join()


def start_ps(task_id, config):
  # Parameter servers also run a TF server and host the (embedding) variables.
  print("ps config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="ps",
                                task_index=task_id)
  server.join()


def main(argv):
  ps_list = FLAGS.ps_list.replace(' ', '').split(',')
  worker_list = FLAGS.worker_list.replace(' ', '').split(',')
  task_mode = FLAGS.task_mode
  task_id = FLAGS.task_id

  print('ps_list: ', ps_list)
  print('worker_list: ', worker_list)

  cluster_config = {
      "cluster": {
          "chief": [FLAGS.chief],
          "ps": ps_list,
          "worker": worker_list
      }
  }
  print(cluster_config)

  if task_mode == 'ps':
    start_ps(task_id, cluster_config)
  elif task_mode == 'worker':
    start_worker(task_id, cluster_config)
  elif task_mode == 'chief':
    start_chief(cluster_config)
  else:
    print('invalid task_mode. Options include "ps", "worker" and "chief".')
    sys.exit(1)


if __name__ == "__main__":
  tf.compat.v1.app.run()