tensorflow/recommenders-addons

The sample demo have some bug.

Opened this issue · 18 comments

tensorflow: 2.8.0
tfra: 0.6.0

  • I download my demo, the path is demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py。
    Acoording to the file, I sh start.sh ,The bug is show 。
    image

Hi @MoFHeka could you please help with it?

It seems that user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, devices=self.devices, name='user_embedding') has some bugs。The embedding could not cannot identify which port the variables are on.

  • If I change the embedding demo,user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, distribute_strategy=self.strategy, # devices=self.devices, name='user_embedding').New problem arised. If anyone give me some advices。I think the embdding varibles updates are out of sync
    image

Any update on this? I am also facing this issue

@sosixyz Could you please provide a minimum reproducible code?

@MoFHeka It's weird, I can't produce a MWE as it only occurs in some settings for me. According to @rhdong here: #414 it is an issue that dev team are aware of? (although I don't really know what the issue is)

@alykhantejani Most of TFRA users are using GPU sync training without PS. So it's few people to aware this issue.
If this issue occurs only some of the time, it may be due to a faulty device setting, and we may be able to fix it. Here is the key code which may cause error

.

It seems that user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, devices=self.devices, name='user_embedding') has some bugs。The embedding could not cannot identify which port the variables are on.

  • If I change the embedding demo,user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, distribute_strategy=self.strategy, # devices=self.devices, name='user_embedding').New problem arised. If anyone give me some advices。I think the embdding varibles updates are out of sync
    image

@sosixyz This could be caused by ShadowVariable wasn't created in the right device. Maybe there is a way to fix this bug. Here is the key code

with context.device_policy(context.DEVICE_PLACEMENT_SILENT):

@alykhantejani Most of TFRA users are using GPU sync training without PS. So it's few people to aware this issue. If this issue occurs only some of the time, it may be due to a faulty device setting, and we may be able to fix it. Here is the key code which may cause error

.

Thanks for the response, I'll try and take a closer look here. If using sync training with GPUs, you would need GPUs with large on-device memory correct? I thought for this reason PS strategy would be more common

I should not Im using an in-mem cluster for testing with 2 PS and passing these device names to BasicEmbedding

@alykhantejani Most of TFRA users are using GPU sync training without PS. So it's few people to aware this issue. If this issue occurs only some of the time, it may be due to a faulty device setting, and we may be able to fix it. Here is the key code which may cause error

.

Thanks for the response, I'll try and take a closer look here. If using sync training with GPUs, you would need GPUs with large on-device memory correct? I thought for this reason PS strategy would be more common

@alykhantejani Don’t worry the memory, DE alltoall embedding layer will shard entire embedding into different worker rank. And also you can use cpu embedding table, but DE HKV backend would be the best solution which is able to use both gpu memory and host memory for embedding storage. Most situations, 2T host memory is enough.
What’s more, if someone has spare time, we’re so welcome to contribute fast-slow kv container which allows user, for example, evicting key in faster HKV into slower Redis for extending a larger storage. All basic api has been developed, and now, we’re waiting someone to write a Component to realize it.

@sosixyz Could you please provide a minimum reproducible code?

Thank you for your reply!The reproducible code is copied from the link https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py.

It seems that user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, devices=self.devices, name='user_embedding') has some bugs。The embedding could not cannot identify which port the variables are on.

  • If I change the embedding demo,user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, distribute_strategy=self.strategy, # devices=self.devices, name='user_embedding').New problem arised. If anyone give me some advices。I think the embdding varibles updates are out of sync
    image

@sosixyz This could be caused by ShadowVariable wasn't created in the right device. Maybe there is a way to fix this bug. Here is the key code

with context.device_policy(context.DEVICE_PLACEMENT_SILENT):

Thank you your reply! I will try the sample demo later。I found the keras guidance that when using the model.fit() method, dataset must use tf.keras.utils.experimental.DatasetCreator, but this sample demo does not use this method.https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py line 163

It seems that user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, devices=self.devices, name='user_embedding') has some bugs。The embedding could not cannot identify which port the variables are on.

  • If I change the embedding demo,user_embedding = de.keras.layers.SquashedEmbedding( user_embedding_size, initializer=embedding_initializer, distribute_strategy=self.strategy, # devices=self.devices, name='user_embedding').New problem arised. If anyone give me some advices。I think the embdding varibles updates are out of sync
    image

@sosixyz This could be caused by ShadowVariable wasn't created in the right device. Maybe there is a way to fix this bug. Here is the key code

with context.device_policy(context.DEVICE_PLACEMENT_SILENT):

Thank you your reply! I will try the sample demo later。I found the keras guidance that when using the model.fit() method, dataset must use tf.keras.utils.experimental.DatasetCreator, but this sample demo does not use this method.https://github.com/tensorflow/recommenders-addons/blob/master/demo/dynamic_embedding/movielens-1m-keras-ps/movielens-1m-keras-ps.py line 163

Yes, you're right. Mostly using tf.keras.utils.experimental.DatasetCreator for dispatch input data to different worker. But this is a simple demo after all, so I got lazy. Could you please contribute a more complete demo if it's convenient for you.

@alykhantejani Don’t worry the memory, DE alltoall embedding layer will shard entire embedding into different worker rank. And also you can use cpu embedding table, but DE HKV backend would be the best solution which is able to use both gpu memory and host memory for embedding storage. Most situations, 2T host memory is enough.
What’s more, if someone has spare time, we’re so welcome to contribute fast-slow kv container which allows user, for example, evicting key in faster HKV into slower Redis for extending a larger storage. All basic api has been developed, and now, we’re waiting someone to write a Component to realize it.

@MoFHeka is there any example anywhere that does synchronous multi-worker large dymaic embeddings?

@alykhantejani Don’t worry the memory, DE alltoall embedding layer will shard entire embedding into different worker rank. And also you can use cpu embedding table, but DE HKV backend would be the best solution which is able to use both gpu memory and host memory for embedding storage. Most situations, 2T host memory is enough.
What’s more, if someone has spare time, we’re so welcome to contribute fast-slow kv container which allows user, for example, evicting key in faster HKV into slower Redis for extending a larger storage. All basic api has been developed, and now, we’re waiting someone to write a Component to realize it.

@MoFHeka is there any example anywhere that does synchronous multi-worker large dymaic embeddings?

@alykhantejani
Meta: https://github.com/pytorch/torchrec Software-Hardware Co-design for Fast and Scalable Training of Deep Learning Recommendation Models
Tencent: TFRA here with custom backend
Meituan: https://www.nvidia.com/en-us/on-demand/session/gtcspring22-s41370/ https://tech.meituan.com/2022/03/24/tensorflow-gpu-training-optimization-practice-in-meituan-waimai-recommendation-scenarios.html
Nvidia: https://github.com/NVIDIA-Merlin/HugeCTR
Alibaba: https://github.com/DeepRec-AI/DeepRec

@MoFHeka I meant using TFRA specifically, especially using Host Memory not GPU mem (as GPU devices are expensive)

@alykhantejani
Here is the demo: https://github.com/tensorflow/recommenders-addons/tree/master/demo/dynamic_embedding/movielens-1m-keras-with-horovod

If you want to place the embedding in host memory, please set parameter devices=["CPU"] when you create embedding layer.

If you want to use both host memory and device memory for embedding, using HKV. Replace the code with HKV creator when you assign the hash table backend.
https://github.com/tensorflow/recommenders-addons/blob/master/docs/api_docs/tfra/dynamic_embedding/HkvHashTableCreator.md

Here is the explanation how sync distributed training works: #365

@MoFHeka Hello, I try to use tf.keras.utils.experimental.DatasetCreator , but new bug arised. I just change data input function, I use keras.embeding that demo is running well. The demo is

import os
import tensorflow as tf
import tensorflow_datasets as tfds
import sys

from absl import flags
from absl import app
from tensorflow_recommenders_addons import dynamic_embedding as de
import keras.models 
try:
  from tensorflow.keras.optimizers.legacy import Adam
except:
  from tensorflow.keras.optimizers import Adam
# tf.compat.v1.disable_eager_execution()
flags = tf.compat.v1.app.flags
FLAGS = flags.FLAGS
flags.DEFINE_string(
    'ps_list', "localhost:2220, localhost:2221",
    'ps_list: to be a comma seperated string, '
    'like "localhost:2220, localhost:2220"')
flags.DEFINE_string(
    'worker_list', "localhost:2231",
    'worker_list: to be a comma seperated string, '
    'like "localhost:2231, localhost:2232"')
flags.DEFINE_string('chief', "localhost:2230", 'chief: like "localhost:2230"')
flags.DEFINE_string('task_mode', "worker",
                    'runninig_mode: ps or worker or chief.')
flags.DEFINE_integer('task_id', 0, 'task_id: used for allocating samples.')

input_spec = {
    'user_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='user_id'),
    'movie_id': tf.TensorSpec(shape=[
        None,
    ], dtype=tf.int64, name='movie_id')
}


class DualChannelsDeepModel(tf.keras.Model):

  def __init__(self,
               devices=[],
               strategy = None,
               user_embedding_size=1,
               movie_embedding_size=1,
               embedding_initializer=None,
               is_training=True):

    if not is_training:
      de.enable_inference_mode()

    super(DualChannelsDeepModel, self).__init__()
    self.user_embedding_size = user_embedding_size
    self.movie_embedding_size = movie_embedding_size
    self.devices = devices
    self.strategy = strategy

    if embedding_initializer is None:
      embedding_initializer = tf.keras.initializers.Zeros()

    self.user_embedding = de.keras.layers.SquashedEmbedding(
        user_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
         devices=self.devices,
        name='user_embedding')
           
    self.movie_embedding = de.keras.layers.SquashedEmbedding(
        movie_embedding_size,
        initializer=embedding_initializer,
        # distribute_strategy=self.strategy,
        devices=self.devices,
        name='movie_embedding')

    self.dnn1 = tf.keras.layers.Dense(
        64,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn2 = tf.keras.layers.Dense(
        16,
        activation='relu',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.dnn3 = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))
    self.bias_net = tf.keras.layers.Dense(
        5,
        activation='softmax',
        kernel_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1),
        bias_initializer=tf.keras.initializers.RandomNormal(0.0, 0.1))

  @tf.function
  def call(self, features):
    user_id = tf.reshape(features['user_id'], (-1, 1))
    movie_id = tf.reshape(features['movie_id'], (-1, 1))
    user_latent = self.user_embedding(user_id)
    movie_latent = self.movie_embedding(movie_id)
    latent = tf.concat([user_latent, movie_latent], axis=1)

    x = self.dnn1(latent)
    x = self.dnn2(x)
    x = self.dnn3(x)

    bias = self.bias_net(latent)
    x = 0.2 * x + 0.8 * bias
    return x


class Runner():

  def __init__(self, strategy, train_bs, test_bs, epochs, steps_per_epoch,
               model_dir, export_dir):
    self.strategy = strategy
    self.num_worker = strategy._num_workers
    self.num_ps = strategy._num_ps
    # print('######################',self.num_ps)
    self.ps_devices = [
        "/job:ps/replica:0/task:{}/device:CPU:0".format(idx)
        for idx in range(self.num_ps)
    ]
    print('##############', self.ps_devices)
    self.embedding_size = 32
    self.train_bs = train_bs
    self.test_bs = test_bs
    self.epochs = epochs
    self.steps_per_epoch = steps_per_epoch
    self.model_dir = model_dir
    self.export_dir = export_dir

  def get_dataset(self, batch_size=1):
    dataset = tfds.load('movielens/1m-ratings', split='train')
    features = dataset.map(
        lambda x: {
            "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
            "user_id": tf.strings.to_number(x["user_id"], tf.int64),
        })
    ratings = dataset.map(
        lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
    dataset = dataset.zip((features, ratings))
    dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
    if batch_size > 1:
      dataset = dataset.batch(batch_size)
    return dataset

  def train(self):
    # dataset = self.get_dataset(batch_size=self.train_bs)
    # dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.strategy, self.embedding_size, self.embedding_size,
          tf.keras.initializers.RandomNormal(0.0, 0.5))
      optimizer = Adam(1E-3)
      optimizer = de.DynamicEmbeddingOptimizer(optimizer)

      auc = tf.keras.metrics.AUC(num_thresholds=1000)

      model.compile(optimizer=optimizer,
                    loss=tf.keras.losses.MeanSquaredError(),
                    metrics=[
                        auc,
                    ])
      
    if self.model_dir:
        if os.path.exists(self.model_dir):
          model.load_weights(self.model_dir)
    batch_size = self.train_bs
    def dataset_fn(input_context):
      dataset = tfds.load('movielens/1m-ratings', split='train')
      dataset = dataset.shard(input_context.num_input_pipelines, input_context.input_pipeline_id)
      features = dataset.map(
          lambda x: {
              "movie_id": tf.strings.to_number(x["movie_id"], tf.int64),
              "user_id": tf.strings.to_number(x["user_id"], tf.int64),
          })
      ratings = dataset.map(
          lambda x: tf.one_hot(tf.cast(x['user_rating'] - 1, dtype=tf.int64), 5))
      dataset = dataset.zip((features, ratings))
      dataset = dataset.shuffle(4096, reshuffle_each_iteration=False)
      if batch_size > 1:
        dataset = dataset.batch(batch_size)
      return dataset
    input_options = tf.distribute.InputOptions(experimental_fetch_to_device=True)
    distributed_dataset = tf.keras.utils.experimental.DatasetCreator(dataset_fn, input_options=input_options)
    
    model.fit(distributed_dataset, epochs=self.epochs, steps_per_epoch=self.steps_per_epoch)

    if self.model_dir:
        save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])
        model.save(self.model_dir, options=save_options)

  def export(self):
    with self.strategy.scope():
      model = DualChannelsDeepModel(
          self.ps_devices, self.embedding_size, self.embedding_size,
          tf.keras.initializers.RandomNormal(0.0, 0.5))

    def save_spec():
      if hasattr(model, 'save_spec'):
        return model.save_spec()
      else:
        arg_specs = list()
        kwarg_specs = dict()
        for i in model.inputs:
          arg_specs.append(i.type_spec)
        return [arg_specs], kwarg_specs

    @tf.function
    def serve(*args, **kwargs):
      return model(*args, **kwargs)

    save_options = tf.saved_model.SaveOptions(namespace_whitelist=['TFRA'])

    # Only save the calculation graph
    from tensorflow.python.saved_model import save as tf_save
    K.clear_session()
    de.enable_inference_mode()
    # Overwrite saved_model.pb file with save_and_return_nodes function to rewrite the calculation graph
    tf_save.save_and_return_nodes(obj=model,
                                  export_dir=self.export_dir,
                                  signatures={
                                      'serving_default':
                                          serve.get_concrete_function(
                                              *arg_specs, **kwarg_specs)
                                  },
                                  options=save_options,
                                  experimental_skip_checkpoint=True)

  def test(self):
    de.enable_inference_mode()

    dataset = self.get_dataset(batch_size=self.test_bs)
    dataset = self.strategy.experimental_distribute_dataset(dataset)
    with self.strategy.scope():
      model = tf.keras.models.load_model(self.export_dir)
    signature = model.signatures['serving_default']

    def get_close_or_equal_cnt(model, features, ratings):
      preds = model(features)
      preds = tf.math.argmax(preds, axis=1)
      ratings = tf.math.argmax(ratings, axis=1)
      close_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) <= 1, dtype=tf.int32))
      equal_cnt = tf.reduce_sum(
          tf.cast(tf.math.abs(preds - ratings) == 0, dtype=tf.int32))
      return close_cnt, equal_cnt

    it = iter(dataset)
    for step in range(self.test_steps):
      features, ratings = it.get_next()
      close_cnt, equal_cnt = get_close_or_equal_cnt(model, features, ratings)
      print(
          f'In batch prediction, step: {step}, {close_cnt}/{self.test_bs} are closely'
          f' accurate, {equal_cnt}/{self.test_bs} are absolutely accurate.')


def start_chief(config):
  print("chief config", config)

  cluster_spec = tf.train.ClusterSpec(config["cluster"])
  cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      cluster_spec, task_type="chief", task_id=0)
  # cluster_resolver = tf.distribute.cluster_resolver.SimpleClusterResolver(
      # cluster_spec, task_type="chief", task_id=0)
  strategy = tf.distribute.experimental.ParameterServerStrategy(
      cluster_resolver)
  runner = Runner(strategy=strategy,
                  train_bs=32,
                  test_bs=1,
                  epochs=2,
                  steps_per_epoch=10,
                  model_dir=None,
                  export_dir=None)
  runner.train()


def start_worker(task_id, config):
  print("worker config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="worker",
                                task_index=task_id)
  server.join()


def start_ps(task_id, config):
  print("ps config", config)
  cluster_spec = tf.train.ClusterSpec(config["cluster"])

  sess_config = tf.compat.v1.ConfigProto()
  sess_config.intra_op_parallelism_threads = 4
  sess_config.inter_op_parallelism_threads = 4
  server = tf.distribute.Server(cluster_spec,
                                config=sess_config,
                                protocol='grpc',
                                job_name="ps",
                                task_index=task_id)
  server.join()


def main(argv):
  ps_list = FLAGS.ps_list.replace(' ', '').split(',')
  worker_list = FLAGS.worker_list.replace(' ', '').split(',')
  task_mode = FLAGS.task_mode
  task_id = FLAGS.task_id

  print('ps_list: ', ps_list)
  print('worker_list: ', worker_list)

  cluster_config = {
      "cluster": {
          "chief": [FLAGS.chief],
          "ps": ps_list,
          "worker": worker_list
      }
  }
  print(cluster_config)
  if task_mode == 'ps':
    start_ps(task_id, cluster_config)
  elif task_mode == 'worker':
    start_worker(task_id, cluster_config)
  elif task_mode == 'chief':
    start_chief(cluster_config)
  else:
    print('invalid task_mode. Options include "ps" and "worker".')
    sys.exit(1)


if __name__ == "__main__":
  tf.compat.v1.app.run()

the bug is
image