pytorch/pytorch

PyTorch goes distributed

apaszke opened this issue · 14 comments

Together with @0mp, @VirrageS and @jytug, we're developing a torch.distributed package for PyTorch. All the work is done in a fork on the thd branch (we didn't want to make a lot of unnecessary noise in the main repo). We're creating this issue so we can gather feedback on the API design from all of you.

We plan for the package to have two modes; the user chooses one of them as part of the initialisation.

Process group mode

This is very similar to the API defined by MPI. We assume all processes are equal, assign them ranks, and later allow them to use a well-known set of communication collectives like reduce, broadcast, allReduce, gather, scatter, etc.

Example:

import torch.distributed
torch.distributed.init_process_group(backend='tcp')    # every process joins the group
my_rank = torch.distributed.get_rank()                 # rank assigned to this process
num_processes = torch.distributed.get_num_processes()  # total number of processes

...

if my_rank == 0:
    torch.distributed.send(tensor, 1)    # send the tensor to rank 1
else:
    tensor = torch.distributed.recv(0)   # receive a tensor from rank 0

...

result = torch.distributed.all_reduce(tensor)  # collective over the whole group

Master-worker mode

This would provide an API very similar to the torch.cuda package. At the beginning of your script you would have to call torch.distributed.init_master_worker(backend='mpi').

Operation execution is asynchronous w.r.t. the master process; we'll implement a CUDA-like concurrency model (streams + events). Until then, the only sync points are copies between the master and workers.

Example:

import torch.distributed
torch.distributed.init_master_worker(backend='tcp')

x = torch.distributed.FloatTensor(20, 20).fill_(4)
y = torch.randn(20, 20).dist_send()
z = x + y
# z.get_node(), z.get_device() == 0, -1 (i.e. CPU)
cuda_x = x.cuda()
# cuda_x.get_node(), cuda_x.get_device() == 0, 0
with torch.distributed.node(1):
    a = torch.distributed.FloatTensor(10, device=1)
    # a.get_node(), a.get_device() == 1, 1
    cuda_y = y.cuda()
    # cuda_y.get_node(), cuda_y.get_device() == 0, 0
    q = cuda_x + cuda_y
    # q.get_node(), q.get_device() == 0, 0

How to launch the jobs

We'll provide a pytorch_exec utility that will spawn the process groups in a similar fashion to mpiexec.

Decoupling data backends from other logic

You might have noticed that both init_process_group and init_master_worker accept a backend argument. We're aware that the best strategy for sending the data might be different for every user, and picking a good one will be crucial to limiting communication overhead. This is why we decided to introduce a DataChannel interface: users will be able to pick one of the provided implementations (initially MPI and raw TCP sockets, later RDMA etc.), or add custom ones, so they can easily achieve the lowest overhead possible in their setup.
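To make this concrete, below is a purely illustrative Python sketch of the operations such a backend has to cover. The real DataChannel is a C++ interface inside THD; the method names and signatures here are hypothetical, not the actual ones.

# Hypothetical sketch only; the real DataChannel is a C++ interface in THD
# and its exact method names and signatures differ.
from abc import ABC, abstractmethod

class DataChannel(ABC):
    @abstractmethod
    def init(self):
        """Establish connections between all participating processes."""

    @abstractmethod
    def get_rank(self):
        """Return this process's rank within the group."""

    @abstractmethod
    def get_num_processes(self):
        """Return the total number of processes in the group."""

    @abstractmethod
    def send(self, tensor, dst_rank):
        """Point-to-point send of a tensor's data to dst_rank."""

    @abstractmethod
    def recv(self, tensor, src_rank):
        """Point-to-point receive into tensor from src_rank."""

    @abstractmethod
    def all_reduce(self, tensor, op):
        """Collective reduction; an implementation may delegate to MPI,
        NCCL, or hand-written send/recv loops."""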

Please let us know what you think! Thanks!

Shubho here from SVAIL @ Baidu

One long-term thing to keep in mind is interfacing with a job scheduler - SLURM is pretty standard. I think using salloc with pytorch_exec should be fairly easy.

The framework that I helped architect at Baidu follows a peer-to-peer model - all workers are peers, no master and no slave. Each peer has access to 1 GPU and has a synchronous view of the parameter space, which is completely replicated on each worker. Workers use MPI to communicate, and the MPI byte-transport layer deals with IBVerbs and CUDA shared-memory transport (if GPUs are on the same PCI-E root complex, and many are). This peer-to-peer architecture is relatively simple and scales really well - at least to 256 GPUs and possibly more (at this point people started losing friends for hogging the cluster). It can also sustain about 70% of InfiniBand's peak bandwidth (for FDR InfiniBand), but I had to reimplement the MPI collective operations since they are not optimized. This simple architecture has served us well for anything we train - various recurrent nets with or without attention, WaveNet - and I don't see why convnets should be an issue. This setup is however not fault-tolerant: if a peer dies, the training comes to a halt, and SLURM times it out and reschedules it. This is fairly painless since we save checkpoints and training restarts from the last one; people rarely notice these failures in practice.

Possibly torch.distributed is more full-featured than this... I haven't started looking at the code yet, but will soon.

Happy to test on our cluster, since we have an FDR InfiniBand backplane with OpenMPI and SLURM, and also to contribute our learnings and code.

@shubho - I agree with respect to the scheduler, but I also don't see a large issue there. However, we want to take some care in how reliant we are on MPI and checkpoint support. I like MPI because it gives you a good default support base, but it can have interesting performance issues. We tend to go right to verbs and sockets because performance can be much better.

What we have found works best currently is fast on-node reduction and then reduction across nodes. In practice this means not reducing across PCIe root complexes, and treating each root complex that GPUs (and hopefully a NIC) are attached to as a node. But if you have more complex system configs, for example more IB cards, it gets more complex. Or if you have a custom interconnect... ;-)

The scaling you can achieve will depend a whole lot on the speed of each "node" and on the model, but I would expect speech models specifically to scale pretty well, especially because you can increase the global batch pretty aggressively and maintain convergence. However, that can make checkpoint and recovery on a different-sized machine more problematic if that machine can't fit the global batch of the original - all the hyperparams generally need to change to adjust.

But I like the general direction and idea. I think the key is really figuring out an abstraction that allows the transport layer to be changed easily (even if by extension) and supports hierarchical setups.

More complex will be model-parallel and hybrid-parallel designs.

However, the most complicated thing we tend to run into is the input I/O pipeline and synchronization across a filesystem. It can make for some interesting interactions, especially if your I/O is on the same NICs you are using for reduction...

We have thought about doing more topology-aware reduction like you suggested, but then backed off because our collective turned out to be fast enough - the ring-reduce algorithm is really good for large matrices. The ring-reduce conceptually treats every neighboring link as the same, though sometimes a link will span the QPI bus and sometimes go to the IB switch. Thankfully ignoring this difference hasn't bitten us... yet :) In some sense I am just using MPI as a thin layer over all the different transports, with just calls to Send and Recv along with their non-blocking versions. What I really want is an OpenMPI-lite that only has Send, Recv, ISend, IRecv and SendRecv - the rest of the collectives are best written by hand, and then you can even play around with sending in reduced precision, etc.
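Purely as an illustration of writing a collective by hand on top of nothing but point-to-point calls, here is a sketch of a ring all-reduce expressed against torch.distributed's send/recv-style API. It is not the Baidu or THD implementation, just the textbook algorithm, and it assumes the tensor's first dimension is at least the world size.

# Illustrative sketch: ring all-reduce built only on point-to-point ops.
import torch
import torch.distributed as dist

def ring_all_reduce(tensor):
    """Sum `tensor` across all ranks, in place (assumes tensor.size(0) >= world size)."""
    rank, world = dist.get_rank(), dist.get_world_size()
    left, right = (rank - 1) % world, (rank + 1) % world
    chunks = list(tensor.chunk(world))

    # Phase 1, reduce-scatter: after world - 1 steps, each rank holds the
    # complete sum of exactly one chunk.
    for step in range(world - 1):
        send_idx = (rank - step) % world
        recv_idx = (rank - step - 1) % world
        buf = torch.empty_like(chunks[recv_idx])
        req = dist.isend(chunks[send_idx], dst=right)  # non-blocking send avoids deadlock
        dist.recv(buf, src=left)
        req.wait()
        chunks[recv_idx] += buf

    # Phase 2, all-gather: circulate the fully reduced chunks around the ring.
    for step in range(world - 1):
        send_idx = (rank - step + 1) % world
        recv_idx = (rank - step) % world
        buf = torch.empty_like(chunks[recv_idx])
        req = dist.isend(chunks[send_idx], dst=right)
        dist.recv(buf, src=left)
        req.wait()
        chunks[recv_idx].copy_(buf)

Since every link carries the same amount of data at every step, this is exactly the case where treating all neighboring links as equal (QPI or IB) tends to work out in practice.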

Actually, speech is not that easy to scale - anything above a global batch of 1024 hasn't worked so far. So we rarely train beyond 64 GPUs, though we can easily scale to 256 GPUs with weak linear scaling, at the cost of worse convergence. So far hyperparameter or model search hasn't yielded something amenable above 1024 :(

Yes, input I/O is the 800-lb gorilla in the room for training that nobody seems to talk about. Designing networked read-only filesystems that can saturate the GPUs is an unsolved problem. We have solved it through various hacks - they are not pretty, not very scalable, and have been a source of a lot of frustration. Our checkpoint writing can also stress the filesystem. I think this is a broad research topic - training-data input and checkpoint-data output - but we should worry about it later.

Actually, we've abstracted the code used for transmitting the data into a DataChannel interface. Right now we have two implementations - TCP and MPI. But afterwards, we can probably do something like what either or both of you have converged to - use NCCL at the PCIe-root level, and MPI / IBVerbs at the node level.

However, the whole point is that we want you to be able to customize it even further by writing custom backends. We tried to make it so that the changes required to THD code are minimal. Later, your own implementation can be selected at startup at the Python level by passing its identifier to an init method (and the rest of the Python code remains completely unchanged). This can even be maintained as a separate internal library.
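As a purely hypothetical illustration (the 'rdma' identifier below doesn't exist), selecting such a custom backend would look no different from selecting a built-in one:

# Hypothetical: assuming a custom DataChannel were registered under the name
# 'rdma', selecting it is just a matter of the backend argument; the rest of
# the script stays unchanged.
import torch.distributed
torch.distributed.init_process_group(backend='rdma')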

At this initial stage we are focusing on correctness, so we're going with a simple, stupid, working implementation - simply calling MPI or writing non-performant TCP routines. Once we get past the unit-test stage and write all the Python bindings, we'll focus aggressively on perf as well.

Also, RPC calls in master-worker mode always use ZMQ for communication, so they won't mess up any logic in your custom DataChannels - you can be sure those will only be used for tensor data. Since all command messages will be very small (10-100 B), we decided it's not necessary to make the command channel interchangeable.

@shubho - it seems that the approach you used is compatible with the process group mode. It's pretty much the same as using raw MPI - all workers are equal and there's no master. We're adding master-worker mode just because it might be simpler for some people to use an API they know from our CUDA packages. The principle is the same - you can make your code distributed just by calling .dist_send(), with no need to rearrange things and interleave them with MPI collectives. It's probably not going to keep 100 workers busy, but as long as you keep it under 20 workers it should be OK (workers can use multiple GPUs).

About SLURM: if you use the MPI backend, you wouldn't even need to use pytorch_exec; you can simply use mpiexec.

If any of you are browsing the code and would like to ask questions, feel free to reach out to me on Slack or by email, and I can give you a tour or explain what the plan is at that moment.

All links to the distributed branch are unreachable. Is it possible to get access to the current source?

Otherwise, I love the chosen abstraction level. One additional (future) feature I'd like to experiment with is RDMA in process group mode, similar to MPI's one-sided communication.

Thanks a lot!
Edit: Grammar

Yeah, sorry - once we made the repo public, all the forks got disconnected. I've made it public now, so you should be able to access it. Right now we only have MPI and custom simple TCP code, but by extending the DataChannel you can make it use any other way of sending the data. Once the library is finished we'll document this for sure.

Process group mode is now merged into master and enabled by default.
Master-worker mode is going to remain experimental; a lot more work w.r.t. perf needs to be done there, and the use cases aren't as obvious yet.

Hi,

I'm really enjoying the new PyTorch release. However, when I tried the torch.distributed interface I got the error below - any advice? Thank you!

import torch.distributed as dist

dist.init_process_group(backend='mpi',
                        world_size=4)

print('Hello from process {} (out of {})!'.format(
        dist.get_rank(), dist.get_world_size()))

RuntimeError: the MPI backend is not available; try to recompile the THD package with MPI support at /tmp/pip-4omfpcm6-build/torch/lib/THD/process_group/General.cpp:17

I think the issue is that the binaries are compiled without MPI (we'd need to ship them with an MPI library otherwise). You'll need to install from source if you want to use this backend.
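In the meantime, a backend that is compiled into the binaries (TCP, if I'm not mistaken) should work out of the box. A rough sketch, where the address, port, rank and world size are placeholders for your setup:

# Sketch of falling back to a backend shipped in the binaries instead of MPI.
# The address, port, rank and world_size below are placeholders.
import torch.distributed as dist

dist.init_process_group(backend='tcp',
                        init_method='tcp://127.0.0.1:23456',
                        rank=0,          # different for every process
                        world_size=4)

print('Hello from process {} (out of {})!'.format(
    dist.get_rank(), dist.get_world_size()))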

I'm also closing this issue because it's not needed anymore.

How to launch the jobs
We'll provide a pytorch_exec utility that will spawn the process groups in a similar fashion that mpiexec does.

Hi, @apaszke. Sorry to bother you. Where can I find the pytorch_exec utility, and how do I execute it to launch distributed jobs?

@kaiyuyue look for torch.distributed.launch

@kaiyuyue also look at pytorch/examples#306 for reference on how to use torch.distributed.launch
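As a rough sketch of the usual pattern (backend choice and script name are placeholders), a script started by torch.distributed.launch receives a --local_rank argument and can initialise the process group from the environment variables the launcher sets:

# main.py: minimal sketch of a script meant to be started with
#   python -m torch.distributed.launch --nproc_per_node=4 main.py
import argparse
import torch
import torch.distributed as dist

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)  # filled in by the launcher
args = parser.parse_args()

torch.cuda.set_device(args.local_rank)                          # one GPU per process
dist.init_process_group(backend='nccl', init_method='env://')   # env vars set by the launcher
print('rank {} of {}'.format(dist.get_rank(), dist.get_world_size()))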

Any interest in getting PyTorch to work with WorkQueue (master-worker with APIs) via the HTCondor scheduler? Doing so could achieve scale without some of the performance issues of MPI.