Why is the val_loss curve trained through Dali data loading method oscillating?
wangdada-love opened this issue · 2 comments
Describe the question.
Question description
I referred to the official documents and examples to write a class for loading data through DALI, and used this class to load data for my model training. However, during training I found that the validation (inference) loss and the validation IoU both oscillate, and that this oscillation is periodic. In addition, I have found through multiple experiments that the oscillation period matches the number of GPUs I use.
I suspect this is related to the data distribution method when using distributed strategies, but I cannot pinpoint where the specific problem lies.
The following is the code for my data loading class. Could you please help me check if there are any issues and how should I resolve them?
code
dataload class
import os
from PIL import Image
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.types as types
import nvidia.dali.plugin.tf as dali_tf
from nvidia.dali.auto_aug.auto_augment import auto_augment, apply_auto_augment
from nvidia.dali.auto_aug import augmentations
from nvidia.dali.auto_aug.core import Policy
import tensorflow as tf
import sys
sys.path.append("pipeline")
from data_augs import rand_bright, rand_gauss, rand_flip, rand_chroma, rand_zoom, rand_contrast, rand_crop, rand_rotate, gridmask, rand_sharp, rand_equalize
def create_pascal_label_colormap():
    """
    Create the label colormap defined by the PASCAL VOC segmentation dataset.

    # Returns
        colormap: integer array of shape (256, 3); row ``i`` is the RGB
            color used to visualize segmentation label ``i``.
    """
    colormap = np.zeros((256, 3), dtype=int)
    index = np.arange(256, dtype=int)

    # Consume the label id three bits at a time: bit ``channel`` of the
    # current triple goes into that channel, starting at the most significant
    # color bit (shift 7) and moving down one bit per triple.
    for shift in reversed(range(8)):
        for channel in range(3):
            colormap[:, channel] |= ((index >> channel) & 1) << shift
        index >>= 3

    return colormap
def label_to_color_image(label):
    """
    Map a segmentation label map to its PASCAL VOC color image.

    # Arguments
        label: 2D integer numpy array holding segmentation labels.

    # Returns
        result: integer array of shape ``label.shape + (3,)``; each element
            of ``label`` is replaced by its RGB entry in the PASCAL VOC
            colormap.

    # Raises
        ValueError: If label is not of rank 2, or if its largest value is not
            smaller than the number of colormap entries.
    """
    if label.ndim != 2:
        raise ValueError('Expect 2-D input label')

    colormap = create_pascal_label_colormap()

    if np.max(label) >= len(colormap):
        raise ValueError('label value too large.')

    # Integer-array indexing looks up one RGB row per label element.
    return colormap[label]
class SegPipeline:
    """
    DALI input pipeline for semantic-segmentation TFRecords, exposed as a
    distributed ``tf.data`` dataset.

    # Arguments
        record_files: TFRecord file paths handed to the DALI reader.
        idx_files: DALI index file paths, given in the same order as
            ``record_files``.
        image_size: (height, width) every image and label map is resized to.
        batch_size: batch size per device.
        num_devices: number of devices the data is sharded over; a falsy
            value (None / 0) is treated as 1.
        num_class: depth of the one-hot encoded labels.
        augment: whether the random augmentations are applied.
        strategy: ``tf.distribute`` strategy used by ``get_dataset``.
    """

    def __init__(self, record_files, idx_files, image_size=(512, 512), batch_size=32, num_devices=4, num_class=7, augment=False, strategy=None):
        # A missing device count would otherwise fail in the arithmetic and
        # comparisons below.
        num_devices = int(num_devices) if num_devices else 1
        height, width = int(image_size[0]), int(image_size[1])

        self.record_files = record_files
        self.idx_files = idx_files
        self.augment = augment
        self.strategy = strategy
        self.image_size = (height, width)
        self.batch_size = batch_size
        self.total_batch_size = batch_size * num_devices
        # Shapes/dtypes of the (images, labels) pair declared to DALIDataset.
        self.shapes = (
            (batch_size, height, width, 3),
            (batch_size, height * width, num_class),
        )
        self.num_devices = num_devices
        self.num_class = num_class
        self.dtypes = (tf.float32, tf.int32)

        multi_device = num_devices > 1
        self.input_options = tf.distribute.InputOptions(
            experimental_place_dataset_on_device=multi_device,
            experimental_fetch_to_device=False,
            experimental_replication_mode=(
                tf.distribute.InputReplicationMode.PER_REPLICA
                if multi_device
                else tf.distribute.InputReplicationMode.PER_WORKER
            ),
        )

    def sharp_policy(self) -> Policy:
        """Return an auto-augment policy with a single sharpness sub-policy."""
        sharp = augmentations.sharpness.augmentation(mag_range=(0, 0.9), randomly_negate=True)
        return Policy(name="sharp", num_magnitude_bins=11, sub_policies=[
            [(sharp, 0.2, 5)],
        ])

    @pipeline_def(device_id=0, batch_size=32, num_threads=4, enable_conditionals=True)
    def _semantic_segmentation_pipeline(self, shard_id=0):
        """
        DALI graph: read, decode, resize, optionally augment and normalize.

        # Arguments
            shard_id: index of the reader shard this pipeline instance reads.

        # Returns
            (images, labels): normalized float images in HWC layout and
            one-hot encoded int32 labels.
        """
        height, width = self.image_size

        # NOTE(review): pad_last_batch=True repeats the last sample to fill
        # the final batch of a shard; confirm that over-representation is
        # acceptable for training.
        inputs = fn.readers.tfrecord(
            path=self.record_files,
            index_path=self.idx_files,
            features={
                "image/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
                "image/filename": tfrec.FixedLenFeature([], tfrec.string, ""),
                "image/format": tfrec.FixedLenFeature((), tfrec.string, "png"),
                "image/height": tfrec.VarLenFeature((), tfrec.int64, 0),
                "image/width": tfrec.VarLenFeature((), tfrec.int64, 0),
                "image/segmentation/class/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
                "image/segmentation/class/format": tfrec.FixedLenFeature((), tfrec.string, "png")},
            random_shuffle=True, num_shards=self.num_devices, shard_id=shard_id, prefetch_queue_depth=self.batch_size, pad_last_batch=True)

        images = inputs["image/encoded"]
        labels = inputs["image/segmentation/class/encoded"]
        images = fn.decoders.image(images, device="mixed", output_type=types.RGB)
        labels = fn.decoders.image(labels, device="mixed", output_type=types.GRAY)

        images = fn.resize(images, size=(height, width), minibatch_size=self.batch_size, device="gpu")
        # Label maps hold class ids, so they must be resized with
        # nearest-neighbour sampling; the default linear filter blends
        # neighbouring ids into values that belong to neither class.
        labels = fn.resize(labels, size=(height, width), minibatch_size=self.batch_size, interp_type=types.INTERP_NN, device="gpu")

        if self.augment:
            print('do augment...')
            # Geometric augmentations are applied to image and label together.
            images, labels = rand_flip(images, labels, horizontal=True, vertical=False, size=[height, width], device="gpu")
            images, labels = rand_zoom(images, labels, x_zoom_range=[0.9, 1.3], y_zoom_range=[0.9, 1.3], device="gpu")
            images, labels = rand_rotate(images, labels, angle_range=[-30, 30], device="gpu")
            images, labels = gridmask(images, labels, ratio_range=[0.1, 0.5], tile_range=[width // 20, width // 3], rotate_range=[-30, 30], device="gpu")
            # Photometric augmentations only touch the image.
            images = rand_bright(images, bright_range=[-0.2, 0.2], device="gpu")
            images = rand_chroma(images, chroma_range=[0.5, 1.5], device="gpu")
            images = rand_contrast(images, contrast_range=[0.5, 1.5], device="gpu")
            images = rand_sharp(images, scale=1.0, window_size=3, device="gpu")
            images = rand_gauss(images, sigma=1, window_size=5, device="gpu")
            images = rand_equalize(images, device="gpu")

        images = fn.cast(images, dtype=types.DALIDataType.INT32)
        labels = fn.cast(labels, dtype=types.DALIDataType.INT32)
        # Flatten the spatial dimensions before one-hot encoding.
        labels = fn.reshape(labels, shape=[-1, height * width, 1])
        labels = fn.one_hot(labels, num_classes=self.num_class, dtype=types.DALIDataType.INT32)
        # mean 0, std 127.5 and shift -1 map pixel values 0..255 to -1..1.
        images = fn.crop_mirror_normalize(
            images,
            dtype=types.FLOAT,
            mean=.0,
            std=127.5,
            shift=-1.0,
            output_layout="HWC")
        return images, labels

    def dataset_fn(self, input_context):
        """
        Build the DALIDataset for one input pipeline of the strategy.

        # Arguments
            input_context: ``tf.distribute.InputContext`` supplied by
                ``distribute_datasets_from_function``.

        # Returns
            A ``DALIDataset`` reading shard ``input_pipeline_id``.
        """
        device_id = input_context.input_pipeline_id
        num_shards = input_context.num_input_pipelines
        per_pipeline_batch = int(self.total_batch_size / num_shards)

        with tf.device("/gpu:{}".format(device_id)):
            return dali_tf.DALIDataset(
                pipeline=self._semantic_segmentation_pipeline(batch_size=per_pipeline_batch, device_id=device_id, num_threads=num_shards, shard_id=device_id),
                batch_size=per_pipeline_batch,
                output_shapes=self.shapes,
                output_dtypes=self.dtypes,
                exec_separated=True,
                # DALIDataset has its own device_id argument (default 0);
                # pass it so each pipeline runs on the GPU of its replica
                # rather than all of them on GPU 0.
                device_id=device_id,
            )

    def get_dataset(self):
        """Return the dataset distributed over ``self.strategy``."""
        dataset = self.strategy.distribute_datasets_from_function(self.dataset_fn, self.input_options)
        return dataset

    def test_pipeline(self, save_path='./check_dali_pipeline'):
        """
        Run the pipeline standalone and dump sample images and color-coded
        masks to ``save_path`` for visual inspection.

        Only shard 0 is read, because the pipeline is built with the default
        ``shard_id``.
        """
        height, width = self.image_size
        pipeline = self._semantic_segmentation_pipeline(batch_size=int(self.total_batch_size / self.num_devices))
        os.makedirs(save_path, exist_ok=True)
        print(f"checking the pipeline, result save in {save_path}")
        # Building once is enough; the same pipeline serves every pass below.
        pipeline.build()
        for ep in range(3):
            print(f'epoch {ep} ......')
            for i in range(5):
                images, masks = pipeline.run()

                # Undo the -1..1 normalization to get a viewable uint8 image.
                img = images.as_cpu().at(0)
                img = ((img + 1.0) * 127.5).astype(np.uint8)
                Image.fromarray(img).save(os.path.join(save_path, f'test{i}_ep{ep}.png'))

                # Collapse the one-hot axis back to class ids, then colorize.
                mask = masks.as_cpu().at(0).astype(np.uint8)
                mask = mask.argmax(axis=-1).reshape([height, width])
                color_mask = label_to_color_image(mask).astype(np.uint8)
                Image.fromarray(color_mask).save(os.path.join(save_path, f'test{i}_ep{ep}_mask.png'))
train
import os
os.environ["CUDA_VISIBLE_DEVICES"] = '6, 7' # 0, 1, 2, 3, 4, 5, 6, 7
import tensorflow as tf
from tensorflow.keras.optimizers.schedules import PolynomialDecay
import experimental
import logging
import random
import time
import json
import yaml
import shutil
from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TerminateOnNaN
from deeplabv3p.model import get_deeplabv3p_model
from common.utils import get_data_list
from deeplabv3p.gen_data import generate_dataset
from pipeline.dali_pipeline import SegPipeline
# Only let TensorFlow's Python logger report errors.
tf.get_logger().setLevel(logging.ERROR)

# Notes about this run, filled in step by step below.
training_diary = {}

# --- model name -----------------------------------------------------------
# The previously used name is kept in ./tmp so it can be offered again.
os.makedirs('./tmp', exist_ok=True)
last_name = ''
if os.path.exists('./tmp/last_model_name.txt'):
    with open('./tmp/last_model_name.txt', 'r') as name_file:
        last_name = name_file.read()

if not last_name:
    model_name = input(f"\nplease input your model name(input 'n' for no training diary: \n")
else:
    model_name = input(f"\nplease input your model name(input 'n' for no training diary, push 'p' to use last name:{last_name}): \n")
    if model_name == 'p':
        model_name = last_name

# --- timestamp ------------------------------------------------------------
cur_time = time.strftime("%Y%m%d-%H%M%S")
training_diary['training date'] = cur_time

# 'n' means the run is named after its start time.
if model_name == 'n':
    model_name = cur_time

# Remember the chosen name for the next run.
last_name = model_name
with open('./tmp/last_model_name.txt', 'w') as name_file:
    name_file.write(last_name)

# --- configuration --------------------------------------------------------
print('-'*30, "params", '-'*30)
with open("cfg/cfgs.yaml", 'r') as cfg_file:
    # NOTE(review): yaml.unsafe_load can construct arbitrary Python objects;
    # make sure cfg/cfgs.yaml is a trusted file.
    cfgs = yaml.unsafe_load(cfg_file)

params = cfgs['params']      # training params
data_cfg = cfgs['data']      # data params
training_diary["params"] = params

# Print every training parameter as an aligned "name: value" row.
for key, value in params.items():
    print(f'{key}:'.ljust(30), f'{value}'.ljust(20))

# Device selection, passed to get_strategy() later on.
DEVICES = params['devices']

# --- dataset location and sample counts -----------------------------------
training_diary["data"] = data_cfg
DATA_PATH = data_cfg['main']
with open(os.path.join(DATA_PATH, "train_val_num.json"), 'r') as count_file:
    train_val_num = json.load(count_file)

# --- log directory --------------------------------------------------------
LOG_DIR = params['log_dir']
LOG_PATH = os.path.join(LOG_DIR, model_name)
if os.path.exists(LOG_PATH):
    # Either wipe the existing folder or switch to a time-stamped sibling.
    if input(f'{LOG_PATH} the folder already exists,remove it?(y/n):') == 'y':
        shutil.rmtree(LOG_PATH)
    else:
        LOG_PATH = LOG_PATH + '_' + cur_time
os.makedirs(LOG_PATH, exist_ok=True)
print('log path:'.ljust(30), f'{LOG_PATH}'.ljust(20))
training_diary['log_path'] = LOG_PATH

# --- hyper-parameters -----------------------------------------------------
EPOCHS = params['epochs']
LR = params['learning_rate']
BATCH_SIZE_PER_DEV = params['batch_size_per_dev']  # per gpu
DROPOUT = 0.2
IMAGE_SIZE = params['image_size']
NUM_CLASSES = params['num_classes']
LOAD_MODEL_FROM_FILE = params['load_model_from_file']
# Set by get_strategy().
NUM_DEVICES = None
def setup_gpus():
    """Turn on memory growth for every physical GPU and enable soft device placement."""
    physical_gpus = tf.config.list_physical_devices("GPU")
    for gpu in physical_gpus:
        tf.config.experimental.set_memory_growth(gpu, True)
    tf.config.set_soft_device_placement(True)


setup_gpus()
def get_strategy(gpu_ind_str):
    """
    Build the ``tf.distribute`` strategy selected by ``gpu_ind_str`` and
    record the device count in the module-level ``NUM_DEVICES``.

    # Arguments
        gpu_ind_str: device selector, given as a string or as an int read
            from the config:
            '-1'    -> the current default strategy;
            one digit such as '0' (with at least one GPU present)
                    -> ``OneDeviceStrategy`` on that physical GPU;
            anything else, e.g. '0,1'
                    -> ``MirroredStrategy`` over the listed physical GPUs.

    # Returns
        strategy: the selected ``tf.distribute`` strategy.
    """
    global NUM_DEVICES

    # A YAML value such as ``devices: 0`` or ``devices: -1`` is loaded as an
    # int; normalize so the string comparisons below work.
    gpu_ind_str = str(gpu_ind_str).strip()
    gpus = tf.config.list_physical_devices('GPU')

    print('-'*30, 'get strategy', '-'*30)
    print('%-20s' % 'gpu_ind_str:', '%-20s' % f'{gpu_ind_str}')

    if gpu_ind_str == '-1':
        strategy = tf.distribute.get_strategy()
        # Report the actual replica count rather than None, so callers can
        # multiply the per-device batch size by NUM_DEVICES.
        NUM_DEVICES = strategy.num_replicas_in_sync
    elif len(gpu_ind_str) == 1 and gpus:
        NUM_DEVICES = 1
        tf.config.set_visible_devices(gpus[int(gpu_ind_str)], 'GPU')
        # Only one GPU is visible now, so it is always logical device 0.
        strategy = tf.distribute.OneDeviceStrategy(device="/gpu:0")
    else:
        # Ignore empty items left by stray commas, e.g. '0,1,'.
        inds = [ind for ind in gpu_ind_str.split(',') if ind.strip()]
        NUM_DEVICES = len(inds)
        tf.config.set_visible_devices([gpus[int(ind)] for ind in inds], 'GPU')
        # The visible GPUs are renumbered from 0 as logical devices.
        devices_list = [f"/gpu:{ind}" for ind in range(len(inds))]
        strategy = tf.distribute.MirroredStrategy(devices=devices_list)

    print('%-20s' % 'Number of devices:', '%-20s\n' % f'{strategy.num_replicas_in_sync}')
    return strategy
# ---------------------------------------------------------------------------
# Distribution strategy and global batch size
# ---------------------------------------------------------------------------
strategy = get_strategy(DEVICES)
# NUM_DEVICES can be None when the default strategy is selected; count that
# as a single device instead of failing in the multiplication.
num_devices = NUM_DEVICES or 1
BATCH_SIZE = BATCH_SIZE_PER_DEV * num_devices

# ---------------------------------------------------------------------------
# Callbacks
# ---------------------------------------------------------------------------
monitor = 'MeanIoU'
tensorboard = TensorBoard(log_dir=LOG_PATH, histogram_freq=0, write_graph=False, write_grads=False, write_images=False, update_freq='batch')
checkpoint = ModelCheckpoint(os.path.join(LOG_PATH, 'ep{epoch:03d}-loss{loss:.3f}-mIoU{MeanIoU:.3f}-val_loss{val_loss:.3f}-val_mIoU{val_MeanIoU:.3f}.h5'),
                             monitor='val_{}'.format(monitor),
                             mode='max',
                             verbose=1,
                             save_weights_only=False,
                             save_best_only=False,
                             period=1)
# reduce_lr is created but deliberately left out of ``callbacks`` below: the
# optimizer already follows a PolynomialDecay schedule.
reduce_lr = ReduceLROnPlateau(monitor='val_{}'.format(monitor),
                              factor=0.5,
                              mode='max',
                              patience=5,
                              verbose=1,
                              cooldown=0,
                              min_lr=1e-6)
early_stopping = EarlyStopping(monitor='val_{}'.format(monitor),
                               min_delta=0,
                               patience=100,
                               verbose=1,
                               mode='max')
terminate_on_nan = TerminateOnNaN()
callbacks = [tensorboard, checkpoint, early_stopping, terminate_on_nan]

# ---------------------------------------------------------------------------
# Model (built and compiled inside the strategy scope)
# ---------------------------------------------------------------------------
with strategy.scope():
    if not LOAD_MODEL_FROM_FILE:
        print("\nload model by function get_deeplabv3p_model()...\n")
        model = get_deeplabv3p_model("mobilenetv3small_lite", NUM_CLASSES, (512, 512), 16, 0, weights_path=params['weight_path'])
    else:
        print("\nload model by .keras file...\n")
        model = tf.keras.models.load_model(
            filepath="weights/last.keras",
            compile=True,
            custom_objects=experimental.get_custom_objects(),
        )
    lr_scheduler = PolynomialDecay(initial_learning_rate=LR, decay_steps=100000, end_learning_rate=LR/100)
    optimizer = tf.keras.optimizers.SGD(learning_rate=lr_scheduler, momentum=0.9, nesterov=False)
    model.compile(optimizer=optimizer,
                  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
                  metrics=[tf.keras.metrics.OneHotMeanIoU(num_classes=NUM_CLASSES, name='MeanIoU')]
                  )

# ---------------------------------------------------------------------------
# Training data
# ---------------------------------------------------------------------------
if data_cfg['get_train_data_from_dali']:
    print('load train data from dali...')
    train_record_path = data_cfg['train_record_path']
    train_idx_path = data_cfg['train_idx_path']
    # Sort both lists so record files and index files are paired by position.
    train_record_files = tf.io.gfile.glob(os.path.join(train_record_path, "*.tfrecord"))
    train_record_files.sort()
    train_idx_files = tf.io.gfile.glob(os.path.join(train_idx_path, "*.idx"))
    train_idx_files.sort()
    # NOTE(review): print5 is not defined or imported in the visible part of
    # this file - confirm where it comes from.
    print5(train_idx_files)
    TRAIN_SAMPLE_NUM = train_val_num["train"]
    # The one-hot depth must match the model output, hence NUM_CLASSES.
    train_pipe = SegPipeline(train_record_files, train_idx_files, image_size=IMAGE_SIZE, batch_size=BATCH_SIZE_PER_DEV, num_class=NUM_CLASSES, num_devices=num_devices, augment=data_cfg['augment'], strategy=strategy)
    train_pipe.test_pipeline(save_path='./check_train_dali_pipeline')  # dump samples for inspection
    train_dataset = train_pipe.get_dataset()
else:
    print('load train data from generate_dataset...')
    train_list_file = data_cfg['train_list_file']
    train_list = get_data_list(train_list_file, shuffle=True)
    TRAIN_SAMPLE_NUM = len(train_list)
    train_val_num['train'] = TRAIN_SAMPLE_NUM
    train_dataset = generate_dataset(train_list, BATCH_SIZE, epochs=EPOCHS, num_classes=NUM_CLASSES, raw_shape=[512, 512], out_shape=[512, 512], aug=data_cfg['augment'], shuffle=True, aug_name=data_cfg['aug_name'], strategy=strategy)

# ---------------------------------------------------------------------------
# Validation data
# ---------------------------------------------------------------------------
if data_cfg['get_val_data_from_dali']:
    print('load val data from dali...')
    val_record_path = data_cfg['val_record_path']
    val_idx_path = data_cfg['val_idx_path']
    val_record_files = tf.io.gfile.glob(os.path.join(val_record_path, "*.tfrecord"))
    val_record_files.sort()
    val_idx_files = tf.io.gfile.glob(os.path.join(val_idx_path, "*.idx"))
    # The index list itself must be sorted (the record list was previously
    # sorted twice instead), otherwise index files may not line up with
    # their record files.
    val_idx_files.sort()
    # NOTE(review): print5 is not defined or imported in the visible part of
    # this file - confirm where it comes from.
    print5(val_idx_files)
    VAL_SAMPLE_NUM = train_val_num["val"]
    val_pipe = SegPipeline(val_record_files, val_idx_files, image_size=IMAGE_SIZE, batch_size=BATCH_SIZE_PER_DEV, num_class=NUM_CLASSES, num_devices=num_devices, augment=False, strategy=strategy)
    val_pipe.test_pipeline(save_path='./check_val_dali_pipeline')  # dump samples for inspection
    val_dataset = val_pipe.get_dataset()
else:
    print('load val data from generate_dataset...')
    val_list_file = data_cfg['val_list_file']
    # Read the validation list (previously the training list was loaded
    # here, which also failed when the training data came from DALI).
    val_list = get_data_list(val_list_file, shuffle=True)
    VAL_SAMPLE_NUM = len(val_list)
    train_val_num['val'] = VAL_SAMPLE_NUM
    val_dataset = generate_dataset(val_list, BATCH_SIZE, epochs=EPOCHS, num_classes=NUM_CLASSES, raw_shape=[512, 512], out_shape=[512, 512], aug=False, shuffle=False, strategy=strategy)

# ---------------------------------------------------------------------------
# Steps per epoch and run summary
# ---------------------------------------------------------------------------
ITERATIONS = TRAIN_SAMPLE_NUM // BATCH_SIZE
VAL_ITERATIONS = VAL_SAMPLE_NUM // BATCH_SIZE
training_diary["data"]['batch_size'] = BATCH_SIZE
training_diary["data"]['train_iter'] = ITERATIONS
training_diary["data"]['val_iter'] = VAL_ITERATIONS
training_diary["data_num"] = train_val_num

print(f"\ttrain number: {TRAIN_SAMPLE_NUM}\n\tval number: {VAL_SAMPLE_NUM}")
print(f"\tbatchsize:{BATCH_SIZE}\n\tITERATIONS: {ITERATIONS}\n\tVAL_ITERATIONS: {VAL_ITERATIONS}")

# ---------------------------------------------------------------------------
# Training
# ---------------------------------------------------------------------------
start_time = time.time()
model.fit(train_dataset,
          steps_per_epoch=ITERATIONS,
          validation_data=val_dataset,
          validation_steps=VAL_ITERATIONS,
          epochs=EPOCHS,
          initial_epoch=0,
          verbose=1,
          workers=8,
          use_multiprocessing=True,
          max_queue_size=64,
          callbacks=callbacks
          )
end_time = time.time()
elapsed = end_time - start_time
training_diary['training_time'] = f"{elapsed//3600}h {(elapsed%3600)//60}min {elapsed%60:.2f}s"

# ---------------------------------------------------------------------------
# Training diary
# ---------------------------------------------------------------------------
# Answering 'n' at the name prompt means "no training diary", but by now
# model_name has been replaced with cur_time in that case, so a comparison
# against 'n' can never match. Detect the opt-out through cur_time instead.
if model_name != cur_time:
    with open('./training_diary.yaml', 'a') as f_diary:
        f_diary.write('#'*65+'\n')
        yaml.dump({model_name: training_diary}, f_diary)
Check for duplicates
- I have searched the open bugs/issues and have found no duplicates for this bug report
Hi @wangdada-love,
Thank you for reaching out and sharing your code.
It is very hard to just review the code and spot the bug. What I can recommend is to:
- make sure that each GPU instance gets a unique shard id
- make sure that the number of shards is equal to the number of GPU instances you use
- you can run the pipelines without the training and check that there are no duplicated samples. I see that you set `pad_last_batch=True`, which duplicates the last sample to fill the last batch, overrepresenting it. While this may make sense with other frameworks, where the iterator can trim the padded samples so they are never presented to the network, this is currently not possible with TensorFlow, so you can consider turning it off (at least for training)
- cross-check the training/validation pipelines by training with DALI and validating with the reference validation loader (even for just a couple of epochs), and the other way around
Maybe the above will help you narrow down the problem.
thanks for your response.
I extracted a subset of the data for an experiment and found that the curve is normal, so the problem is most likely in my data.