[DLRM] dst pointer is not actually on GPU
kangna-qi opened this issue · 1 comment
kangna-qi commented
System information
- OS Platform and Distribution (e.g., Linux Ubuntu 20.04): 20.04
- DeepRec version or commit id:
- Python version: 3.8
- Bazel version (if compiling from source): 0.26.1
- GCC/Compiler version (if compiling from source):
- CUDA/cuDNN version: 11.6
Describe the current behavior
I train DLRM with Horovod and SOK, but training fails with the error "dst pointer is not actually on GPU".
Describe the expected behavior
Code to reproduce the issue
Dataset: Criteo-1T
I modified the DLRM script to support distributed training. The changes are shown below:
cd modelzoo/dlrm
horovodrun -np 8 python train.py --batch_size 8192 --learning_rate 0.01 --group_embedding 'collective' --data_location /data/Criteo --parquet_dataset false --ev true --input_layer_partitioner 0
diff --git a/modelzoo/dlrm/train.py b/modelzoo/dlrm/train.py
index eb2d110982..427c75aa1f 100644
--- a/modelzoo/dlrm/train.py
+++ b/modelzoo/dlrm/train.py
@@ -24,6 +24,7 @@ from tensorflow.python.client import timeline
import json
from tensorflow.python.ops import partitioned_variables
+import horovod.tensorflow as hvd
# Set to INFO for tracking training, default is WARN. ERROR for least messages
tf.logging.set_verbosity(tf.logging.INFO)
@@ -294,7 +295,7 @@ def build_model_input(filename, batch_size, num_epochs):
label_defaults = [[0]]
column_headers = TRAIN_DATA_COLUMNS
record_defaults = label_defaults + cont_defaults + cate_defaults
- columns = tf.io.decode_csv(value, record_defaults=record_defaults)
+ columns = tf.io.decode_csv(value, record_defaults=record_defaults, field_delim='\t')
all_columns = collections.OrderedDict(zip(column_headers, columns))
labels = all_columns.pop(LABEL_COLUMN[0])
features = all_columns
@@ -314,7 +315,13 @@ def build_model_input(filename, batch_size, num_epochs):
# work_queue = WorkQueue([filename, filename1,filename2,filename3])
files = work_queue.input_dataset()
else:
- files = filename
+ files = []
+ if filename.split('/')[-1] == 'train':
+ for i in range(23):
+ files.append(filename + "/day_"+str(i))
+ else:
+ files = filename
+ print(files)
# Extract lines from input files using the Dataset API.
if args.parquet_dataset and not args.tf:
@@ -327,6 +334,8 @@ def build_model_input(filename, batch_size, num_epochs):
dataset = dataset.map(parse_parquet, num_parallel_calls=28)
else:
dataset = tf.data.TextLineDataset(files)
+ if filename.split('/')[-1] == 'train':
+ dataset = dataset.shard(hvd.size(), hvd.rank())
dataset = dataset.shuffle(buffer_size=20000,
seed=args.seed) # fix seed for reproducing
dataset = dataset.repeat(num_epochs)
@@ -585,8 +594,10 @@ def main(tf_config=None, server=None):
train_file += '/train.parquet'
test_file += '/eval.parquet'
else:
- train_file += '/train.csv'
- test_file += '/eval.csv'
+ train_file += '/train'
+ test_file += '/test/day_23'
+ print(train_file)
+ print(test_file)
if (not os.path.exists(train_file)) or (not os.path.exists(test_file)):
print("Dataset does not exist in the given data_location.")
sys.exit()
@@ -597,8 +608,10 @@ def main(tf_config=None, server=None):
no_of_training_examples = pq.read_table(train_file).num_rows
no_of_test_examples = pq.read_table(test_file).num_rows
else:
- no_of_training_examples = sum(1 for line in open(train_file))
- no_of_test_examples = sum(1 for line in open(test_file))
+ #no_of_training_examples = sum(1 for line in open(train_file))
+ #no_of_test_examples = sum(1 for line in open(test_file))
+ no_of_training_examples = 4195197692
+ no_of_test_examples = 178274637
print("Numbers of training dataset is {}".format(no_of_training_examples))
print("Numbers of test dataset is {}".format(no_of_test_examples))
@@ -610,7 +623,7 @@ def main(tf_config=None, server=None):
if args.steps == 0:
no_of_epochs = 1
train_steps = math.ceil(
- (float(no_of_epochs) * no_of_training_examples) / batch_size)
+ (float(no_of_epochs) * no_of_training_examples) / batch_size / hvd.size())
else:
no_of_epochs = math.ceil(
(float(batch_size) * args.steps) / no_of_training_examples)
@@ -657,6 +670,7 @@ def main(tf_config=None, server=None):
# Session config
sess_config = tf.ConfigProto()
+ sess_config.gpu_options.visible_device_list = str(hvd.local_rank())
if tf_config:
sess_config.device_filters.append("/job:ps")
sess_config.inter_op_parallelism_threads = args.inter
Provide a reproducible test case that is the bare minimum necessary to generate the problem.
Other info / logs
Include any logs or source code that would be helpful to diagnose the problem. If including tracebacks, please include the full traceback. Large logs and files should be attached.
JackMoriarty commented