dmlc/xgboost

[RFC] Merging RABIT into XGBoost.

trivialfis opened this issue · 17 comments

Background

This is an RFC for merging RABIT into XGBoost. For a long time, RABIT has enabled distributed training for XGBoost and is integrated as a git submodule. Most of its tests are run through XGBoost, and the XGBoost code base is tightly connected to RABIT. For example, serialization is built on the rabit serializable interface, the NCCL unique ID is obtained through RABIT, and quantile merging between workers is based on RABIT's serializable allreduce handler. Because there are more mature MPI solutions like OpenMPI and UCX for CPU, RABIT has not received much attention beyond XGBoost. Maintaining RABIT in a separate repository ultimately creates more overhead for developers than actual benefit, which is one of the reasons RABIT is rarely updated. We also plan to sunset the allreduce implementation in RABIT in the future and utilize the widely adopted MPI solutions listed above. Merging RABIT into XGBoost first will allow us to achieve that incrementally, with sufficient tests.
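For context, below is a minimal sketch of the rabit allreduce call pattern that XGBoost's distributed code depends on, adapted from the public rabit guide; it is not an excerpt from XGBoost itself.

```cpp
// Minimal sketch of the rabit call pattern XGBoost relies on,
// adapted from the rabit guide (not actual XGBoost code).
#include <rabit/rabit.h>
#include <vector>

int main(int argc, char* argv[]) {
  rabit::Init(argc, argv);             // connect this worker to the tracker
  std::vector<double> stats(16, 0.0);  // per-worker partial statistics
  // ... fill `stats` from the local data partition ...
  // Sum the statistics element-wise across all workers, in place.
  rabit::Allreduce<rabit::op::Sum>(stats.data(), stats.size());
  if (rabit::GetRank() == 0) {
    rabit::TrackerPrintf("world size = %d\n", rabit::GetWorldSize());
  }
  rabit::Finalize();                   // leave the group cleanly
  return 0;
}
```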

Plan

  • Merge rabit as a git subtree under XGBoost as a standalone directory. (#6001)
  • Merge tests into XGBoost. (#6096)
  • Change CMake scripts for better integration and remove the mock target. (#6096)
  • Add Windows socket support in order to reduce the 3 RABIT libraries (rabit_robust, rabit_base, rabit_empty) to 1, while still supporting distributed training on Windows. (#6105, #6110)
  • Enable clang-tidy on RABIT. Beyond code style, clang-tidy found some real issues in RABIT's code base, so we should enable it as soon as possible. (#6095, #6101)
  • Drop single point model recovery. (#6112)

Future work

  • Rework the OpenMPI backend to have better support for a new backend (to be decided), probably adding NCCL as a new backend too. This way we can have both old and new backends enabled for a smooth transition.

Concerns

If we are going to replace RABIT with another MPI solution and drop single point model recovery, would it be better not to merge it at all? That seems much cleaner, but as mentioned previously XGBoost is tightly connected to RABIT, and every change to RABIT must be tested against XGBoost before merging. The replacement won't be trivial, and I would like to do it incrementally and carefully.

Let me put my 2 cents here.

The rabit protocol is important for integration with the existing open-source ecosystem, e.g. Dask, Spark and Flink. The fault tolerance part is important for scaling in real-world settings where a node can fail. The rabit protocol allows one or more workers to fail and restart on other machines while the learning process continues.

Single worker failure and restart is quite common in real-world production environments, due to pre-emption or spot instances on the cloud. Normal MPI-based solutions usually require all nodes to be alive with no failures at all, which brings more cost in real-world production settings.

While NCCL or OpenMPI can be supported, they are only useful in an HPC-based setting, which is more expensive and less elastic overall.

For the above reasons, I think we should continue to support rabit as a first-class citizen, and it should remain the primary protocol we use for distributed training.

@tqchen Thanks for the comment. Thus far single point recovery doesn't work in XGBoost, as bootstrapping is still an unresolved issue. We haven't decided what to do with it, so we might continue extending the previous work on it or drop it from XGBoost completely. The best scenario from my perspective is that we can have it enabled on top of other MPI solutions. If we want more development on it, incubating it inside XGBoost seems better than developing it as a standalone project at this stage, as most of the tests run on XGBoost. From our perspective (@CodingCat @hcho3), merging RABIT into XGBoost can ease the development a lot.

Thanks for the clarification. My comment is mainly to clarify the goals and technical path, not the code merging itself.

I did see single point recovery work in previous cases, as long as the distributed protocol strictly follows the rabit protocol.

Hopefully the motivation is clear: in production systems single point failures happen more often, and being able to handle them is critical to making the system scalable in a prod setting. I recall the code working for single point failures before.

If it does not work in the current version, we should spend some effort to look into it and resolve the issue.
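For reference, this is roughly the loop a worker is expected to follow under the rabit protocol for single point recovery; it is a sketch based on the rabit guide, and `Model` is a placeholder type rather than actual XGBoost code.

```cpp
// Sketch of the fail-restart loop under the rabit protocol, following the
// rabit guide. `Model` is a placeholder, not a real XGBoost class.
#include <rabit/rabit.h>
#include <vector>

struct Model : public rabit::Serializable {
  std::vector<float> weights;
  void Load(rabit::Stream* fi) override { fi->Read(&weights); }
  void Save(rabit::Stream* fo) const override { fo->Write(weights); }
};

void Train(int argc, char* argv[], int num_rounds) {
  rabit::Init(argc, argv);
  Model model;
  // Fresh start: returns 0. A restarted worker instead recovers `model`
  // from the latest checkpoint kept by surviving peers and returns its version.
  int version = rabit::LoadCheckPoint(&model);
  for (int iter = version; iter < num_rounds; ++iter) {
    // ... compute local statistics, rabit::Allreduce, update `model` ...
    rabit::CheckPoint(&model);  // mark this iteration as globally complete
  }
  rabit::Finalize();
}
```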

I am not against moving rabit into the codebase.

However, I do not agree that the path forward is to build the solution on top of existing MPI interfaces. Most MPI APIs bring requirements that go beyond allreduce, and it is impossible to resolve single point of failure in a common MPI-based setting; this was the motivation for the original rabit protocol.

One goal of distributed XGBoost is to be able to embed the job into common distributed runtimes with the minimum requirement of fail-restart for fault tolerance.

We should continue to keep the rabit-based protocol as the primary way to drive such needs, and make MPI something that is optional.

Got it. The RABIT protocol is difficult to follow for most of the existing pipelines in XGBoost, as we need synchronization during DMatrix construction. So the real issue is how to recover the DMatrix in a multi-threaded (async) environment. I'm not sure about the case you have seen, as whoever ran that test did not make it part of XGBoost, and so far I haven't seen any working example. I believe it's possible to make it work in general XGBoost pipelines, but with a good amount of effort. Let me sort out a more detailed roadmap for how we proceed with this.

@tqchen I just thought about the spot instance use case. I'm not sure how single point model recovery can help, since once a worker gets unplugged all the other workers cannot proceed until a replacement worker is available. So wouldn't it be better if we just checkpoint the model and stop/restart the training altogether? XGBoost does work and is tested for training continuation. For a concrete example, if we are training with 4 workers and 1 of them gets unplugged during the 3rd iteration, we can simply checkpoint the 2 constructed forests/trees and start training from the 3rd iteration again, either with re-partitioned data on the remaining workers or on a new set of workers according to some scheduling algorithm.

What confuses me is that, to me, the only use case for single point model recovery with spot instances is letting all the other workers idle while waiting for a new worker from the scheduler.

The question is only for better understanding of the use case, not for arguing about whether we should remove the feature.

In most distributed execution envs (e.g. Spark, Hadoop, Dask), the data partitioning is done as part of the pipeline. In most such cases, the basic primitive is

mapGroup(data, some-func)

Within the map function, the data is partitioned (by the execution env) into several groups, and each group contains one partition of the dataset. Most of these frameworks also have a fail-restart mechanism built in, which means that if one of the workers, say worker0, gets unplugged due to pre-emption or failure, that specific part of the job will restart on another instance somewhere. The rabit protocol fits into such setups naturally.
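Concretely, the embedding looks roughly like the sketch below; `MapGroupTask` and the partition type are illustrative placeholders, not real framework APIs.

```cpp
// Illustrative sketch of embedding a rabit worker inside a map-style task.
// The execution env (Spark/Dask/Flink) hands each task one data partition;
// `MapGroupTask` is a hypothetical name, not a real framework API.
#include <rabit/rabit.h>
#include <vector>

void MapGroupTask(const std::vector<float>& partition, int argc, char* argv[]) {
  rabit::Init(argc, argv);  // join the allreduce group via the tracker
  double local_sum = 0.0;
  for (float v : partition) local_sum += v;
  // Aggregate across all partitions; the task never needs to know its peers.
  rabit::Allreduce<rabit::op::Sum>(&local_sum, 1);
  // If the exec env restarts this task elsewhere after a failure, it simply
  // re-runs this function and rejoins the group.
  rabit::Finalize();
}
```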

While we could do things like repartitioning the data or other tricks, the data shuffling mechanism would need to be implemented in xgboost itself, which breaks the principle of natural integration with the distributed exec env.

More importantly, in cases where hundreds of workers are involved in the training, the cost of restarting a single worker is much smaller than restarting all the workers together. Imagine each worker has a probability p of being pre-empted during an iteration, and n is the number of workers. The success probability of the restart-all-together approach (which requires all workers to stay alive for the whole iteration) goes down as we increase n, which is quite an undesirable property. That means the bigger the dataset we want to train on, the larger the probability of failure.
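To make that concrete (illustrative numbers, not measurements from any real cluster): if each worker independently survives an iteration with probability 1 - p, then

```latex
P(\text{all } n \text{ workers survive one iteration}) = (1-p)^{n},
\qquad\text{e.g. } (1-0.01)^{200} \approx 0.13 .
```

With these numbers most iterations would see at least one pre-emption, and the restart-all approach pays the full restart cost each time, whereas single worker restart only pays for the failed worker.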

In the case of single point failure recovery, while it is true that the other workers need to stay idle while waiting for a single worker to restart, the cost of restarting a single worker, or a small number of workers, is small, and this is handled by most distributed exec envs.

The problem of initial statistics sync during DMatrix construction can be resolved by dividing the protocol into an initial phase and an iteration phase. This is exactly the problem we aim to tackle in a real prod setting. For example, I know there are some internal use-cases at Uber that rely on this feature, so that the distributed learning process can scale on an internal cluster where pre-emption can easily happen.

Thanks for the clarification; the failure probability bit helped a lot, as did the possible cost savings part. I will consider putting more effort into supporting single point model recovery in the future, and would also like to hear opinions from others.

But just to be clear, according to @CodingCat the feature is not used. Also, the bootstrapping work done by @chenqin laid down a solid foundation, but it still has unaddressed limitations, so the feature cannot be used yet. Hence I believe enabling the feature is not a matter of simple debugging but requires more thought and work.

I have some initial ideas on how to extend @chenqin's work on this, but before anything can happen I need to get opinions from other maintainers on whether they want to share the responsibility of maintaining it, as I personally don't want to maintain a comm primitive library. (Which is why I want this feature built on top of another mature MPI solution.)

Right now the Spark package kills the Spark context in the face of failure, which I believe is to avoid hanging caused by model recovery.

Again, back to the point of being able to embed into a distributed execution env. Notably, there are two problems with relying on an existing MPI solution:

  • An MPI library may not be readily available, nor compatible with existing distributed execution envs like Spark/Dask. As a matter of fact, most MPI implementations present themselves as exclusive execution environments without fault tolerance baked in.
    • This is one of the main reasons we went with rabit, as the primitive allows us to inject an allreduce mechanism that is compatible with these exec envs.
  • MPI contains a bigger collection of primitives that go beyond allreduce, and as a result it is impossible to design a similar single point fault tolerance protocol for an MPI-based solution (because of its generality).

Thanks for the detailed explanation. Let me dig into the issues you listed. :-)

As a matter of fact, most MPI implementations present themselves as exclusive execution environments

Indeed ..

my 2 cents here

For us to use it in a production environment, fail-all and recover-from-checkpoint is good enough, as many other frameworks like TF also seem to go down this path. Based on this, maintaining Rabit, whose complexity mostly comes from the recovery logic, is really an overhead.

However, there are cases where single point recovery is useful. E.g. I do sometimes see Spark executors get preempted but come back within 5 minutes; in this case, restarting the whole Spark application is less efficient than just keeping all the other executors hanging there and waiting for things to stabilize...

I discussed with @trivialfis a similar story that happened in the Spark community. Originally we had Akka as the RPC framework, which looks a lot like rabit here: most of its charming features are about fault tolerance, etc., but Spark didn't use them at all while still paying the maintenance cost for the dependency. Later, Spark implemented its own RPC framework with the same interface as Akka. There were 1-2 versions where Akka and Spark-RPC co-existed for the convenience of falling back, until Spark-RPC fully replaced Akka.

For us, can we have a simple allreduce framework based on the same interface as Rabit, which facilitates the co-existence of multiple frameworks? Even if someone wants to implement a new AllReduce framework like Tiger, as long as she/he implements this interface, it can be integrated with XGB easily (it's easy to do in the JVM world, not sure about C++...).

Some thoughts from me:

After @tqchen's explanation I see the benefit of single worker recovery. My concern is that we don't have enough contributors with expertise here to implement and make the feature robust. I think it needs a lot of work, and even then may not be super robust due to so many edge cases in xgboost, such as trying to recover with process_type=update or with GPU training.

If we remove single worker recovery and Windows support (both broken), as well as apply clang-tidy analysis, we have a chance to get the code base into something we can maintain.

I think the bottom line is that we need to make sure the interface is compatible and can be embedded into distributed exec envs like Dask/Spark. There is a lot of value in the ability to embed an allreduce into the distributed execution env, and I see that value in rabit beyond the single point recovery ability.

It is fine to drop in and replace it with other allreduce frameworks, as long as the interface is standardized around the rabit-style API, for cases like GPU (where NCCL makes more sense).

It's fairly easy in C++, we just need to add a backend. MPI is a no-go, as mentioned by @tqchen. But something more primitive like UCX might still be a viable option; I will need to look into it in more detail. Single point recovery is a nice idea, especially in the face of growing data sizes. So if the RABIT protocol can be enabled as a thin layer on top of another comm library, I'm fine with it. But actually developing a comm library seems to be a bit too much for this feature.
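For instance, a backend-agnostic interface could look roughly like the sketch below; every name in it is hypothetical, and it is not existing XGBoost or rabit code, just an illustration of standardizing on a rabit-style API.

```cpp
// Rough sketch of a backend-agnostic collective interface standardized
// around a rabit-style API. All names are hypothetical, not existing code.
#include <cstddef>

enum class ReduceOp { kSum, kMax, kMin };

class Communicator {
 public:
  virtual ~Communicator() = default;
  virtual void Init(int argc, char** argv) = 0;
  virtual void Finalize() = 0;
  virtual int Rank() const = 0;
  virtual int WorldSize() const = 0;
  // In-place allreduce over `count` doubles, mirroring rabit::Allreduce.
  virtual void Allreduce(double* data, std::size_t count, ReduceOp op) = 0;
  virtual void Broadcast(void* data, std::size_t size, int root) = 0;
};

// A rabit-backed implementation would forward these calls to rabit::*;
// an NCCL (GPU) or UCX backend could implement the same interface, letting
// old and new backends co-exist during a transition.
```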

#6112 removes single point recovery in XGBoost.

Merging RABIT is considered complete. Future development will continue as part of XGBoost as discussed. Thanks to everyone who participated; the suggestions and guidance are invaluable. @tqchen @CodingCat @RAMitchell