DeepRec-AI/DeepRec

[DLRM] dst pointer is not actually on GPU

kangna-qi opened this issue · 1 comment

System information

  • OS Platform and Distribution (e.g., Linux Ubuntu 20.04): 20.04
  • DeepRec version or commit id:
  • Python version: 3.8
  • Bazel version (if compiling from source): 0.26.1
  • GCC/Compiler version (if compiling from source):
  • CUDA/cuDNN version: 11.6

Describe the current behavior
I train DLRM with Horovod and SOK, but I get this error:
log
Describe the expected behavior
Code to reproduce the issue
Dataset: Criteo-1T
I modified the DLRM script to support distributed training; the run command and the diff of my changes are shown below.

cd modelzoo/dlrm
horovodrun -np 8 python train.py --batch_size 8192  --learning_rate 0.01 --group_embedding 'collective' --data_location /data/Criteo --parquet_dataset false --ev true --input_layer_partitioner 0
diff --git a/modelzoo/dlrm/train.py b/modelzoo/dlrm/train.py
index eb2d110982..427c75aa1f 100644
--- a/modelzoo/dlrm/train.py
+++ b/modelzoo/dlrm/train.py
@@ -24,6 +24,7 @@ from tensorflow.python.client import timeline
 import json
 
 from tensorflow.python.ops import partitioned_variables
+import horovod.tensorflow as hvd
 
 # Set to INFO for tracking training, default is WARN. ERROR for least messages
 tf.logging.set_verbosity(tf.logging.INFO)
@@ -294,7 +295,7 @@ def build_model_input(filename, batch_size, num_epochs):
         label_defaults = [[0]]
         column_headers = TRAIN_DATA_COLUMNS
         record_defaults = label_defaults + cont_defaults + cate_defaults
-        columns = tf.io.decode_csv(value, record_defaults=record_defaults)
+        columns = tf.io.decode_csv(value, record_defaults=record_defaults, field_delim='\t')
         all_columns = collections.OrderedDict(zip(column_headers, columns))
         labels = all_columns.pop(LABEL_COLUMN[0])
         features = all_columns
@@ -314,7 +315,13 @@ def build_model_input(filename, batch_size, num_epochs):
         # work_queue = WorkQueue([filename, filename1,filename2,filename3])
         files = work_queue.input_dataset()
     else:
-        files = filename
+        files = []
+        if filename.split('/')[-1] == 'train':
+          for i in range(23):
+            files.append(filename + "/day_"+str(i))
+        else:
+          files = filename
+        print(files)
 
     # Extract lines from input files using the Dataset API.
     if args.parquet_dataset and not args.tf:
@@ -327,6 +334,8 @@ def build_model_input(filename, batch_size, num_epochs):
         dataset = dataset.map(parse_parquet, num_parallel_calls=28)
     else:
         dataset = tf.data.TextLineDataset(files)
+        if filename.split('/')[-1] == 'train':
+          dataset = dataset.shard(hvd.size(), hvd.rank())
         dataset = dataset.shuffle(buffer_size=20000,
                                   seed=args.seed)  # fix seed for reproducing
         dataset = dataset.repeat(num_epochs)
@@ -585,8 +594,10 @@ def main(tf_config=None, server=None):
         train_file += '/train.parquet'
         test_file += '/eval.parquet'
     else:
-        train_file += '/train.csv'
-        test_file += '/eval.csv'
+        train_file += '/train'
+        test_file += '/test/day_23'
+    print(train_file)
+    print(test_file)
     if (not os.path.exists(train_file)) or (not os.path.exists(test_file)):
         print("Dataset does not exist in the given data_location.")
         sys.exit()
@@ -597,8 +608,10 @@ def main(tf_config=None, server=None):
         no_of_training_examples = pq.read_table(train_file).num_rows
         no_of_test_examples = pq.read_table(test_file).num_rows
     else:
-        no_of_training_examples = sum(1 for line in open(train_file))
-        no_of_test_examples = sum(1 for line in open(test_file))
+        #no_of_training_examples = sum(1 for line in open(train_file))
+        #no_of_test_examples = sum(1 for line in open(test_file))
+        no_of_training_examples = 4195197692
+        no_of_test_examples = 178274637
     print("Numbers of training dataset is {}".format(no_of_training_examples))
     print("Numbers of test dataset is {}".format(no_of_test_examples))
 
@@ -610,7 +623,7 @@ def main(tf_config=None, server=None):
     if args.steps == 0:
         no_of_epochs = 1
         train_steps = math.ceil(
-            (float(no_of_epochs) * no_of_training_examples) / batch_size)
+            (float(no_of_epochs) * no_of_training_examples) / batch_size / hvd.size())
     else:
         no_of_epochs = math.ceil(
             (float(batch_size) * args.steps) / no_of_training_examples)
@@ -657,6 +670,7 @@ def main(tf_config=None, server=None):
 
     # Session config
     sess_config = tf.ConfigProto()
+    sess_config.gpu_options.visible_device_list = str(hvd.local_rank())
     if tf_config:
         sess_config.device_filters.append("/job:ps")
     sess_config.inter_op_parallelism_threads = args.inter
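
For context, the changes above rely on the usual Horovod data-parallel pattern: shard the input files across ranks and pin each process to one local GPU. Below is a minimal sketch of that pattern, not the actual train.py; the hvd.init() call and the DistributedOptimizer wrapping are assumptions for illustration (they are not part of the diff), and the path and batch size are simply the values from the repro command.

# Minimal sketch of the Horovod setup the modified script assumes.
# hvd.init() and the optimizer wrapping are assumptions; they are not shown in the diff.
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()  # must run before hvd.rank()/hvd.size()/hvd.local_rank() are queried

# Each rank reads a disjoint shard of the 23 training files (day_0 .. day_22).
files = ["/data/Criteo/train/day_%d" % i for i in range(23)]
dataset = tf.data.TextLineDataset(files)
dataset = dataset.shard(hvd.size(), hvd.rank())  # split lines across ranks
dataset = dataset.shuffle(buffer_size=20000)     # train.py fixes a seed via args.seed
dataset = dataset.batch(8192)

# Pin each process to a single local GPU, matching the sess_config change above.
sess_config = tf.ConfigProto()
sess_config.gpu_options.visible_device_list = str(hvd.local_rank())

# Gradients would normally be averaged across ranks, e.g.:
# optimizer = hvd.DistributedOptimizer(optimizer)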

Provide a reproducible test case that is the bare minimum necessary to generate the problem.

Other info / logs
Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.

Thanks for submitting an issue. This bug is fixed by PR #814 and PR #832.