rwth-i6/returnn

`DistributeFilesDataset` with sharding on file level

albertz opened this issue · 6 comments

This is for multi GPU training.

I think the random_seed_offset mechanism should be fine as long as each GPU worker trains for multiple full epochs.

However, when the data is so large that a single GPU worker will never see the whole data once, i.e. never finishes one full epoch, it might make sense to implement sharding on file level.

It could look like this: At the beginning of a full epoch, when we shuffle the files (the logic we currently have in init_seq_order: random_generator.shuffle(files_order_flat)), we would still use the same random seed for every GPU worker. Then we distribute the files over the GPU workers. This could actually reuse the same _get_files_per_sub_epochs which we use for the sub epochs, but now distributing over the workers instead of the sub epochs. Then each worker picks its own part and applies the remaining logic as before, i.e. _get_files_per_sub_epochs for the sub epochs. Whether the sub datasets then use the same random seed for all workers or different ones does not matter much, as they all see different data anyway.
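
The steps above could be sketched roughly as follows. This is only an illustration, not the actual DistributeFilesDataset code: split_evenly is a hypothetical stand-in for _get_files_per_sub_epochs (it splits by file count only, which may differ from what the real method does), and files_for_worker with its parameters is made up for this example.

```python
import random
from typing import List, Sequence


def split_evenly(items: Sequence[str], num_parts: int) -> List[List[str]]:
    """Hypothetical stand-in: contiguous chunks whose sizes differ by at most one."""
    base, extra = divmod(len(items), num_parts)
    parts, start = [], 0
    for i in range(num_parts):
        end = start + base + (1 if i < extra else 0)
        parts.append(list(items[start:end]))
        start = end
    return parts


def files_for_worker(
    files: Sequence[str],
    *,
    full_epoch_seed: int,
    worker_idx: int,
    num_workers: int,
    num_sub_epochs: int,
) -> List[List[str]]:
    """Return the files of one worker, grouped by sub epoch."""
    # Same seed on every worker, so all workers agree on the shuffled order.
    files_order = list(files)
    random.Random(full_epoch_seed).shuffle(files_order)
    # First distribute over the workers and keep only this worker's part.
    worker_files = split_evenly(files_order, num_workers)[worker_idx]
    # Then partition that part over the sub epochs, as before.
    return split_evenly(worker_files, num_sub_epochs)
```

Because every worker shuffles with the same seed and then takes a different chunk, the per-worker file lists are disjoint by construction and together cover all files.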

Is random_seed_offset in range(0, NUM_GPU_WORKERS), i.e. can we rely on that to give a worker its index/ID in the list of all workers?

Unrelated to that: I feel like this could be a separate dataset and then be composable with others? Maybe that makes the whole RETURNN config too complex though.

> Unrelated to that: I feel like this could be a separate dataset and then be composable with others? Maybe that makes the whole RETURNN config too complex though.

On that note, since not sharding is our default for multi GPU and both are equivalent if we adjust the number of epochs, maybe it does make sense to just extend the DistributeFilesDataset, since it is the only dataset that reasonably handles data so large that sharding starts to make sense on its own.

> Is random_seed_offset in range(0, NUM_GPU_WORKERS), i.e. can we rely on that to give a worker its index/ID in the list of all workers?

No. First of all, random_seed_offset can also be configured by the user. (Actually, this was maybe a bad idea: if the user configures it but then uses distributed training, every worker gets the same seed, which is probably not the intention.) Apart from that, the default logic is this:

env_val = os.environ.get(RANDOM_SEED_OFFSET_ENV_VAR)
if env_val is not None:
    return int(env_val)
elif config.typed_value("torch_distributed") is not None:
    import returnn.torch.distributed

    return returnn.torch.distributed.get_ctx(config=config).rank() * 16127
elif config.is_true("use_horovod"):
    assert config.bool("use_tensorflow", False) or config.value("backend", "").startswith("tensorflow")

    import returnn.tf.horovod

    if returnn.tf.horovod.get_ctx(config=config).is_dataset_distribution_random_seed_offset():
        return returnn.tf.horovod.get_ctx(config=config).rank() * 16127
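
To make the consequence concrete, here is a small illustration of what the rank-based branches above produce. It only reuses the multiplier 16127 from the excerpt; default_seed_offset is a hypothetical helper name, not a RETURNN function.

```python
SEED_OFFSET_STRIDE = 16127  # multiplier taken from the excerpt above


def default_seed_offset(rank: int) -> int:
    """Hypothetical helper mirroring the rank-based branches of the default logic."""
    return rank * SEED_OFFSET_STRIDE


num_workers = 4
offsets = [default_seed_offset(rank) for rank in range(num_workers)]
print(offsets)  # [0, 16127, 32254, 48381]
# Only rank 0 happens to land inside range(num_workers).
print([offset in range(num_workers) for offset in offsets])  # [True, False, False, False]
```

Dividing by the multiplier would recover the rank only in these branches. It would give wrong results as soon as the offset comes from the environment variable or from the user config.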

To get the worker ID for sharding, I think we need to pass it explicitly to the child datasets, maybe something like:

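from dataclasses import dataclass


@dataclass
class ShardInfo:
    shard_idx: int  # index of this worker, in range(num_shards)
    num_shards: int  # total number of workers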
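
A minimal sketch of how a dataset might consume such an object. The validation and the select_shard helper are assumptions for illustration only, and taking every num_shards-th file is just one possible assignment. Where the two values come from (e.g. the rank and size of the distributed context) is left open here.

```python
from dataclasses import dataclass
from typing import List, Sequence


@dataclass(frozen=True)
class ShardInfo:
    shard_idx: int  # index of this worker, in range(num_shards)
    num_shards: int  # total number of workers

    def __post_init__(self):
        # Assumed sanity check, not part of the proposal above.
        if not 0 <= self.shard_idx < self.num_shards:
            raise ValueError(f"shard_idx {self.shard_idx} not in range({self.num_shards})")


def select_shard(files: Sequence[str], shard: ShardInfo) -> List[str]:
    """Hypothetical helper: every num_shards-th file, starting at shard_idx."""
    return list(files[shard.shard_idx :: shard.num_shards])


files = ["a", "b", "c", "d", "e"]
print(select_shard(files, ShardInfo(shard_idx=1, num_shards=2)))  # ['b', 'd']
```

Unlike random_seed_offset, such an object carries the worker index and the worker count explicitly, so nothing has to be inferred from a seed value.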

> Unrelated to that: I feel like this could be a separate dataset and then be composable with others? Maybe that makes the whole RETURNN config too complex though.

I'm not sure I understand. You mean for sharding? But the main problem is that sharding is very specific to each dataset, and many variations of sharding might be possible, but not all variations are efficiently possible, very much dependent on the dataset. E.g. in this issue, the suggestion is to do sharding on file level, but this is specific to DistributeFilesDataset. Or horovod_dataset_distribution=shard will do it on sequence level (generic, independent of the dataset, but inefficient). Or maybe you could also do it on batch level (generic, independent of the dataset, always easily possible, but very inefficient). Other datasets might have other variations.
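
To illustrate the efficiency difference, here is a toy comparison of sequence-level and file-level sharding. Both functions and the dict-of-lists data model are hypothetical and have nothing to do with the real implementations. The point is only how many files each worker has to open.

```python
from typing import Dict, List, Tuple


def seq_level_shard(
    files: Dict[str, List[str]], shard_idx: int, num_shards: int
) -> Tuple[List[str], int]:
    """Generic: iterate over all sequences, keep every num_shards-th one."""
    kept, files_opened, seq_idx = [], 0, 0
    for name in sorted(files):
        files_opened += 1  # every worker has to open every file
        for seq in files[name]:
            if seq_idx % num_shards == shard_idx:
                kept.append(seq)
            seq_idx += 1
    return kept, files_opened


def file_level_shard(
    files: Dict[str, List[str]], shard_idx: int, num_shards: int
) -> Tuple[List[str], int]:
    """Dataset-specific: assign whole files, only open the assigned ones."""
    names = sorted(files)[shard_idx::num_shards]
    kept = [seq for name in names for seq in files[name]]
    return kept, len(names)


toy_files = {f"f{i}": [f"f{i}-s{j}" for j in range(3)] for i in range(4)}
print(seq_level_shard(toy_files, 0, 2)[1])  # 4 files opened
print(file_level_shard(toy_files, 0, 2)[1])  # 2 files opened
```

Both variants give each worker the same number of sequences in this toy setup, but the sequence-level one makes every worker touch all of the data.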

Thanks for the info!

> But the main problem is that sharding is very specific to each dataset, and many variations of sharding might be possible, but not all variations are efficiently possible, very much dependent on the dataset.

Ah, true. Yeah this would then be a separate ShardFilesDataset -- again, I doubt this one pulls its weight though. The functionality is better integrated into the existing one.

> Yeah this would then be a separate ShardFilesDataset

This is not what I meant. I meant, sharding is specific to each dataset, and thus should be an option for the dataset. I did not mean that you should copy some dataset to make a new dataset which is just a copy of an existing dataset but with sharding. No, we should just add an option here to DistributeFilesDataset to configure/enable sharding.

> On that note, since not sharding is our default for multi GPU

Btw, this is true for PyTorch, but this is not true for TF, where horovod_dataset_distribution = "shard" is actually the default (for historical reasons, because that was our initial implementation, until we figured out that it was just not working because it was too slow).

> both are equivalent if we adjust the number of epochs

Also to note here (but I guess you know): even when adjusting the number of epochs, they are not exactly equivalent, only very similar. For example, with random_seed_offset, it can happen that the data in a sub epoch sometimes overlaps slightly across the workers, while this can never happen when we do sharding on the file level.
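
A toy simulation of that difference could look like this. All function names are hypothetical, the way the offset is combined with the seed (simple addition) is an assumption, and the strided split is a simplification of the real sub-epoch partitioning.

```python
import random
from itertools import combinations
from typing import List, Sequence, Set


def sub_epoch_files_seed_offset(
    files: Sequence[str], *, epoch_seed: int, seed_offset: int,
    num_sub_epochs: int, sub_epoch_idx: int,
) -> Set[str]:
    """No sharding: every worker shuffles all files with its own seed."""
    order = list(files)
    random.Random(epoch_seed + seed_offset).shuffle(order)  # assumed combination
    return set(order[sub_epoch_idx::num_sub_epochs])


def sub_epoch_files_sharded(
    files: Sequence[str], *, epoch_seed: int, worker_idx: int, num_workers: int,
    num_sub_epochs: int, sub_epoch_idx: int,
) -> Set[str]:
    """File-level sharding: shared shuffle, then a disjoint part per worker."""
    order = list(files)
    random.Random(epoch_seed).shuffle(order)
    worker_files = order[worker_idx::num_workers]
    return set(worker_files[sub_epoch_idx::num_sub_epochs])


def max_pairwise_overlap(sets: List[Set[str]]) -> int:
    """Largest number of files shared by any two workers."""
    return max(len(a & b) for a, b in combinations(sets, 2))


all_files = [f"file{i}" for i in range(100)]
unsharded = [
    sub_epoch_files_seed_offset(
        all_files, epoch_seed=1, seed_offset=w * 16127, num_sub_epochs=5, sub_epoch_idx=0
    )
    for w in range(4)
]
sharded = [
    sub_epoch_files_sharded(
        all_files, epoch_seed=1, worker_idx=w, num_workers=4, num_sub_epochs=5, sub_epoch_idx=0
    )
    for w in range(4)
]
print(max_pairwise_overlap(unsharded))  # depends on the seeds, may be non-zero
print(max_pairwise_overlap(sharded))  # always 0
```

Note also that in this toy setup each unsharded worker sees 20 of the 100 files per sub epoch, while each sharded worker sees only 5, which is why the number of epochs has to be adjusted for the two setups to be comparable.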