dmlc/rabit

[RFC] Rabit2 Design Proposal


Problem

MPI has become the de facto protocol for synchronization in distributed machine learning. From traditional ML frameworks such as XGBoost or LightGBM to recent deep learning frameworks such as PyTorch or TensorFlow (Horovod), MPI communication primitives fit well with machine learning's iterative synchronization patterns.

The majority of MPI-style frameworks, such as Facebook Gloo, NVIDIA NCCL, or Uber Horovod, focus on efficiency, especially GPU communication bandwidth. The driving force has mostly been the ability to synchronize large neural network models efficiently while avoiding congestion.

With larger datasets, distributed ML involves more hosts and runs longer, so the risk of a single point of failure increases. MPI by nature requires all participants to be up, which makes it vulnerable to a single point of failure. In response to this challenge, the majority of ML frameworks support synchronous external checkpoints, where the training job uploads the model to external storage before the next iteration. If any host fails, the scheduler redoes the data distribution, instructs workers to pull the last checkpoint from external storage, and resumes training.

The upside of such an implementation is that it separates synchronization failure recovery from the MPI operation implementation, allowing various frameworks to implement their own checkpointing logic and integrate with device-specific synchronization libraries. The downside is that recovery usually takes much longer, as it involves re-ingesting the entire dataset and having all workers bootstrap from the last checkpoint. Moreover, checkpointing to external storage is a relatively expensive operation, so users often choose to do it only every few iterations, making the lost training time grow even larger.

On another front, compute resource schedulers (Kubernetes, GCP, Peloton) have introduced preemptible resource allocation to improve overall compute utilization. While preemptible allocation does improve the overall utilization ratio and is much cheaper to obtain from cloud vendors, it poses a big stability challenge to large-scale distributed ML jobs.

Architecture

Resilient ML sync is designed to serve as a generic and performant infrastructure piece that tackles common issues in large-scale machine learning jobs.

  • Partitioned Mapper: serves as the interface with the data source and the resource scheduler. User ML code, along with the ML sync worker, is executed behind the mapper interface.
  • Rabit agent: serves as the agent on each mapper; agents form an MPI network that executes allreduce workloads from user ML code performantly and reliably.
  • User ML code: serves as the building block of distributed machine learning. To the end user, the Partitioned Mapper and the ML sync worker are transparent thanks to machine learning framework support (XGBoost, TensorFlow).
  • Shard In Memory Format: because various languages and frameworks are involved in large-scale machine learning, there is a need to avoid memory copies and ser/des cost; see the sketch below.

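A minimal sketch of what a zero-copy shard view could look like, assuming the caller keeps the underlying buffer alive for the duration of the collective call; the names here (ShardView, the Allreduce signature) are illustrative assumptions, not existing rabit API.

```c++
// Hypothetical zero-copy shard view: the ML framework hands the agent a raw
// view over memory it already owns, so no serialization or memcpy is needed.
#include <cstddef>

struct ShardView {
  const void* data;    // pointer into a buffer owned by the ML framework
  std::size_t nbytes;  // length in bytes
  int dtype;           // element type tag agreed on by both sides
};

// An allreduce entry point could then accept the view directly, e.g.:
//   void Allreduce(const ShardView& in, void* out, ReduceOp op);
```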

Event-Driven Rabit FSM

We employ an event-based design on top of epoll/kqueue. An allreduce worker goes through the following FSM while executing AllReduce/Broadcast workloads. State transitions of a worker are triggered by function calls or by network events from epoll sockets, in a non-blocking and threaded fashion.

The function call events from user code are:

  • Start agent [F1]
  • Connect tracker [F2]
  • Connect peers [F3]
  • Shutdown agent [F4]
  • Call AllReduce [F5]
  • Call Broadcast [F6]
  • Accept missing peers [F7]

The network events between agents are:

  • EPOLLIN [E1]
  • EPOLLOUT [E2]
  • EPOLLPRI [E3]
  • EPOLLERR [E4]
  • EPOLLHUP [E5]

A minor note: thread-safe, exactly-once delivery is guarded with low-level edge triggering. This offers better threading parallelism when handling heavyweight send/recv events across many connections.
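As a rough illustration of that edge-triggered, exactly-once handling, the sketch below registers a non-blocking peer socket with EPOLLET (plus EPOLLONESHOT, one common way to ensure a readiness event is handled by exactly one worker thread) and drains it until EAGAIN before re-arming. This is an assumption about how it could be wired, not the final rabit2 implementation.

```c++
#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

// Register a peer socket as non-blocking and edge-triggered.
static void RegisterPeer(int epfd, int sock) {
  fcntl(sock, F_SETFL, fcntl(sock, F_GETFL, 0) | O_NONBLOCK);
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;  // edge trigger, one handler at a time
  ev.data.fd = sock;
  epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);
}

// Worker thread: drain until EAGAIN, then re-arm the socket.
static void DrainAndRearm(int epfd, int sock, char* buf, size_t len) {
  while (true) {
    ssize_t n = recv(sock, buf, len, 0);
    if (n > 0) continue;                 // hand the received bytes to the reducer
    if (n == 0) return;                  // peer closed -> peer error handling
    if (errno == EAGAIN || errno == EWOULDBLOCK) break;  // fully drained
    return;                              // hard error -> peer error handling
  }
  // Re-arm so the next readiness change is again delivered to exactly one worker.
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
  ev.data.fd = sock;
  epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);
}
```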

  • AllReduce (AllGather) state: the worker no longer does a full loop over all connections. Instead it listens only to the set of connections that have data and decides whether a partial reduce can be executed or a partial write can be pipelined.
  • Broadcast (All Scatter) state: thanks to being able to operate efficiently on a large number of connections (ref. C10K), nodes at the top of the tree can choose to run parallel speculative execution and flood the latest model to the entire topology whenever a socket is writable (EPOLLOUT).
  • Peer error state: EPOLLERR/EPOLLHUP is instrumented on all connected agents in the topology. Agents that are not connected to, or not currently interacting with, the failed peer in a sub-state of allreduce/broadcast do not need to wait for the new agent to come back online. The peer error state is therefore only triggered when E4/E5 happens while the agent is reading from or writing to that peer; otherwise the connection is recovered automatically in a separate thread without changing the agent's state. (More details of failure recovery are introduced in a later section.)
  • Task complete state: an agent sends an out-of-band message to all peers when it finishes a broadcast/allreduce task. When an agent has received out-of-band messages from all peers it interacted with, it goes to the idle state (marking the entire job finished) and is ready for the next job. In the next section of the documentation, we will cover how we utilize the asynchronous nature of the task complete state in the MPI cluster to build efficient snapshots and asynchronous external checkpoint/restore.
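To make the states and events above concrete, here is an illustrative sketch of the FSM skeleton; the enum names and the single transition shown are assumptions for this proposal rather than existing rabit symbols.

```c++
// Assumed state/event tags matching the lists above ([F1]-[F7], [E1]-[E5]).
enum class AgentState {
  kIdle,          // agent started, waiting for work            [F1]
  kConnected,     // tracker and peers connected                [F2, F3]
  kAllReduce,     // partial reduce over ready connections      [F5]
  kBroadcast,     // speculative flood down the topology        [F6]
  kPeerError,     // a peer we are actively reading/writing hit E4/E5
  kTaskComplete,  // out-of-band "done" sent to all peers
  kShutdown       // agent shutting down                        [F4]
};

enum class Event {
  kStartAgent, kConnectTracker, kConnectPeers, kShutdownAgent,  // F1-F4
  kCallAllReduce, kCallBroadcast, kAcceptMissingPeers,          // F5-F7
  kEpollIn, kEpollOut, kEpollPri, kEpollErr, kEpollHup          // E1-E5
};

// One possible transition rule: only enter kPeerError when the failing
// socket is one the agent is actively reading from or writing to.
inline AgentState OnEvent(AgentState s, Event e, bool active_peer) {
  if ((e == Event::kEpollErr || e == Event::kEpollHup) && active_peer)
    return AgentState::kPeerError;
  if (e == Event::kShutdownAgent) return AgentState::kShutdown;
  return s;  // remaining transitions omitted in this sketch
}
```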


Note that, with some code changes, it is possible to build the FSM on top of the current PollHelper interface instead of epoll. The design is therefore not strictly dependent on epoll; epoll mainly pays off when a larger number of connections needs to be kept open.

Failure Recovery

The original rabit paper describes failure recovery with the following steps:

  1. Pause every node until the failed node is fully recovered.
  2. Detect the model version we need to recover by obtaining the minimum operation number. This is done using the Consensus Protocol explained in section 3.2.2.
  3. Transfer the model to the failed node using the Routing Protocol explained in section
  4. Resume the execution of the failed node using the received model.
  5. Resume the execution of the other nodes as soon as the failed node catches up.

In other words, a single agent crash stalls the entire network until 1) the failed node is back online, 2) consensus is reached on which model to use, and 3) virtual checkpoints are transferred from the other agents using the routing protocol.

Looking at the communication between agents across the entire topology, this global lock is not always needed. The five recovery steps can be optimized with the following locking improvements.

Let's model a single task on a single agent as follows:

Data flow

output = reduce(input_1, input_2, input_3, ..., input_n, input_self)

where each input can be received in parallel (e.g., on a dedicated thread). The reduce function has the out-of-order execution property over inputs from sources, f(a, b, c) = f(f(a, b), f(c)), and can therefore prioritize execution on results that have already been received. We introduce two blocking primitives, wait_for_peer_recovery and resume, to handle the peer error state (state transition logic is skipped below):

// input_i is a future backed by a non-blocking recv: it either returns the
// entire result from peer_i, or resets and keeps receiving the not-yet-
// transferred part from a new peer_j (with the same rank as peer_i).
output = reduce(input_self)

while (not all source inputs received) {
    for (int i = 0; i < n; i++) {
        if (input_i marked error) {            // peer_i hit EPOLLERR/EPOLLHUP
            input_i.mark_blocked();
            continue;
        }
        if (input_i.reduced() == false) {
            output = reduce(input_i.get(), output);   // out-of-order partial reduce
            input_i.mark_reduced();
        }
    }

    if (all_input_reduced) break;

    for (int i = 0; i < n; i++) {
        if (input_i.blocked()) {
            input_i.wait_for_peer_recovery();  // blocking call
            input_i.resume();                  // resume listening from partial data
        }
    }
}

send(output)                                   // to the sink peer
mark allreduce task complete
// keep output in the send buffer until the allreduce is reported finished at the sink peer

This algorithm fits well with epoll edge-triggered events: the readiness of each input_i can be observed with EPOLLIN on its socket independently. Most of the heavy data transfer can be done in parallel, while the reduce function is executed on partial inputs instead of waiting for the entire input set to become available. It also avoids globally locking the network topology, with its implied data dependency across the connected graph.
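A rough sketch of how that loop could be driven by epoll events instead of polling every input is shown below; PeerInput, reduce_into, and the registration of each socket with ev.data.ptr are assumptions made for illustration.

```c++
#include <sys/epoll.h>
#include <cstddef>
#include <vector>

// Assumed per-peer receive state; each peer socket is registered with
// ev.data.ptr pointing at its PeerInput.
struct PeerInput {
  int fd;
  bool complete = false;  // whole chunk received
  bool reduced = false;   // already folded into the output
  // recv buffer ...
};

// Fold every fully received input into the running output as soon as its
// socket reports EPOLLIN; errors are deferred to the peer error state.
void RunAllReduce(int epfd, std::vector<PeerInput>& inputs,
                  void (*reduce_into)(PeerInput&)) {
  std::size_t remaining = inputs.size();
  epoll_event evs[64];
  while (remaining > 0) {
    int n = epoll_wait(epfd, evs, 64, /*timeout_ms=*/-1);
    for (int k = 0; k < n; ++k) {
      PeerInput& in = *static_cast<PeerInput*>(evs[k].data.ptr);
      if (evs[k].events & EPOLLIN) {
        // drain the socket into in's buffer (non-blocking recv, not shown) ...
        if (in.complete && !in.reduced) {
          reduce_into(in);                  // out-of-order partial reduce
          in.reduced = true;
          --remaining;
        }
      }
      if (evs[k].events & (EPOLLERR | EPOLLHUP)) {
        // mark this input blocked; recovery is handled in the peer error state
      }
    }
  }
}
```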

Lazy Blocking Recovery

Compared with the baseline, we identified two cases where peers exchanging data with a crashed peer can prioritize their remaining work and block instead of resetting.

  • A peer not waiting for a result from the crashed agent should keep running as usual.
  • A peer waiting for input from the crashed source can still keep receiving other inputs and applying the reduce function. Once the crashed source loads its checkpoint and backfills its input, the output can be generated against all existing results: f(f(existing inputs), f(crashed_backfill_input)).

Three cases of running ALLREDUCE MIN on a tree-based topology (this also applies to ring-based topologies):

  1. Agent 1 crashes after agent 2 has received its entire data. The async agent processor keeps executing allreduce(min) and outputs 0 despite agent 1 being missing. Agent 1's recovery runs in parallel with the allreduce task.
  2. Agent 1 crashes before sending its entire data to agent 2. The async agent processor keeps executing allreduce(min) on the inputs it already has, but blocks and waits until agent 1 recovers (peer error state). If agent 0 crashes while waiting for agent 1 to recover, its output is treated as in case 1.
  3. Agent 1 crashes before agent 2 broadcasts to it. Agent 2 broadcasts to the rest of the agents and waits for agent 1 to recover (peer error state) and come back with the latest virtual checkpoint from the last iteration. Once agent 1 recovers, agent 0 sends the output to agent 1 and jumps to the task complete state. At this point, the output is still stored in the send buffer in case any peer crashes before it reports task complete; it is only cleared when the agent goes into the wait task state.


Consider an agent running in the peer error state that has already finished communicating with its peers, i.e., it has read all bytes from them. Such an agent can keep running until the next task needs to read from that peer. Meanwhile, the peer that is actually in error (the one that put the current agent into the peer error state) needs to go through the recovery process by pulling the latest version of the model as well as the results collected from other peers via its read peers. This data lineage can be recovered from the send buffer kept per link (up to one iteration). By storing a model checkpoint along with per-link send buffers in each agent across the entire cluster (see the rabit paper), rabit2 will be able to recover only the crashed agents.
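A minimal sketch of that per-link lineage state is shown below, assuming one payload per link per iteration plus a local virtual checkpoint; the container layout and names are assumptions of this proposal.

```c++
#include <cstdint>
#include <unordered_map>
#include <vector>

// What was last sent over one link, kept until the task is acknowledged.
struct LinkSendBuffer {
  uint64_t seq_no;              // allreduce/broadcast iteration this belongs to
  std::vector<char> payload;    // exactly what was sent to this peer last time
};

// Per-agent recovery state: model checkpoint plus per-link send buffers,
// enough to re-feed a restarted peer without touching the rest of the cluster.
struct RecoveryState {
  std::vector<char> model_checkpoint;               // virtual checkpoint, last iteration
  std::unordered_map<int, LinkSendBuffer> per_link; // keyed by peer rank
};

// Once "task complete" is acknowledged by every peer we interacted with,
// the buffers for that iteration can be dropped.
inline void OnTaskComplete(RecoveryState& st, uint64_t finished_seq) {
  for (auto& kv : st.per_link)
    if (kv.second.seq_no <= finished_seq) kv.second.payload.clear();
}
```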

Partial Result

As stated in the previous sections, sources send data in a non-blocking and asynchronous fashion. If an agent crashes, the recovered agent can sync with its peer on how many bytes were already transferred to the other side, and then resume sending from that offset instead of resending everything. This helps save network bandwidth.
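A sketch of that offset handshake, assuming the two sides first exchange the byte count already received for the current task; the message and helper names are illustrative.

```c++
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical resume message from the surviving side: how far it already got.
struct ResumeRequest {
  uint64_t task_seq;        // which allreduce/broadcast iteration
  uint64_t bytes_received;  // bytes of that payload already on the other side
};

// Sender side: resume from the acknowledged offset instead of resending all.
inline const char* ResumePtr(const std::vector<char>& send_buf,
                             const ResumeRequest& req, std::size_t* remaining) {
  std::size_t off = static_cast<std::size_t>(req.bytes_received);
  if (off > send_buf.size()) off = send_buf.size();  // defensive clamp
  *remaining = send_buf.size() - off;
  return send_buf.data() + off;
}
```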


Thanks for putting this up. Can we elaborate a bit on what the difference is from the original rabit recovery protocol?

Specifically, the original rabit benefits from the assumption that only the result of allreduce is needed, so we can just store some of them in certain nodes.


@tqchen Absolutely. I updated the design with more detail on how we plan to optimize recovery for a series of single-node failures over the entire run time (t1 node1_crash, t2 node2_crash, ...). In terms of storing and restoring checkpoints within the cluster, I plan to stick with the current implementation and offer external checkpoints as an option (in case all nodes are dead or the tracker is dead).

Let's hold on this a bit; we will finalize several parts internally and then send it out for public review.