Support data parallelism with a GPU cluster
Opened this issue · 4 comments
Data Parallelism
Data parallelism replicates the model on every device to generates gradients independently and then communicates those gradients at each iteration to keep model replicas consistent.
Following is a survey for support data parallelism in GoTorch.
Solutions
NCCL and Gloo
NCCL provides Broadcast and AllReduce C APIs, we could wrapper them in Go, and use them directly in GoTorch.
Gloo is another collective communications library, which supports both CPU and GPU.
The GPU performance of NCCL is better than Gloo.
PyTorch Distributed Package
It does more optimizations, including bucketing small gradients into a big tensor, overlapping communication and computation.
The idea of gradient bucketing is motivated by the observation that collective communications are more efficient on large tensors.
DDP registers one autograd hook for each gradient accumulator. The hook fires after its corresponding accumulator updating the gradients, and will inspect the bucket it pertains. If hooks of all gradients in the same buckets have fired, the last hook will trigger an asynchronous AllReduce on that bucket.
Please refer to this paper for more details.
Horovod
Horovod is a distributed deep learning training framework for TensorFlow, Keras, and PyTorch. Horovod calls NCCL or Gloo underneath.
Horovod also does many optimizations for communication. It uses the hook mechanism of PyTorch to overlapping communication and computation.
Horovod also supports elastic training.
The biggest difference when moving from normal distributed training to elastic training is the need to track and synchronize among the workers as workers are added or removed from the job.
The elastic training depends on the Gloo library. So, the GPU performance may suffer a little.
An interesting observation: People who want to run TensorFlow with AllReduce distributed strategy will choose Horovod, whereas people who want to run PyTorch with AllReduce distributed strategy will choose torch.DistributedDataParallel
directly.
Summary
So, let's make a summary:
Solution | Performance | Effort |
---|---|---|
NCCL/Gloo | + | expose Broadcast/AllReduce C APIs to Go |
PyTorch | ++ | reimplement PyTorch distributed Python package in Go, and expose the C++ part to Go |
Horovod | ++ | reimplement Horovod Python package in Go, and expose the C++ part to Go |
Note 1
Key points to improve the performance:
- bucketing small gradients
- using the hook mechanism to launch Allreduce kernel asynchronously
Note 2
Both Horovod and PyTorch support Gloo backend, so we could support elastic training later if we choose either solution.
Horovod with PyTorch V.S. PyTorch DistributedDataParallel
- Case 1:
PyTorch DistributedDataParallel and Horovod distributed training benchmarks
Here are some training times comparing Horovod and DistributedDataParallel. This is a DeepLabV3-ResNet 101 model trained on PASCAL VOC 2012 on a machine with one, four, and eight V100 GPUs respectively. Horovod is about 10 to 20 percent faster, definitely nice-to-have, maybe not a must-have though
- Case 2:
libtorch provides a thin wrapper for NCCL/Gloo, ProcessGroupNCCL.
It has two advantages comparing with using NCCL directly:
- ProcessGroupNCCL takes torch Tensors as input
- ProcessGroupNCCL launches allreduce kernel with a separate CUDA stream, which achieves potentially concurrency and better performance.
Here is an example:
#include <c10d/FileStore.hpp>
#include <c10d/ProcessGroupGloo.hpp>
using namespace ::c10d;
int main(int argc, char** argv) {
int rank = atoi(getenv("RANK"));
int size = atoi(getenv("SIZE"));
auto store = std::make_shared<FileStore>("/tmp/c10d_example", size);
ProcessGroupGloo pg(store, rank, size);
// Create some tensors
const auto ntensors = 10;
std::vector<at::Tensor> tensors;
for (auto i = 0; i < ntensors; i++) {
auto x =
at::ones({1000, 16 * (i + 1)}, at::TensorOptions(at::CPU(at::kFloat)));
tensors.push_back(x);
}
// Kick off work
std::vector<std::shared_ptr<ProcessGroup::Work>> pending;
for (auto i = 0; i < ntensors; i++) {
std::vector<at::Tensor> tmp = {tensors[i]};
pending.push_back(pg.allreduce(tmp));
}
// Wait for work to complete
for (auto& work : pending) {
work->wait();
}
}
We want to train a distributed MNIST example, the following is a MVP(Minimum Viable Product) for the target:
- A RecordIODataLoader to read data after static partition.
- Expose ProcessGroupNCCL to Go, so we could apply broadcast/allreduce operations on torch tensors in Go.
- A Launcher to create training processes for each GPU.
After we complete the above things, we could do more optimizations, including:
- support dynamic data partition
- use the hook mechanism to overlap communication and computation
- bucket small gradients into a big tensor to reduce CUDA kernel launching overhead
Any progress for distributed training