Yelp/mrjob

concurrent steps on EMR clusters

coyotemarin opened this issue · 6 comments

It should be possible to launch a pooled cluster that can run more than one step concurrently, using EMR's StepConcurrencyLevel attribute.

Pooling is going to need to work a little bit differently. If step concurrency is > 1, it should still be possible to add steps to clusters in the RUNNING state, but we should check that the cluster isn't already full of running steps. We also shouldn't join a cluster with a higher step concurrency than we requested.
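The pooling rules above can be sketched as a single eligibility check. This is a minimal sketch, not mrjob's actual code: `ClusterInfo` and `can_join` are hypothetical names, and the fields assume we already know each candidate cluster's state, its StepConcurrencyLevel, and how many of its steps are active.

```python
from dataclasses import dataclass


@dataclass
class ClusterInfo:
    # Hypothetical summary of a pooled cluster; field names are illustrative.
    state: str             # cluster state, e.g. "WAITING" or "RUNNING"
    step_concurrency: int  # the cluster's StepConcurrencyLevel
    active_steps: int      # number of steps in the PENDING or RUNNING state


def can_join(cluster: ClusterInfo, max_concurrent_steps: int) -> bool:
    """Return True if a job requesting max_concurrent_steps may join cluster."""
    # Never join a cluster with higher step concurrency than we requested.
    if cluster.step_concurrency > max_concurrent_steps:
        return False
    if cluster.step_concurrency == 1:
        # One step at a time: only join an idle cluster.
        return cluster.state == "WAITING"
    # Concurrent cluster: RUNNING is acceptable as long as a slot is free.
    if cluster.state not in ("WAITING", "RUNNING"):
        return False
    return cluster.active_steps < cluster.step_concurrency
```

For example, a RUNNING cluster with concurrency 2 and one active step is joinable by a job that requested 2 concurrent steps, but not once a second step is active.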

Multi-step jobs are going to need to submit each step after the first only once the previous step has finished, so that the steps don't attempt to run simultaneously. For this ticket, we don't need to consider the possibility of running one step on one cluster and another step on a different cluster.

Going to call the option max_concurrent_steps.
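As a rough illustration, assuming `max_concurrent_steps` maps directly onto the `StepConcurrencyLevel` parameter of EMR's RunJobFlow API (exposed by boto3 as `run_job_flow`), the option might translate into request arguments like this. The helper name is hypothetical, and the other arguments RunJobFlow requires (such as `Instances`) are omitted.

```python
def run_job_flow_kwargs(name: str, release_label: str,
                        max_concurrent_steps: int = 1) -> dict:
    """Build (part of) the keyword arguments for an EMR RunJobFlow call."""
    if max_concurrent_steps < 1:
        raise ValueError("max_concurrent_steps must be at least 1")
    kwargs = {"Name": name, "ReleaseLabel": release_label}
    # EMR defaults to one step at a time, so only send the parameter
    # when concurrency is actually requested.
    if max_concurrent_steps > 1:
        kwargs["StepConcurrencyLevel"] = max_concurrent_steps
    return kwargs
```

Leaving `StepConcurrencyLevel` out when it's 1 keeps requests for ordinary single-step clusters unchanged.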

The dataproc and Hadoop runners already submit steps one at a time, so this shouldn't be too difficult to build.

Locking needs to work a little differently for clusters that allow multiple concurrent steps.

When a cluster only runs a single step at a time, we don't need to look at which steps are running, just at the cluster's state. We hold the lock for a long time (up to a minute), because it can take time for the cluster's state to flip from WAITING to RUNNING.

When a cluster allows multiple concurrent steps, we can potentially join even if the cluster is in the RUNNING state, as long as the cluster hasn't reached its limit for concurrent steps. So before joining, we have to check the number of active steps (in the PENDING or RUNNING states). Once we've submitted a step, we should immediately release the lock, because we don't have to wait for the cluster's state to flip.
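The active-step check and the difference in lock handling might look roughly like this. It is a sketch under stated assumptions: `steps` is taken to be a list of dicts shaped like the `Steps` entries of an EMR ListSteps response (each with `Status` → `State`), and `count_active_steps` and `lock_hold_seconds` are hypothetical helpers, with 60 seconds standing in for the "up to a minute" upper bound described above.

```python
ACTIVE_STEP_STATES = {"PENDING", "RUNNING"}


def count_active_steps(steps: list) -> int:
    """Count steps that occupy a concurrency slot (PENDING or RUNNING)."""
    return sum(1 for s in steps if s["Status"]["State"] in ACTIVE_STEP_STATES)


def lock_hold_seconds(step_concurrency: int) -> float:
    """Upper bound on how long to keep the pooling lock after submitting."""
    if step_concurrency == 1:
        # Hold the lock while the cluster's state flips from WAITING to RUNNING.
        return 60.0
    # Concurrent cluster: release immediately; the active-step count
    # is what the next job will check.
    return 0.0
```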

Similarly, if a cluster only runs one step at a time, we can submit all our steps at once and just watch them run. Otherwise, when running a multi-step job, we have to submit each successive step after the previous one finishes.
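A minimal sketch of the two submission strategies, assuming hypothetical `submit` (takes a list of steps, returns their step IDs) and `wait_for` (blocks until a step finishes) callables rather than any real mrjob or boto3 API:

```python
from typing import Callable, List, Sequence


def run_steps(steps: Sequence[str],
              submit: Callable[[List[str]], List[str]],
              wait_for: Callable[[str], None],
              one_at_a_time: bool) -> List[str]:
    """Submit steps, wait for each to finish, and return their IDs in order."""
    if not one_at_a_time:
        # Single-step cluster: EMR runs the steps in order,
        # so submit them all at once and just watch them.
        step_ids = submit(list(steps))
        for step_id in step_ids:
            wait_for(step_id)
        return step_ids

    step_ids = []
    for step in steps:
        # Concurrent cluster: submit the next step only after the
        # previous one finishes, so they can't run simultaneously.
        (step_id,) = submit([step])
        wait_for(step_id)
        step_ids.append(step_id)
    return step_ids
```

With `one_at_a_time=True`, submissions and waits alternate; with `False`, there is one batched submission followed by the waits.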

Also, a step's action-on-failure can't be CANCEL_AND_WAIT on a cluster that runs concurrent steps; we have to use CONTINUE instead.
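The action-on-failure rule could be expressed as a small helper. `action_on_failure` is a hypothetical name; the sketch only encodes the one restriction stated above (CANCEL_AND_WAIT becomes CONTINUE on concurrent clusters) and passes any other requested value through unchanged.

```python
def action_on_failure(step_concurrency: int,
                      requested: str = "CANCEL_AND_WAIT") -> str:
    """Pick a step's ActionOnFailure value for a cluster's concurrency level."""
    if step_concurrency > 1 and requested == "CANCEL_AND_WAIT":
        # CANCEL_AND_WAIT isn't allowed on clusters running concurrent steps.
        return "CONTINUE"
    return requested
```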

In theory, any cluster running AMI 5.28.0 or later can be switched from running one step at a time to running multiple steps concurrently by calling ModifyCluster. I wonder whether that still holds if there are already steps with CANCEL_AND_WAIT as their action-on-failure, though.

Looks like it really is possible to "train wreck" a multi-step job by submitting both steps to a cluster with concurrency level 1, and then changing the concurrency level to 2. So really, multi-step jobs should submit one step at a time to any AMI that supports concurrency.
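Following that conclusion, the decision to submit one step at a time can depend only on the AMI version and the number of steps. This is a sketch with hypothetical helper names; it compares versions numerically (so "5.9.0" sorts below "5.28.0") and tolerates an optional `emr-` release-label prefix.

```python
def _version_tuple(version: str) -> tuple:
    """Turn "5.28" or "emr-5.28.0" into a comparable tuple like (5, 28, 0)."""
    if version.startswith("emr-"):
        version = version[len("emr-"):]
    parts = [int(p) for p in version.split(".")]
    # Pad so that "5.28" compares equal to "5.28.0".
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def supports_step_concurrency(image_version: str) -> bool:
    return _version_tuple(image_version) >= (5, 28, 0)


def submit_one_at_a_time(image_version: str, num_steps: int) -> bool:
    # Any AMI that supports concurrency could have its concurrency level
    # raised later, so multi-step jobs play it safe on all of them.
    return num_steps > 1 and supports_step_concurrency(image_version)
```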