NVIDIA/DALI

Why does the val_loss curve oscillate when training with the DALI data loading method?

wangdada-love opened this issue · 2 comments

Describe the question.

Question description

Following the official documentation and examples, I wrote a class for loading data with DALI and used it to feed my model during training. During training I found that both the validation loss and validation IoU oscillate, and the oscillation is periodic. Through multiple experiments I also found that the oscillation period matches the number of GPUs I use.
I suspect this is related to how the data is distributed across replicas when using a distributed strategy, but I cannot pinpoint where the specific problem lies.
Below is the code for my data loading class. Could you please help me check whether there are any issues and how I should resolve them?

Code

Data loading class

import os
from PIL import Image
import numpy as np
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.tfrecord as tfrec
import nvidia.dali.types as types
import nvidia.dali.plugin.tf as dali_tf
from nvidia.dali.auto_aug.auto_augment import auto_augment, apply_auto_augment
from nvidia.dali.auto_aug import augmentations
from nvidia.dali.auto_aug.core import Policy

import tensorflow as tf
import sys
sys.path.append("pipeline")
from data_augs import rand_bright, rand_gauss, rand_flip, rand_chroma, rand_zoom, rand_contrast, rand_crop, rand_rotate, gridmask, rand_sharp, rand_equalize
def create_pascal_label_colormap():
    """
    create label colormap with PASCAL VOC segmentation dataset definition

    # Returns
        colormap: Colormap array for visualizing segmentation
    """
    colormap = np.zeros((256, 3), dtype=int)
    index = np.arange(256, dtype=int)

    for shift in reversed(range(8)):
        for channel in range(3):
            colormap[:, channel] |= ((index >> channel) & 1) << shift
        index >>= 3
    # print(colormap)
    return colormap
def label_to_color_image(label):
    """
    mapping the segmentation label to color indexing array

    # Arguments
        label: 2D uint8 numpy array, with segmentation label

    # Returns
        result: A 2D array with floating type. The element of the array
        is the color indexed by the corresponding element in the input label
        to the PascalVOC color map.

    Raises:
        ValueError: If label is not of rank 2 or its value is larger than color
        map maximum entry.
    """
    if label.ndim != 2:
        raise ValueError('Expect 2-D input label')

    colormap = create_pascal_label_colormap()

    if np.max(label) >= len(colormap):
        raise ValueError('label value too large.')

    return colormap[label]

class SegPipeline:
    def __init__(self, record_files, idx_files, image_size=[512, 512], batch_size=32, num_devices=4, num_class=7, augment=False, strategy=None):
        self.record_files = record_files
        self.idx_files = idx_files
        self.augment = augment
        self.strategy = strategy
        # self.image_size = image_size
        self.batch_size = batch_size
        self.total_batch_size = batch_size * num_devices
        self.shapes = (
            (batch_size, image_size[0], image_size[1], 3), (batch_size, image_size[0]*image_size[1], num_class) # num_class
            )
        self.num_devices = num_devices
        self.num_class = num_class
        self.dtypes = (tf.float32, tf.int32)
        # print(f'shapes:{self.shapes}')
        # print(f'shapes:{self.dtypes}')

        self.input_options = tf.distribute.InputOptions(
                                    experimental_place_dataset_on_device=True if num_devices>1 else False,
                                    experimental_fetch_to_device=False,
                                    experimental_replication_mode=tf.distribute.InputReplicationMode.PER_REPLICA if num_devices>1 else tf.distribute.InputReplicationMode.PER_WORKER,
                                )
    
    def sharp_policy(self) -> Policy:
        sharp = augmentations.sharpness.augmentation(mag_range=(0, 0.9), randomly_negate=True)

        return Policy(name="sharp", num_magnitude_bins=11,sub_policies=[
             [(sharp, 0.2, 5),],
         ])

    @pipeline_def(device_id=0, batch_size=32, num_threads=4, enable_conditionals=True)
    def _semantic_segmentation_pipeline(self, shard_id=0): #  , shard_id=0
        inputs = fn.readers.tfrecord(
            path=self.record_files,
            index_path=self.idx_files,
            features={
                "image/encoded" : tfrec.FixedLenFeature((), tfrec.string, ""),
                "image/filename": tfrec.FixedLenFeature([], tfrec.string, ""),
                "image/format": tfrec.FixedLenFeature((), tfrec.string, "png"),
                "image/height": tfrec.VarLenFeature((), tfrec.int64, 0),
                "image/width": tfrec.VarLenFeature((), tfrec.int64, 0),
                "image/segmentation/class/encoded": tfrec.FixedLenFeature((), tfrec.string, ""),
                "image/segmentation/class/format": tfrec.FixedLenFeature((), tfrec.string,  "png")},
                random_shuffle=True, num_shards=self.num_devices, shard_id=shard_id, prefetch_queue_depth=self.batch_size, pad_last_batch=True) # , shard_id=shard_id
        images = inputs["image/encoded"]
        labels = inputs["image/segmentation/class/encoded"]
        images = fn.decoders.image(images, device="mixed", output_type=types.RGB)
        labels = fn.decoders.image(labels, device="mixed", output_type=types.GRAY)
        images = fn.resize(images, size=(512,512), minibatch_size=self.batch_size, device="gpu")
        labels = fn.resize(labels, size=(512,512), minibatch_size=self.batch_size, device="gpu")
        # images = fn.cast(images, dtype=types.DALIDataType.INT32)
        # labels = fn.cast(labels, dtype=types.DALIDataType.INT32)
        raw_images = images
        raw_labels = labels

        if self.augment:
            print('do augment...')
            images, labels = rand_flip(images, labels, horizontal=True, vertical=False, size=[512,512], device="gpu")

            images, labels = rand_zoom(images, labels, x_zoom_range=[0.9, 1.3], y_zoom_range=[0.9, 1.3], device="gpu")


            images, labels = rand_rotate(images, labels, angle_range=[-30, 30], device="gpu")

            # # images, labels = rand_crop(images, labels, device="gpu")

            images, labels = gridmask(images, labels, ratio_range=[0.1, 0.5], tile_range=[512//20, 512//3], rotate_range=[-30, 30], device="gpu")

            images = rand_bright(images, bright_range=[-0.2, 0.2], device="gpu")

            images = rand_chroma(images, chroma_range=[0.5, 1.5], device="gpu")

            images = rand_contrast(images, contrast_range=[0.5, 1.5], device="gpu")

            images = rand_sharp(images, scale=1.0, window_size=3, device="gpu")

            images = rand_gauss(images, sigma=1, window_size=5, device="gpu")

            images = rand_equalize(images, device="gpu")
        
        images = fn.cast(images, dtype=types.DALIDataType.INT32)
        labels = fn.cast(labels, dtype=types.DALIDataType.INT32)
        labels = fn.reshape(labels, shape=[-1, 512*512, 1])  # must be contiguous
        labels = fn.one_hot(labels, num_classes=self.num_class, dtype=types.DALIDataType.INT32)
        images = fn.crop_mirror_normalize(
            images,
            dtype=types.FLOAT,
            mean=.0 , # [127.5, 127.5, 127.5]
            std=127.5, # [127.5, 127.5, 127.5]
            shift=-1.0,
            output_layout="HWC")
        
        return images, labels#, raw_images, raw_labels, 

    def dataset_fn(self, input_context):
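        # Called once per replica by strategy.distribute_datasets_from_function().
        # input_context.input_pipeline_id is used both as the GPU the pipeline is
        # placed on and as the reader's shard_id, so each replica reads a different
        # shard as long as num_input_pipelines matches the num_devices passed to
        # the constructor.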
        with tf.device("/gpu:{}".format(input_context.input_pipeline_id)):
            device_id = input_context.input_pipeline_id
            num_shards = input_context.num_input_pipelines #****
            # print(f"num_shards: {num_shards}")
            # print(f'input_context.input_pipeline_id:{device_id}')
            return dali_tf.DALIDataset(
                pipeline=self._semantic_segmentation_pipeline(batch_size=int(self.total_batch_size/num_shards), device_id=device_id, num_threads=num_shards, shard_id=device_id), # shard_id=device_id, , prefetch_queue_depth={ "cpu_size": num_shards, "gpu_size": num_shards}
                batch_size=int(self.total_batch_size/num_shards),
                output_shapes=self.shapes,
                output_dtypes=self.dtypes,
                exec_separated=True,
                # device_id=device_id,
            )
    def get_dataset(self):

        dataset = self.strategy.distribute_datasets_from_function(self.dataset_fn, self.input_options)

        return dataset

    def test_pipeline(self, save_path='./check_dali_pipeline'):
        pipeline=self._semantic_segmentation_pipeline(batch_size=int(self.total_batch_size/self.num_devices))
        os.makedirs(save_path, exist_ok=True)
        print(f"checking the pipeline, result save in {save_path}")
        for ep in range(3):
            print(f'epoch {ep} ......')
            pipeline.build()
            for i in range(5):
                images, masks = pipeline.run()
                # print(type(images))  # gpu

                images = images.as_cpu()
                img = images.at(0)
                img = (img + 1.0) * 127.5
                img = img.astype(np.uint8)
                # print('img shape: ', img.shape) # 512, 512, 3
                img = Image.fromarray(img)
                img.save(os.path.join(save_path, f'test{i}_ep{ep}.png'))

                masks = masks.as_cpu()
                # print(f"mask type: {type(masks)}") # cpu
                mask = masks.at(0).astype(np.uint8) # .reshape([512, 512])
                # print(mask.shape)
                mask = mask.argmax(axis=-1)
                mask = mask.reshape([512, 512])
                # print(mask.shape)
                color_mask = label_to_color_image(mask)
                color_mask = color_mask.astype(np.uint8)
                # print(color_mask.shape)
                mask_img = Image.fromarray(color_mask)
                mask_img.save(os.path.join(save_path, f'test{i}_ep{ep}_mask.png'))

Training script

import os
os.environ["CUDA_VISIBLE_DEVICES"] = '6, 7' # 0, 1, 2, 3, 4, 5, 6, 7

import tensorflow as tf
from tensorflow.keras.optimizers.schedules import PolynomialDecay
import experimental
import logging
import random
import time
import json
import yaml
import shutil

from tensorflow.keras.callbacks import TensorBoard, ModelCheckpoint, ReduceLROnPlateau, EarlyStopping, TerminateOnNaN
from deeplabv3p.model import get_deeplabv3p_model

from common.utils import get_data_list
from deeplabv3p.gen_data import generate_dataset

from pipeline.dali_pipeline import SegPipeline

tf.get_logger().setLevel(logging.ERROR)

# init note dict
training_diary = {}

# get model name
os.makedirs('./tmp', exist_ok=True)
last_name = '' 
if os.path.exists('./tmp/last_model_name.txt'):
    with open('./tmp/last_model_name.txt', 'r') as f:
        last_name = f.read()
if last_name:
    model_name = input(f"\nplease input your model name (input 'n' for no training diary, press 'p' to use the last name: {last_name}): \n")
    if model_name == 'p':
        model_name = last_name
else:
    model_name = input("\nplease input your model name (input 'n' for no training diary): \n")

# get time
cur_time = time.strftime("%Y%m%d-%H%M%S", time.localtime())
training_diary['training date'] = cur_time

if model_name == 'n':
    model_name = cur_time
last_name = model_name
with open('./tmp/last_model_name.txt', 'w') as f:
    f.write(last_name)

# load config yaml
print('-'*30, "params", '-'*30)
with open("cfg/cfgs.yaml", 'r') as f:
    cfgs = yaml.unsafe_load(f)

params = cfgs['params'] # training params
data_cfg = cfgs['data'] # data params
training_diary["params"] = params # note
for k, v in params.items():
    print('%-30s'%f'{k}:', '%-20s'%f'{v}')

# get device ids
DEVICES = params['devices'] # GPU index string, e.g. "0,1", or "-1" for the default strategy

# get train and val data path

training_diary["data"] = data_cfg
DATA_PATH = data_cfg['main'] # "/data01/zzl/6classresize/hzhang/day/"

with open(os.path.join(DATA_PATH, "train_val_num.json"), 'r') as f:
    train_val_num = json.load(f)

# log path
LOG_DIR = params['log_dir']

LOG_PATH = os.path.join(LOG_DIR, model_name)
if os.path.exists(LOG_PATH):
    whether_del = input(f'{LOG_PATH} already exists, remove it? (y/n): ')
    if whether_del == 'y':
        shutil.rmtree(LOG_PATH)
    else:
        LOG_PATH = LOG_PATH + '_' + cur_time
os.makedirs(LOG_PATH, exist_ok=True)
print('%-30s'%f'log path:', '%-20s'%f'{LOG_PATH}')
training_diary['log_path'] = LOG_PATH # note

# params
EPOCHS = params['epochs']
LR = params['learning_rate']
BATCH_SIZE_PER_DEV = params['batch_size_per_dev']  # per gpu
DROPOUT = 0.2
IMAGE_SIZE = params['image_size'] # 28
NUM_CLASSES = params['num_classes'] # 10
LOAD_MODEL_FROM_FILE=params['load_model_from_file']

NUM_DEVICES = None

def setup_gpus():
    for gpu_instance in tf.config.list_physical_devices("GPU"):
        #if '5' in gpu_instance.name or '6' in gpu_instance.name:
        tf.config.experimental.set_memory_growth(gpu_instance, True)
    tf.config.set_soft_device_placement(True)

setup_gpus()

def get_strategy(gpu_ind_str):
    gpus = tf.config.experimental.list_physical_devices('GPU')
    print('-'*30, 'get strategy', '-'*30)
    print('%-20s'%'gpu_ind_str:', '%-20s'%f'{gpu_ind_str}')
    global NUM_DEVICES
    if gpu_ind_str == '-1':
        strategy = tf.distribute.get_strategy()
        NUM_DEVICES = None
    elif len(gpu_ind_str) == 1 and gpus:
        NUM_DEVICES = 1
        tf.config.set_visible_devices(gpus[int(gpu_ind_str)], 'GPU')
        strategy = tf.distribute.OneDeviceStrategy(device=f"/gpu:0")
    #elif  len(gpu_ind_str) > 1 and gpus:
    else:
        inds = gpu_ind_str.split(',')
        NUM_DEVICES = len(inds)
        tf.config.set_visible_devices([gpus[int(ind)] for ind in inds], 'GPU')
        devices_list = [f"/gpu:{ind}" for ind in range(len(inds))]
        strategy = tf.distribute.MirroredStrategy(devices=devices_list) # , cross_device_ops=tf.distribute.ReductionToOneDevice())
    
    print('%-20s'%'Number of devices:', '%-20s\n'%f'{strategy.num_replicas_in_sync}')
    return strategy

# get  distribute strategy
# strategy = tf.distribute.MirroredStrategy(devices=DEVICES)
strategy = get_strategy(DEVICES)
BATCH_SIZE = BATCH_SIZE_PER_DEV * NUM_DEVICES

# callbacks

monitor = 'MeanIoU'
tensorboard = TensorBoard(log_dir=LOG_PATH, histogram_freq=0, write_graph=False, write_grads=False, write_images=False, update_freq='batch')
checkpoint = ModelCheckpoint(os.path.join(LOG_PATH, 'ep{epoch:03d}-loss{loss:.3f}-mIoU{MeanIoU:.3f}-val_loss{val_loss:.3f}-val_mIoU{val_MeanIoU:.3f}.h5'),
                             monitor='val_{}'.format(monitor),
                             mode='max',
                             verbose=1,
                             save_weights_only=False,
                             save_best_only=False,
                             period=1)
 
reduce_lr = ReduceLROnPlateau(monitor='val_{}'.format(monitor), 
                              factor=0.5, # 0.5
                              mode='max',
                              patience=5,  # 5
                              verbose=1, 
                              cooldown=0, 
                              min_lr=1e-6)

early_stopping = EarlyStopping(monitor='val_{}'.format(monitor), 
                               min_delta=0, 
                               patience=100, 
                               verbose=1, 
                               mode='max')
#checkpoint_clean = CheckpointCleanCallBack(log_dir, max_val_keep=5, max_eval_keep=2)
terminate_on_nan = TerminateOnNaN()

#callbacks = [tensorboard, checkpoint, reduce_lr, early_stopping, terminate_on_nan, checkpoint_clean]
callbacks = [tensorboard, checkpoint, early_stopping, terminate_on_nan]

with strategy.scope():
    if not LOAD_MODEL_FROM_FILE:
    
    # load the model
        print("\nload model by function get_deeplabv3p_model()...\n")
        model = get_deeplabv3p_model("mobilenetv3small_lite", NUM_CLASSES, (512, 512), 16, 0, weights_path=params['weight_path']) # weights_path
    else:
        print("\nload model by .keras file...\n")
        model = tf.keras.models.load_model(
                filepath="weights/last.keras",
                compile=True,
                custom_objects=experimental.get_custom_objects(),
            )
        
    lr_scheduler = PolynomialDecay(initial_learning_rate=LR, decay_steps=100000, end_learning_rate=LR/100)

    optimizer = tf.keras.optimizers.SGD(learning_rate=lr_scheduler, momentum=0.9, nesterov=False)
    model.compile(optimizer=optimizer, 
                  loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False), 
                  metrics=[tf.keras.metrics.OneHotMeanIoU(num_classes=NUM_CLASSES, name='MeanIoU')]
                )
    # model.summary()


### loading data

if data_cfg['get_train_data_from_dali']:   ## Use this branch to load training data
    print('load train data from dali...')
    train_record_path = data_cfg['train_record_path']# os.path.join(TFRECORD_PATH, "train")
    train_idx_path = data_cfg['train_idx_path'] # os.path.join(IDX_PATH, "train")
    train_record_files = tf.io.gfile.glob(os.path.join(train_record_path,"*.tfrecord")) 
    train_record_files.sort()
    train_idx_files = tf.io.gfile.glob(os.path.join(train_idx_path,"*.idx")) 
    train_idx_files.sort()
    print(train_idx_files[:5])  # show a few index files as a sanity check
    TRAIN_SAMPLE_NUM = train_val_num["train"] # 31146 
    train_pipe = SegPipeline(train_record_files, train_idx_files, image_size=IMAGE_SIZE, batch_size=BATCH_SIZE_PER_DEV, num_class=7,num_devices=NUM_DEVICES, augment=data_cfg['augment'], strategy=strategy) # train_record_files, train_idx_files
    train_pipe.test_pipeline(save_path='./check_train_dali_pipeline') # dump a few samples so the data can be inspected
    train_dataset = train_pipe.get_dataset()
else:
    print('load train data from generate_dataset...')
    train_list_file = data_cfg['train_list_file']
    train_list = get_data_list(train_list_file, shuffle=True) 
    TRAIN_SAMPLE_NUM = len(train_list)
    train_val_num['train'] = TRAIN_SAMPLE_NUM
    train_dataset = generate_dataset(train_list, BATCH_SIZE, epochs=EPOCHS,  num_classes=NUM_CLASSES, raw_shape=[512, 512], out_shape=[512, 512], aug=data_cfg['augment'] , shuffle=True, aug_name=data_cfg['aug_name'], strategy=strategy)

if data_cfg['get_val_data_from_dali']:  ## Use this branch to load validation data
    print('load val data from dali...')
    val_record_path = data_cfg['val_record_path'] # os.path.join(TFRECORD_PATH, "val")
    val_idx_path = data_cfg['val_idx_path'] # os.path.join(IDX_PATH, "val")
    val_record_files = tf.io.gfile.glob(os.path.join(val_record_path,"*.tfrecord")) 
    val_record_files.sort()

    val_idx_files = tf.io.gfile.glob(os.path.join(val_idx_path,"*.idx")) 
    val_idx_files.sort()
    print(val_idx_files[:5])  # show a few index files as a sanity check
    VAL_SAMPLE_NUM = train_val_num["val"] # TRAIN_SAMPLE_NUM# 
    val_pipe = SegPipeline(val_record_files, val_idx_files,  image_size=IMAGE_SIZE, batch_size=BATCH_SIZE_PER_DEV, num_class=7, num_devices=NUM_DEVICES, augment=False, strategy=strategy) # val_record_files, val_idx_files, num_devices=NUM_DEVICES
    val_pipe.test_pipeline(save_path='./check_val_dali_pipeline') # check data
    val_dataset = val_pipe.get_dataset()
else:
    print('load val data from generate_dataset...')
    val_list_file = data_cfg['val_list_file']
    val_list = get_data_list(val_list_file, shuffle=True) 
    VAL_SAMPLE_NUM = len(val_list)
    train_val_num['val'] = VAL_SAMPLE_NUM
    val_dataset = generate_dataset(val_list, BATCH_SIZE, epochs=EPOCHS, num_classes=NUM_CLASSES, raw_shape=[512, 512], out_shape=[512, 512], aug=False, shuffle=False, strategy=strategy)

ITERATIONS = TRAIN_SAMPLE_NUM//BATCH_SIZE # 100
VAL_ITERATIONS = VAL_SAMPLE_NUM//BATCH_SIZE

training_diary["data"]['batch_size'] = BATCH_SIZE
training_diary["data"]['train_iter'] = ITERATIONS
training_diary["data"]['val_iter'] = VAL_ITERATIONS

training_diary["data_num"] = train_val_num # note

# data information
print(f"\ttrain  number: {TRAIN_SAMPLE_NUM}\n\tval number: {VAL_SAMPLE_NUM}")
print(f"\tbatchsize:{BATCH_SIZE}\n\tITERATIONS: {ITERATIONS}\n\tVAL_ITERATIONS: {VAL_ITERATIONS}")

### training
# print('will model.fit...')
start_time = time.time() 
model.fit(train_dataset, # train_dataset train_generator
            steps_per_epoch=ITERATIONS,
            validation_data=val_dataset, # valid_generator val_dataset
            validation_steps=VAL_ITERATIONS,
            epochs=EPOCHS,
            initial_epoch=0,
            verbose=1,
            workers=8,
            use_multiprocessing=True,
            max_queue_size=64,
            callbacks=callbacks
            )
end_time = time.time()
elapsed = end_time - start_time
training_diary['training_time'] = f"{int(elapsed // 3600)}h {int((elapsed % 3600) // 60)}min {elapsed % 60:.2f}s"

if not model_name == 'n':
    with open('./training_diary.yaml', 'a') as f_diary:

        f_diary.write('#'*65+'\n')
        yaml.dump({model_name: training_diary}, f_diary)

Check for duplicates

  • I have searched the open bugs/issues and have found no duplicates for this bug report

Hi @wangdada-love,

Thank you for reaching out and sharing your code.
It is very hard to spot the bug just by reviewing the code. What I can recommend is to:

  • make sure that each GPU instance gets a unique shard id
  • make sure that the number of shards is equal to the number of GPU instances you use
  • run the pipelines without the training and check that there are no duplicated samples (see the sketch after this list). I also see that you set pad_last_batch=True, which duplicates the last sample of the last batch and overrepresents it. While that may make sense with other frameworks, where the iterator can trim the padded samples and not present them to the network, this is currently not possible with TensorFlow, so you can consider turning it off (at least for training)
  • cross-check the training/validation pipelines by training with DALI and validating with the reference validation data (even for just a couple of epochs), and the other way around
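
As a concrete way to run the duplicate check, here is a minimal, standalone sketch that builds one CPU-only pipeline per shard and compares the filenames each shard yields over one epoch. It reuses the "image/filename" feature from your reader; NUM_SHARDS, BATCH_SIZE, filename_pipeline and shard_filenames are hypothetical names for this example, and train_record_files / train_idx_files are the sorted file lists already built in your training script, so adjust everything to your setup:

import math
from nvidia.dali import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.tfrecord as tfrec

NUM_SHARDS = 2      # hypothetical: number of GPUs / replicas used for training
BATCH_SIZE = 16     # hypothetical: per-replica batch size

@pipeline_def(batch_size=BATCH_SIZE, num_threads=2, device_id=0)
def filename_pipeline(record_files, idx_files, shard_id):
    # CPU-only pipeline that reads only the filename feature of each sample
    inputs = fn.readers.tfrecord(
        path=record_files,
        index_path=idx_files,
        features={"image/filename": tfrec.FixedLenFeature([], tfrec.string, "")},
        num_shards=NUM_SHARDS,
        shard_id=shard_id,
        random_shuffle=False,
        pad_last_batch=False,   # avoid over-representing the last sample
        name="Reader")
    return inputs["image/filename"]

def shard_filenames(record_files, idx_files, shard_id):
    pipe = filename_pipeline(record_files, idx_files, shard_id)
    pipe.build()
    total = pipe.epoch_size("Reader")                    # samples in the whole dataset
    iters = math.ceil(total / NUM_SHARDS / BATCH_SIZE)   # batches in one shard epoch
    names = []
    for _ in range(iters):
        (out,) = pipe.run()
        # string features come back as uint8 arrays; decode them to Python strings
        names += [bytes(out.at(i)).decode() for i in range(len(out))]
    return names

# Every pair of shards should be (nearly) disjoint and their union should cover
# the dataset once; a large overlap points at a shard_id / num_shards problem.
shards = [shard_filenames(train_record_files, train_idx_files, s) for s in range(NUM_SHARDS)]
for a in range(NUM_SHARDS):
    for b in range(a + 1, NUM_SHARDS):
        print(f"shards {a}/{b} overlap: {len(set(shards[a]) & set(shards[b]))}")

If two shards show a large overlap, or their union does not cover the whole dataset, the shard_id / num_shards configuration is the likely culprit. A handful of in-shard repeats is expected because the reader wraps around to fill the final batch when pad_last_batch=False.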

Maybe the above will help you narrow down the problem.

Thanks for your response.
I extracted some of the data for an experiment and found that the curve is normal with it, so it should be a problem with my data.