It fits our problem constraints:
- models fit on one device
- communication is reliable
- only one PS is required
- MPI is available.
Additionally, this implementation of a PS
- uses MPI, a standard widely available on clusters.
- can optionally compress messages if bandwidth is a concern.
Or, why use other PS systems? You may...
- want multiple PSs.
- have models that may not fit on one machine.
- not want to use MPI.
- torch.distributed only sends tensors. We may want to send generic Python objects.
- torch.distributed does not have any `Igatherv`, which is what we want for a PS.
- `MPI_Gather` requires the recv count of any particular message.
- Workaround: send sizes beforehand, allocate the largest size, and then trim each message down per size (see the sketch after this list).
  - (when receiving from any source)
- TensorFlow does... (see TensorFlow's docs for ConditionalAccumulator)
  - makes sure gradients are current
  - waits until $n$ gradients have been received from any number of workers
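As a rough sketch of the size-exchange workaround (illustrative only: it uses a blocking Gather rather than Igather, and the payload is a stand-in for a pickled gradient):

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

# Stand-in payload of a different length per rank (e.g. a pickled gradient dict)
payload = b"g" * (100 + 10 * rank)
counts = comm.allgather(len(payload))   # exchange sizes first
maxlen = max(counts)

sendbuf = np.zeros(maxlen, dtype=np.uint8)
sendbuf[: len(payload)] = np.frombuffer(payload, dtype=np.uint8)
recvbuf = np.zeros(size * maxlen, dtype=np.uint8) if rank == 0 else None

comm.Gather(sendbuf, recvbuf, root=0)   # blocking for brevity; Igather also works
if rank == 0:
    # trim each fixed-size slot back down to its true size
    messages = [recvbuf[i * maxlen : i * maxlen + counts[i]].tobytes()
                for i in range(size)]
```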
Implement a parameter server with mpi4py
- Call encode
- Convert to pickle object
- (optional) compress
- Allocate largest size expected + some padding (to see if overflowed)
- Convert to NumPy buffer, use Igather
- Convert back to pickle object when needed
- Send dictionary to decode
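A minimal sketch of the steps above; the names `encode`/`decode`, the 4-byte length header, `MAX_BYTES`, and the zlib compression are illustrative assumptions, not the exact implementation:

```python
import pickle
import zlib

import numpy as np
from mpi4py import MPI

MAX_BYTES = 1 << 20  # largest size expected, plus padding

def encode(obj, compress=True):
    """Pickle (and optionally compress) obj into a fixed-size uint8 buffer."""
    raw = pickle.dumps(obj)
    if compress:
        raw = zlib.compress(raw)
    assert len(raw) + 4 <= MAX_BYTES, "message overflowed the preallocated buffer"
    buf = np.zeros(MAX_BYTES, dtype=np.uint8)
    buf[:4] = np.frombuffer(np.uint32(len(raw)).tobytes(), dtype=np.uint8)  # length header
    buf[4:4 + len(raw)] = np.frombuffer(raw, dtype=np.uint8)
    return buf

def decode(buf, compress=True):
    """Recover the Python object from a buffer produced by encode()."""
    n = int(np.frombuffer(buf[:4].tobytes(), dtype=np.uint32)[0])
    raw = buf[4:4 + n].tobytes()
    if compress:
        raw = zlib.decompress(raw)
    return pickle.loads(raw)

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

sendbuf = encode({"rank": rank, "grad": np.ones(3)})
recvbuf = np.zeros((size, MAX_BYTES), dtype=np.uint8) if rank == 0 else None

req = comm.Igather(sendbuf, recvbuf, root=0)  # non-blocking gather of the raw bytes
req.Wait()
if rank == 0:
    grads = [decode(recvbuf[i]) for i in range(size)]
```

The length header is what lets `decode` trim each fixed-size slot back down to the bytes it actually contains.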
- An MPI-enabled parameter server cannot handle models that do not fit on every machine.
- Though we can safely (?) assume that the data on every machine is constant
- Good summary of parameter servers at http://hunch.net/?p=151364
- This is algorithm AsySG-InCon (for inconsistent reads) in [1]
- [1]: Asynchronous parallel stochastic gradient for nonconvex optimization, https://arxiv.org/abs/1506.08272
# optimization step function
irequest_params()
for p in params:
    if rank == 0:
        # PS: collect gradient messages until 32 have arrived from any workers
        comms = []
        while True:
            comms += [recv(MPI.ANY_SOURCE)]
            if len(comms) == 32:
                break
        grads = [receive(comm) for comm in comms]
        p.grad = sum(grads)
        step()
    else:
        # worker: send this parameter's gradient to the PS
        send(p.grad, 0)
    # broadcast the updated parameter to every worker (inconsistent reads)
    req = ibcast(p.data)
It's easy to implement inconsistent reads with `ibcast`. It's harder to do consistent reads; we need to do a buffered broadcast to allow workers to continue to compute gradients.
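For example, a minimal sketch of the inconsistent-read pattern with Ibcast, assuming a flat NumPy parameter buffer (the placeholder comment marks where a worker would overlap gradient computation):

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

params = np.zeros(1024, dtype=np.float32)
if rank == 0:
    params[:] = 1.0  # pretend the PS just applied an optimizer step

req = comm.Ibcast(params, root=0)  # everyone starts the broadcast
# ... a worker could compute its next gradient against stale params here ...
req.Wait()  # complete the broadcast before using the new params
```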
I tried playing with `Iallgatherv`. Some problems:
- It does not support sending objects of unknown sizes. Every rank needs to know the size of every object it's receiving.
- Sending objects of these types involves sending a tuple, as shown in the implementation and the tests.
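For reference, a hedged sketch of what that tuple form looks like with `Iallgatherv`; the counts are exchanged with `allgather` first, since every rank must know every size up front:

```python
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

mine = np.arange(rank + 1, dtype=np.float64)   # each rank contributes a different length
counts = comm.allgather(mine.size)             # sizes must be known everywhere first
displs = [sum(counts[:i]) for i in range(size)]
recvbuf = np.empty(sum(counts), dtype=np.float64)

req = comm.Iallgatherv(mine, [recvbuf, counts, displs, MPI.DOUBLE])
req.Wait()
```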