kubeflow/training-operator

Prevent scheduling deadlocks

jlewi opened this issue · 35 comments

jlewi commented

Suppose a user submits M distributed jobs, each with N workers, and each worker requires 1 GPU.
Suppose the cluster has exactly N GPUs, so there is enough capacity in the cluster to run exactly 1 job at a time.

But suppose K8s ends up scheduling 1 worker from each job. Then each training job is potentially stalled because it won't start until all N workers are available.

How can we prevent such deadlocks from occurring?

This is clearly a resource scheduling issue, so I think it's something K8s should handle; we'd like a way to tell K8s that a group of resources needs to be co-scheduled. If at all possible, I think we should try to avoid building our own queuing system to handle this.

@foxish @vishh @kow3ns do you know if K8s has a way to solve this or is planning on adding a mechanism?

This is a great discussion item - and we have the exact same problem with Spark as well. I think this can be somewhat helped by priority and preemption for pods - but the underlying issue where you want gang scheduling can only be solved by a real queueing & batching system.

We proposed one in this doc which was later turned into an incubator project - https://github.com/kubernetes-incubator/kube-arbitrator which is currently being worked on. The big data SIG has a lot of interest in this space for 2018.

cc/ @davidopp @k82cn @erikerlandson

/xref kubernetes/enhancements#269

It's nowhere close to "done" yet, but should get closer to that hopefully over the next year.

jlewi commented

@foxish So would the way we use kube-arbitrator be that the TfJob controller submits a QueueJob that specifies all the resources we want to schedule together (i.e. all the job controllers)?

Hmm.. The model I had in mind was that we wrap a TfJob in a QueueJob, which requests the aggregated resources needed, etc., and does not create the actual TfJob until the requirements are satisfied.

jlewi commented

Wouldn't that require the caller of the TfJob to know what resources a TfJob needs? The actual resources should be an internal detail computed by the controller.

Hmm.. In the Spark case, it's expected that the user submitting the job will have some notion of the resources required/requested. I guess one can also envision a system like you mentioned in #165 (comment), but IMO it would be difficult for a user to reason about their jobs if they are completely abstracted from the resources requested.

jlewi commented

The user does specify resources, but this is only part of the total footprint for a job. TfJob might launch additional agents/processes, such as TensorBoard or logging sidecars, and TfJob might supply defaults if the user doesn't specify them, so the user may not have the total picture. In this case the total resource footprint is some function of the resources specified by the user in the TfJob spec, so I think it makes sense for the TfJob controller to figure out the total resources required and submit the appropriate request.
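A minimal sketch in Go of the kind of aggregation described above, using made-up types rather than the real TfJob or Kubernetes API objects: the total footprint is the user-specified per-replica resources scaled by replica count, plus controller-added overhead such as TensorBoard or sidecars.

```go
package main

import "fmt"

// Hypothetical resource description; the real controller would use Kubernetes
// ResourceRequirements, but plain integers keep the sketch self-contained.
type Resources struct {
	CPUMilli int64 // CPU in millicores
	MemoryMB int64
	GPUs     int64
}

func (r Resources) Add(o Resources) Resources {
	return Resources{r.CPUMilli + o.CPUMilli, r.MemoryMB + o.MemoryMB, r.GPUs + o.GPUs}
}

func (r Resources) Scale(n int64) Resources {
	return Resources{r.CPUMilli * n, r.MemoryMB * n, r.GPUs * n}
}

// Hypothetical replica spec: what the user declares per worker or PS.
type ReplicaSpec struct {
	Replicas   int64
	PerReplica Resources
}

// totalFootprint is the controller-side computation described above: the sum of
// user-specified replica resources plus controller-added overhead (TensorBoard,
// logging sidecars, defaults the user never sees).
func totalFootprint(replicas []ReplicaSpec, overhead Resources) Resources {
	total := overhead
	for _, r := range replicas {
		total = total.Add(r.PerReplica.Scale(r.Replicas))
	}
	return total
}

func main() {
	workers := ReplicaSpec{Replicas: 4, PerReplica: Resources{CPUMilli: 2000, MemoryMB: 4096, GPUs: 1}}
	ps := ReplicaSpec{Replicas: 2, PerReplica: Resources{CPUMilli: 1000, MemoryMB: 2048}}
	tensorboard := Resources{CPUMilli: 500, MemoryMB: 1024} // controller-added, not in the user's spec

	fmt.Printf("%+v\n", totalFootprint([]ReplicaSpec{workers, ps}, tensorboard))
	// {CPUMilli:10500 MemoryMB:21504 GPUs:4}
}
```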

As @foxish mentioned, coping with a set of contending jobs, where any job may require some minimum amount of resources to start, isn't trivial. If this situation gets combined with priorities and/or QoS, it is possible for jobs to starve - for example, other jobs with higher priority always take the necessary resources, and some jobs never run. A situation where a job's quota is less than its configured resources can result in similar starvation.

@jlewi your point about the controller making a "final" computation of total resources is interesting. My experience with HTCondor is that you can also run into UX problems: the more knobs and behaviors a system has, the more frequently customers are likely to complain that their jobs aren't running, either because they mis-configured something or because they configured it "correctly" but it is difficult to reason about the subtleties of the system's actual behavior. The point I'm circling around is that having the controller modify resource requirements may result in user confusion. Possibly allocate "overhead" resources from a centralized pool? Or at any rate, make sure the system provides good-quality error feedback to minimize confusion.

jlewi commented

Good points.

Another case worth considering is when the resource usage varies over time.

In some settings training an ML model can consist of interleaved phases of training and evaluation. In the simplest case, we might train a model and then evaluate the model. Training and evaluation might have very different resource footprints.

When the user submits a TfJob they might specify how many resources to use for training and evaluation respectively. The TfJob controller then creates a bunch of resources (e.g. N Job controllers) to run training. When training is done, it creates a bunch of resources (e.g. M Job controllers) for evaluation.

So in this case, I think we want the TfJob to submit a QueueJob for each phase (training and evaluation). Queuing the TfJob itself doesn't really make sense because the amount of resources varies depending on the phase.
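A toy illustration of queueing each phase separately rather than the TfJob as a whole; submitGangRequest is an invented stand-in for whatever call would create a gang-scheduling request (e.g. a kube-arbitrator QueueJob) for one phase.

```go
package main

import "fmt"

// submitGangRequest stands in for whatever call creates a gang-scheduling
// request for one phase; the name and shape are made up for illustration.
func submitGangRequest(phase string, replicas, gpusPerReplica int) {
	fmt.Printf("queue %s: %d replicas, %d GPUs total\n", phase, replicas, replicas*gpusPerReplica)
}

func main() {
	// Training and evaluation have different footprints, so each phase gets its
	// own gang request; queueing the whole TfJob as a single unit would over-
	// or under-reserve resources for part of its lifetime.
	submitGangRequest("training", 8, 1) // e.g. 8 workers x 1 GPU
	// ...training completes and its resources are released before the next phase is queued...
	submitGangRequest("evaluation", 2, 1) // e.g. 2 evaluators x 1 GPU
}
```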

bhack commented

@jlewi Mainly we can have training, validation, and testing phases. Validation and testing are probably similar in terms of resource requests, but validation may need to run more frequently, e.g. to evaluate early stopping.

Some of these evolving-workload aspects seem similar to issues that are being encountered by Airflow, where a DAG workflow can fan out to many parallel tasks, then back down to some smaller number, and so on.

kube-arbitrator may be overkill if all you need is gang scheduling. But regardless, the underlying issues are the same however it's implemented.

I think there are basically three approaches. (I use "gang of pods" and "job" interchangeably here.)

  1. Change the API that the scheduler uses to request the API server to bind pods to nodes, so that it can request a set of bindings with the semantics of "bind all of these or none of these." This is a nontrivial change, as today bindings are requested using the "binding" subresource of pod, so there isn't a natural way to extend it to bind multiple pods. Probably we'd need to introduce some kind of wrapper API object that contains the objects/operations that you want committed as a transaction. (This would also be used for other scenarios in the future where we want to request multiple objects/operations to be committed as a transaction; gang scheduling would be the first use case.) When the API server receives this object, it would use the etcd v3 multi-object transaction mechanism to try to commit the wrapped objects as a transaction, or fail the request back to the requester if it can't. I guarantee that what I described here would be highly controversial and would involve many months of discussion with sig-architecture, sig-api-machinery, and others before any implementation could begin. Of course we'd also need to change the logic in the scheduler so that it processes multiple pods as a group (which isn't necessarily easy, because the scheduler currently reasons about pods, not sets like Job or ReplicaSet or whatever, and does so one pod at a time, so it doesn't have any obvious way to know which pods make up the gang that is supposed to be co-scheduled).

  2. Instead of introducing a transaction concept, we'd have the scheduler continue to bind one pod at a time, as today. But it would try to get all of the pods of one gang scheduled before trying to get any pods of any other gang scheduled. Pods from a gang that is not yet fully scheduled would "squat" on the resources until the rest of the gang can be scheduled. We'd add a mechanism so that the kubelet doesn't actually start the pods until the entire group is scheduled (unless somehow the application can detect when all the pods have been scheduled, in which case that part can be done at the application level.) So this just requires grouping and prioritizing inside the scheduler (last sentence of the description of (1) above), not any API changes. In a system with just a single scheduler, this approach would probably work fine -- once the scheduler decides that all the pods of a gang can fit, there are very few reasons why binding them one at a time should fail. If something happens and the scheduler has been waiting to get the last pod(s) of a gang scheduled for a long time, it could unbind the pods and try a different gang that it thinks would fit given the current cluster configuration.

  3. The simplest thing to do is to treat gang scheduling as admission control rather than scheduling. (I think this is the way the kube-arbitrator design suggested, but it has been a while so I might be mis-remembering, and I don't know what was actually implemented.) It's similar to (2) but instead of doing the queueing in the scheduler, you do it in something like an admission controller that holds jobs and admits a job when the aggregate resources in the cluster seem sufficient for the job. If it guesses wrong, some pods of the job will go pending until resources are available; it can decide to kill them and try again later, or wait for them to eventually schedule. It's less precise than (2) because it's just looking at whether aggregate free resources in the cluster are sufficient for the job, rather than knowing for sure that a pod will be able to schedule. So it may think a job can fit but it actually can't due to free resource fragmentation, node affinity constraints, etc. But it's nice because it requires no changes at all to the scheduler (e.g. scheduler does not need to know about gangs, as it does in (1) and (2) -- only the gang admission controller needs to). And technically you could have it do a scheduling simulation to get a more accurate idea of whether the pods would schedule (e.g. take into account free resource fragmentation, node affinity, etc.), though that starts to get a little complicated.

There are lots of other aspects of gang scheduling I haven't described here and that have been alluded to in previous messages in this issue (how to prioritize queued jobs, whether to schedule a job at a time or a DAG of jobs, etc.)

If kube-arbitrator is too heavyweight, I'd recommend doing something like (3) (which is in essence a simplified version of kube-arbitrator), as it can be done without any changes to core Kubernetes, the scheduler, etc.
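A minimal sketch, in Go with invented types, of the admission-style check in option (3): hold jobs in a queue and release one only when the cluster's aggregate free resources appear sufficient, accepting the imprecision around fragmentation and affinity noted above.

```go
package main

import "fmt"

// Illustrative gang description; real jobs would carry full resource requests,
// but a GPU count is enough to show the aggregate check.
type Job struct {
	Name    string
	Pods    int
	GPUEach int
}

func (j Job) gpusNeeded() int { return j.Pods * j.GPUEach }

type gangAdmitter struct {
	freeGPUs int
	pending  []Job
}

// admit walks the queue in order and releases each job whose aggregate request
// fits in the remaining free capacity; jobs that don't fit stay queued rather
// than being partially scheduled.
func (g *gangAdmitter) admit() []Job {
	var admitted []Job
	remaining := g.pending[:0]
	for _, j := range g.pending {
		if j.gpusNeeded() <= g.freeGPUs {
			g.freeGPUs -= j.gpusNeeded()
			admitted = append(admitted, j)
		} else {
			remaining = append(remaining, j)
		}
	}
	g.pending = remaining
	return admitted
}

func main() {
	// The scenario from the top of this issue: 4 free GPUs, two jobs that each
	// need 4 workers x 1 GPU. Only one job is admitted; the other waits instead
	// of deadlocking with half its workers scheduled.
	g := &gangAdmitter{
		freeGPUs: 4,
		pending: []Job{
			{Name: "job-a", Pods: 4, GPUEach: 1},
			{Name: "job-b", Pods: 4, GPUEach: 1},
		},
	}
	fmt.Println(g.admit()) // [{job-a 4 1}]
}
```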

cc/ @bsalamat

jlewi commented

Thanks @davidopp; option 3 makes the most sense to me. I think we could implement a version of that in the TfJob controller if need be.

k82cn commented

As davidopp@ said, option 3 is a simplified version of kube-arbitrator, and it's also our next target :). Overall, I'd like to make kube-arbitrator pluggable and delegate specific requirements to different policies (or plugins).

@jlewi , what's your timeline for this feature? @foxish , do you think we can handle this gang-scheduling requirement together with Spark's?

While option 3 may address this problem, as I see more and more use-cases, I feel the scheduler must implement a mechanism similar to option 2. A potential mechanism is to let the scheduler do its usual work to find a node for a pod. It then assumes the pod and updates its view of the cluster state, including available resources on nodes, etc., but it does not bind the assumed pod automatically. It instead waits for a bind signal. The bind signal can be derived from various specifications of a pod. For example, if a pod belongs to a gang, the bind signal comes only after all the gang members are assumed. If the pod needs volume binding, the bind signal comes once all the volumes are bound. If the pod needs GPU binding, the signal comes once the GPUs are bound. Any of the functions/components that send the scheduler a bind signal for a pod may send an error as well. In case of error the pod is not bound and gets un-assumed.

This is of course a high level idea at this point and many details need to be addressed. For example, a pod may need multiple components to be notified when it is assumed and it may be bound only if all of those components send a bind signal. If any of them fail, the pod must be unassumed and those components may need to perform some cleanup.
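A rough sketch of the assume-then-wait-for-a-bind-signal idea, with invented names rather than real scheduler internals: pods are assumed one at a time, and the bind signal for the gang fires only once every member has been assumed.

```go
package main

import "fmt"

// gangGate tracks which members of a gang the scheduler has assumed so far and
// decides when the bind signal should fire. Purely illustrative.
type gangGate struct {
	want    int             // gang size
	assumed map[string]bool // pods assumed (placed but not yet bound)
}

func newGangGate(want int) *gangGate {
	return &gangGate{want: want, assumed: map[string]bool{}}
}

// assume records a pod placement and returns true when the whole gang is
// assumed, i.e. when the bind signal should fire for all members.
func (g *gangGate) assume(pod string) (bindSignal bool) {
	g.assumed[pod] = true
	return len(g.assumed) == g.want
}

// unassume rolls a pod back, e.g. if some component reports an error before
// binding; the gang then has to wait again.
func (g *gangGate) unassume(pod string) {
	delete(g.assumed, pod)
}

func main() {
	gate := newGangGate(3)
	fmt.Println(gate.assume("worker-0")) // false: keep the pod assumed, don't bind yet
	fmt.Println(gate.assume("worker-1")) // false
	fmt.Println(gate.assume("worker-2")) // true: bind all three
	gate.unassume("worker-1")            // e.g. a bind error rolls the gang back below quorum
}
```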

jlewi commented

@k82cn We don't have a specific ETA; lack of gang scheduling hasn't been a problem; we'll see how far we can get before it becomes an issue.

My inclination though is not to develop our own custom solution. So I'd be inclined to try kube-arbitrator first.

k82cn commented

@jlewi , that's great; let kube-arbitrator try to handle this :). I'll keep you posted on any updates.

jlewi commented

If someone wants to take a stab at adding kube-arbitrator support to the TfJob controller, here's a rough idea of what I think needs to happen (sketched below):

  • Add an option to the TfJob controller to indicate that we want to use kube-arbitrator, plus any additional config.
  • Add a method to TfReplicaSetInterface that adds the resources needed by a replica to the QueueJob.
  • Update the main control loop to submit and monitor the QueueJob.
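A hedged sketch of those three steps in Go; all of the names here (Options, QueueJobRequest, AddToQueueJob, reconcile) are invented for illustration and are not the real tf_operator or kube-arbitrator APIs.

```go
package main

import "fmt"

// Controller option (bullet 1): opt in to kube-arbitrator-style queueing.
type Options struct {
	EnableGangScheduling bool
	QueueName            string
}

// Hypothetical stand-in for a kube-arbitrator QueueJob request.
type QueueJobRequest struct {
	MinMembers int
	GPUs       int
}

// Bullet 2: each replica set contributes its resources to the request.
type ReplicaSet interface {
	AddToQueueJob(req *QueueJobRequest)
}

type workerReplicas struct{ count, gpusEach int }

func (w workerReplicas) AddToQueueJob(req *QueueJobRequest) {
	req.MinMembers += w.count
	req.GPUs += w.count * w.gpusEach
}

// Bullet 3: the control loop builds and submits the request before creating pods.
func reconcile(opts Options, replicas []ReplicaSet) {
	if !opts.EnableGangScheduling {
		return // current behaviour: create pods/jobs directly
	}
	req := &QueueJobRequest{}
	for _, r := range replicas {
		r.AddToQueueJob(req)
	}
	fmt.Printf("submit to queue %q: %+v, then watch until admitted\n", opts.QueueName, *req)
}

func main() {
	reconcile(Options{EnableGangScheduling: true, QueueName: "default"},
		[]ReplicaSet{workerReplicas{count: 4, gpusEach: 1}, workerReplicas{count: 2, gpusEach: 0}})
}
```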

Hi @k82cn , I'm interested in this issue because we are working on the same problem. Our approach implements gang scheduling in a custom scheduler. A brief description can be found in my talk at KubeCon '17: https://schd.ws/hosted_files/kccncna17/95/kc_cnc_na17_mitake.pdf (pages 6-9).

Can I try your kube-arbitrator based solution, or is it still under construction?

k82cn commented

@mitake , we're still working on it, but I think you can give it a try. If you hit any issues, we'll handle them ASAP.

@k82cn thanks. But it seems that tf_operator simply creates jobs for TF workers and PSes (https://github.com/tensorflow/k8s/blob/master/pkg/trainer/replicas.go#L258). How can k8s determine that these jobs belong to a single learning task and schedule them at once?

A kube Job object can describe multiple pods that start up and run to completion. In that respect a Job can represent a single TF training run (if you imbue it with gang scheduling). If we use the existing kube Job object as the unit of gang scheduling, it might cover a lot of use cases. People with traditional Job objects could use it. But we might also architect newer things like kubeflow CRDs or Airflow objects "on top of" Job, just with additional structure.

k82cn commented

@erikerlandson , in kube-arbitrator, we introduced QueueJob (a CRD) to include all related info for scheduling. For now, it's not bound to Job, but it makes sense to do that (similar to CronJob) when merging upstream.

@jlewi Thanks for bringing up this great discussion! Sorry I didn't notice this discussion before I proposed kube-scheduler; please involve me in this in the future, and let's push this forward together!

Then each training job is potentially stalled because it won't start until all N workers are available.

For TensorFlow data parallel training, there are two approaches, synchronous and asynchronous:

  • Synchronous: similar to what you described; the PS updates the parameters only after the computation on every worker is done, so all workers must be ready before training can start, and we need gang scheduling for this case.
  • Asynchronous: AFAIK, there is no need to wait for all workers to be ready (but we still need the chief worker to be ready); the PS updates parameters asynchronously. TensorFlow has fault tolerance in mind: workers can re-compute model updates (per mini-batch) and apply them to the PS asynchronously.

So maybe we need some more use-cases from data scientists about gang scheduling.

jlewi commented

@ScorpioCPH I'm not sure it's worth it to treat asynchronous any differently from synchronous. At best that seems like an optimization that can wait for the future.

Do you think we should try to add kube-arbitrator support to TfJob controller?

I'm not sure it's worth it to treat asynchronous any differently from synchronous.

Maybe we can do a deeper survey about this: if the asynchronous model is commonly used, this is not a high-priority issue.

At best that seems like an optimization that can wait for the future.

If synchronous is commonly used, I think this is a high-priority issue that needs to be addressed, because in some corner cases the training will get stuck.

add kube-arbitrator support to TfJob controller

Sorry, I'm not very familiar with kube-arbitrator; it seems like it is still under construction.
Maybe we can take a stab at this (I prefer starting after the refactor).

k82cn commented

I'm not sure it's worth it to treat asynchronous any differently from synchronous.

Maybe we can do a deeper survey about this: if the asynchronous model is commonly used, this is not a high-priority issue.

Agree with jlewi@; in davidopp@'s design doc, this is handled by a min request for both 'async' and 'sync': if 'total desired' == 'min request', it is 'sync'; if 'total desired' > 'min request', it is 'async', according to your description. The controller can update the number of tasks.
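A small sketch of that min-request idea, with invented types: a gang can start once a minimum number of members are placed, so sync sets the minimum equal to the total desired count, while async can set it lower (e.g. just the chief and PS).

```go
package main

import "fmt"

// gangSpec is illustrative; it is similar in spirit to the minMember field in
// later PodGroup-style APIs, but not a real API object.
type gangSpec struct {
	totalDesired int
	minAvailable int
}

// canStart reports whether enough gang members are placed to begin running.
func canStart(g gangSpec, scheduledMembers int) bool {
	return scheduledMembers >= g.minAvailable
}

func main() {
	sync := gangSpec{totalDesired: 8, minAvailable: 8}  // all-or-nothing
	async := gangSpec{totalDesired: 8, minAvailable: 3} // e.g. chief + 2 PS; workers can trickle in

	fmt.Println(canStart(sync, 5))  // false
	fmt.Println(canStart(async, 5)) // true
}
```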

add kube-arbitrator support to TfJob controller

Sorry, I'm not very familiar with kube-arbitrator; it seems like it is still under construction.
Maybe we can take a stab at this (I prefer starting after the refactor).

Honestly, it's better to contribute to kube-arbitrator instead of starting a new one :). There's been a lot of discussion this year about making it "extendable" and meeting the requirements of batch workloads. Anyway, if you just want to handle one or two special cases, maybe a personal repo is better :).

@ScorpioCPH

If synchronous is commonly used, I think this is a high-priority issue that needs to be addressed, because in some corner cases the training will get stuck.

I don't think the problem is a corner case. It is really easy to reproduce: generating some distributed learning tasks (e.g. 100) on a relatively small cluster (e.g. fewer than 10 nodes) is enough, even for CPU-based training. And such a situation would be common in DL research and engineering experiments (especially hyperparameter tuning).

jlewi commented

If the training is stuck this will be surfaced to the user in the TFJob status in terms of the number of replicas running, waiting, etc., and the user can deal with it (e.g. increase the cluster size if it's an issue of replicas not being able to schedule).

I think the pernicious/hard to deal with cases are going to be situations where "manual scheduling" doesn't work because there are so many jobs in the cluster that a user can't keep up with manual intervention.

In these cases we will need to rely on K8s to get scheduling right, so I think it's much better if we align with kube-arbitrator per @k82cn's comment.

TFJob is going to move to creating the pods directly; see #325.

So we could start thinking about how to make that fit with kube-arbitrator.

k82cn commented

Sure, it's exciting to run TF on kube-arbitrator :).

Should we close the issue? We already support using kube-batch and volcano for gang scheduling.

Yes. This can be solved using gang scheduling.

/priority p2

stale commented

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.