Parsl/parsl

Working in pre-emption intensive environments

drewoldag opened this issue · 6 comments

Is your feature request related to a problem? Please describe.
The HPC that I have access to uses a "condo model" and Slurm such that I can request resources that are not currently in use, and if the owner of those resources requests them while I'm still using them, I will be preempted on short notice.

We would like to be able to instruct Parsl to scale in the block that was used to run a task, and then scale out another block for the next task in line. What we have observed is that a block will be scaled out and multiple tasks will be executed sequentially on the block until 1) there are no more tasks in the queue or 2) the walltime for the provider is exceeded.

It seems like there must be a way to configure the system to do this, but I have been unable to find it in the documentation.

Describe the solution you'd like
Again, I would guess that this functionality already exists, but I'm assuming I've just missed it. Ideally there's a parameter in either the executor or provider that would allow me to instruct parsl to spin down the resources requested after a task completes.

Additional context
Here's a snippet of my current configuration file. Here we're expecting at most 10 blocks to be provisioned, with 1 node/block and 1 worker/node.

HighThroughputExecutor(
    # This executor is used for reprojecting sharded WorkUnits
    label="sharded_reproject",
    max_workers=1,
    provider=SlurmProvider(
        partition="ckpt-g2",
        account="astro",
        min_blocks=0,
        max_blocks=10,
        init_blocks=0,
        parallelism=1,
        nodes_per_block=1,
        cores_per_node=8,
        mem_per_node=32,
        exclusive=False,
        walltime="01:00:00",
        worker_init="",
    ),
),

Hi.

I don't immediately see how your second paragraph follows on from the first, so I'll structure my comments here not making that assumption.

First, I'm interested in what you're trying to achieve by having your batch jobs not last very long - they're going to get preempted anyway(?) so is there some other scheduling going on here that makes short batch jobs more desirable? Can you describe that a bit more? (e.g. as an example question, what benefit do you get from 2 tasks in 2 blocks vs 2 tasks in 1 block?)

The block scaling code is really distinct from the high throughput executor task execution code: that's deliberate because the pilot jobs in the batch system really are meant to be distinct from individual tasks - so this isn't a thing Parsl features have been driving towards (and probably not a thing we're super interested in pursuing without some more detailed justification)

You could hack the process worker pool (process_worker_pool.py) to shut down after one task, if you want to try out that behaviour - Parsl submit-side will notice and scale up another pool to do more work (eventually). I think @ryanchard has had some experience doing that in the past - but actually I think what he wanted turned out to be more like workers that only last for a short time period and drain after that (without any particular concern for the task structure inside that short time period), and for that he ended up using the htex worker drain time options (see drain_period in https://parsl.readthedocs.io/en/latest/stubs/parsl.executors.HighThroughputExecutor.html)

Hello Ben,

thanks for the response

First, I'm interested in what you're trying to achieve by having your batch jobs not last very long - they're going to get preempted anyway(?) so is there some other scheduling going on here that makes short batch jobs more desirable?

Yes, you are spot on. The issue is not the condo model, the issue is that we would like to utilize what is called a checkpoint queue. Checkpoint queue allows access to all of the resources at the cluster currently not being utilized. In effect we are granted access to resources above what we were allocated as reserved resources. However, should the owner of the resources we were given via the checkpoint queue request them back - our jobs are terminated and the resources are re-allocated ("preempted") to them.
I believe the idea is to give many users of the cluster access to a large amount of resources in parallel, at the cost that the jobs they run on them need to a) be short and b) be able to save their state and fail gracefully at a moment's notice.

We are able to achieve a high level of parallelism, with reasonably short individual jobs (~ten thousands on the order of 5-7 minutes) but some of our jobs take 20 minutes (thousands), some take a few hours (few thousand). The duration of a job is a function of the data that the job takes and unfortunately cannot be predicted in advance (we know which data the job takes, but not whether that data will take a long time to process). The time-to-preemption depends on the utilization of the cluster at any given time, so sometimes we are able to retain a worker for a few hours and sometimes at best 10 minutes.

We only have access to a few more permanent resources, so if we were to configure an executor to use only those resources, they would be a throughput bottleneck for us. We believe we would benefit from utilizing the checkpoint queue to scale out and handle the majority of our jobs.

The block scaling code is really distinct from the high throughput executor task execution code: that's deliberate because the pilot jobs in the batch system really are meant to be distinct from individual tasks [...]

Separating resource acquisition and retention from execution makes sense and generally works well for us, especially in environments where usage is credit-based for example. This is what I believe is happening to us on the resources we are currently using:

  • we get a lot of blocks of the checkpoint queue with HighThroughputExecutor
  • they execute a task or two and then get preempted, and the failed tasks get scheduled for a retry
  • a task is paired with an executor and if it's freshly allocated "all is well", but when it's a pre-existing resource the task fails when the node gets preempted again, because the executor has occupied it for a while already and the owner requested it back.

We can put a large retry number on our tasks, but then we find that often both the shorter and longer running tasks just repeatedly keep failing and blocking the other tasks in the queue. We thought initially, if we do not increase the retry counter when preemption happens, maybe we can keep the retry counter low, so when a task fails let's say 2 times, we know it's just a bad task or a task not fit for the checkpoint queue. We can't seem to figure out how to not increase the retry counter in case of a preemption though.

Ultimately, we end up with a long queue of retry jobs, some of which are long jobs that could not finish on checkpoint queue anyhow, some of which will always fail because data is bad, and some that just keep getting preempted over and over again until they "luck out". And maybe that's fine, but we figured that all the tasks that were retried, only to be preempted or legitimately fail again, are just wasted resources and it does make our book-keeping and log management rather cumbersome.

We were looking for a better way forward, something that may execute optimistically for the natural duration of the task itself and if it gets preempted, it gets retried only once or twice. All the other failures are considered for a new processing campaign on a different queue, one that is more permanent, or as a legitimate failure due to poor data.

I may have butchered a number of nomenclature related things in this section, I apologize, hopefully what I meant is clear.

but actually I think what he wanted turned out to be more like workers that only last for a short time period and drain after that (without any particular concern for the task structure inside that short time period)

I'm trying to read into the advice you had in your reply and this sounds like something we could also do? But I'm not a Parsl expert (yet) so I just want to clarify something that is confusing me:

  • Is draining an executor the same as reaching walltime for a provider?
  • And what is "maximum walltime of batch jobs"?

If I set drain period to, let's say, ~5 minutes and my jobs are 7 minutes, then I can expect Parsl to schedule 1 job per executor block, because by the time the job finishes the block was already "marked to be drained". The block would go away after the task has finished and a new one would be launched depending on the queue and scaling configuration? Or would the executor interrupt the task and drain at 5 minutes + some overhead it takes Parsl to communicate that to the executor?
If my job takes (some significant amount) more than 7 minutes, let's say 14 minutes, the executor would force the task to stop, drain, and replace itself with a new executor block based on the max_blocks and parallelism values? Or would the block continue executing until that job completes? I think this is the same question, just the other way around.

I don't think we care for "the task structure inside that short time period"? We have 3 consecutive tasks, that's true, but they just have to execute in order to produce expected artifacts for subsequent tasks. Each "batch" of these jobs is basically embarrassingly parallel with respect to other tasks in the batch, so we could launch the three tasks consecutively, as separate workflows, ourselves?

1. Terminology

job = block = batch job = the unit of work your cluster scheduler runs and is preempting

task = parsl app invocation = the unit of work that Parsl core schedules, the thing you declare with a @python_app decorator

2. Likelihood of pre-emption

Say a batch job has just finished executing a task and is ready for another task (that task will take, unknown to you, 7 minutes).

You've got a choice: you can either cancel this batch job and start a new one, that you hope will run for long enough (7 minutes).
Or you can keep this existing batch job alive and hope it continues to run for long enough (7 minutes more)

You're writing as if you think the second option is more likely to be true - that a batch job that has been running for longer is more likely to be pre-empted in the next 7 minutes and a fresh batch job is less likely to be pre-empted in the next 7 minutes. It's not clear to me that that is true - maybe preemption is more like a Poisson process? (you could probably get info out of the parsl monitoring database about this...)

This is distinct from the more obvious "as a job runs for longer, it's more likely to get pre-empted in its lifetime"

If there isn't a significant advantage to starting a new batch job, then all the stuff about trying to start new batch jobs in preference to using old ones is irrelevant - my gut says there isn't an advantage, but I don't have data or any formal theory about it, so don't believe me too hard.
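
The distinction in this section can be made concrete by comparing two assumed pre-emption models. The following is a minimal sketch, not a measurement: the 30-minute mean/scale, the Weibull shape of 2, and all function names are illustrative assumptions. Under a Poisson process (exponential lifetimes) the chance of surviving the next 7 minutes does not depend on how old the batch job is; under a model where the pre-emption hazard grows with age, a fresh batch job does better.

```python
import math

def survival_exponential(t, mean):
    """P(batch job still alive at time t) if pre-emption is a Poisson process."""
    return math.exp(-t / mean)

def survival_weibull(t, scale, shape):
    """P(batch job still alive at time t) under a Weibull model.

    shape > 1 means the pre-emption hazard increases as the job ages.
    """
    return math.exp(-((t / scale) ** shape))

def survives_next(survival, age, duration):
    """P(job survives `duration` more minutes | it already survived `age`)."""
    return survival(age + duration) / survival(age)

TASK = 7.0  # minutes, the task length used in the example above

def memoryless(t):
    # Assumed model 1: constant pre-emption rate, mean lifetime 30 minutes.
    return survival_exponential(t, mean=30.0)

def ageing(t):
    # Assumed model 2: hazard grows with job age.
    return survival_weibull(t, scale=30.0, shape=2.0)

for age in (0.0, 20.0):
    print(
        f"age={age:4.0f} min  "
        f"memoryless={survives_next(memoryless, age, TASK):.3f}  "
        f"ageing={survives_next(ageing, age, TASK):.3f}"
    )
```

If observed block lifetimes (for example from the monitoring database) look like the first model, replacing blocks after every task buys nothing; only if they look like the second does a fresh block help.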

3. What info do you have about tasks and jobs?

There isn't much info available for individual tasks and jobs: you don't know how long a batch job will run because it's subject to arbitrary preemption, and you know only a big distribution of task lengths.

But you do get to know this, approx 1 bit of information per task, if a task fails:

  • A task took so long that it was pre-empted

This might be because it was a "short" task but ran in a batch job that was just being pre-empted, or
(I claim much more likely) it is because it is a "long" task that was so long that it spilled over a pre-emption.

So you get a low resolution signal that a task is "long", after 1 try.

4. Detecting pre-emption

A pre-empted batch job is going to look (I think) pretty much like any other batch job in any other environment that has run out of walltime. The fact that the walltime is unknown ahead of time isn't relevant here (I claim).

In this case, tasks that were running in that batch job will fail with either of these two exceptions: ManagerLost or WorkerLost.

These exceptions can also occur in other situations where the worker pool is killed by the OS: for example, if your tasks allocated far too much memory. So you need to be aware of that, if your applications might cause that.

You can observe these failures inside parsl's retry mechanism using a retry handler: a plugin you configure that can observe failures and decide how to handle retries.

5. Retry policies on task failure

Here's an example retry handler policy and configuration that you could implement:

i) if a task fails with any exception other than ManagerLost or WorkerLost, then treat this as a definitive failure and do not retry the task. Let the failure propagate to your workflow script as if there are no retries enabled.

ii) if a task fails with ManagerLost or WorkerLost, treat this as a "long" task that was pre-empted. Because it is probably "long", it is probably not worth retrying in a pre-emptible worker. But, the Parsl retry handler can re-write the task description to send the task to a second Parsl executor that is configured with longer resources more appropriate for long tasks. These tasks will no longer block other fresh tasks in the pre-emptible executor.

iii) if ii) has already happened and the task still fails, you could retry it a couple of times on the big executor or fail immediately.
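
The policy in i) to iii) could be sketched roughly as below. This is a hedged illustration rather than a tested Parsl plugin. It assumes the retry handler is called with the exception and the task record and returns a "cost" that is added up per task, with the task retried only while the accumulated cost does not exceed the configured retries. The executor label "long_running" and the value RETRIES = 2 are hypothetical. Exceptions are matched by class name here only so that the sketch does not depend on a particular Parsl import path.

```python
# Exception class names that, per the discussion above, indicate the worker
# pool disappeared (pre-emption, walltime expiry, or an OS kill).
PREEMPTION_ERRORS = {"ManagerLost", "WorkerLost"}

PREEMPTIBLE = "sharded_reproject"  # executor label from the config in this thread
LONG_RUNNING = "long_running"      # hypothetical label of a second, longer-lived executor
RETRIES = 2                        # assumed to match the retries value in the Parsl config


def preemption_aware_retry(exception, task_record):
    """Return the retry cost of this failure (assumed contract, see lead-in)."""
    lost_worker = type(exception).__name__ in PREEMPTION_ERRORS

    if not lost_worker:
        # (i) Any other error is treated as definitive: a cost larger than
        # the retry budget means the task is not retried.
        return RETRIES + 1

    if task_record["executor"] == PREEMPTIBLE:
        # (ii) Probably a "long" task: redirect it to the longer-lived
        # executor and do not charge the retry budget for the pre-emption.
        task_record["executor"] = LONG_RUNNING
        return 0

    # (iii) It was already on the long-running executor and was lost again:
    # charge one unit, so it gets at most RETRIES further attempts there.
    return 1
```

Such a function would be passed as the retry_handler when building the Parsl configuration, alongside a retries value equal to RETRIES. Note that, as mentioned above, ManagerLost/WorkerLost can also come from out-of-memory kills, so rule ii) will move those tasks too.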

6. Other not implemented and harder things

If you do want to retry inside the pre-emptible queue, @ryanchard is leading an effort to get #3323 implemented in production - if that happens, you can have the retry handler lower the priority of tasks on retry so that they do not block newer tasks. This won't make retried tasks more likely to succeed, but will move them out of the way of the first attempts at other tasks.

If you have parsl monitoring turned on, you will have some rough information about how long a task ran before it was apparently pre-empted. Maybe there is fancy stuff you can do here (give tasks that didn't run for long a higher priority on retry...) - I don't really know if there's anything useful here but mentioning it anyway.

Thanks Ben,

you're being very helpful.

You're writing as if you think the second option is more likely to be true - that a batch job that has been running for longer is more likely to be pre-empted in the next 7 minutes and a fresh batch job is less likely to be pre-empted in the next 7 minutes. It's not clear to me that that is true - maybe preemption is more like a Poisson process? (you could probably get info out of the parsl monitoring database about this...)

To quickly answer this - we believe we are in the second scenario because we can observe it when we run. This of course depends on when I submit my jobs and how many of them I submit. The more jobs I submit, the more resources I take and the more likely someone will ask for some part of them back. If I run on the weekend, when nobody is working and the dynamic nature of the queue is less pronounced, allocate resources in moderation, and get a little bit lucky, I notice that many of my workers are able to survive for several hours. Waiting time paradoxes and biased sampling issues aside, ultimately I think the statement is correct because I observe it.

While the chances of preemption at minute 7 compared to minute 14 are probably not greatly different, I do not see many workers surviving past 30 minutes to an hour. So, I would like to remove the workers that are finishing a job around the time that is the pre-emption expectation value, but leave them running optimistically if they break that limit (or something to that effect).


I would like to try some of your suggestions to see if they help. I think lowering the retry priority would be helpful to just keep the workflow running along more smoothly instead of getting brigaded by a few dozen tasks that have no chance of finishing. I think that's not that hard to do at all according to the docs.

Re: the second part:

But, the Parsl retry handler can re-write the task description to send the task to a second Parsl executor that is configured with longer resources more appropriate for long tasks. These tasks will no longer block other fresh tasks in the pre-emptible executor.

I take it you want me to edit the value of the resource_specification on the TaskRecord and then hopefully TypedDict means it's mutable in place?
https://github.com/Parsl/parsl/blob/master/parsl/dataflow/taskrecord.py#L90

And ultimately, re: draining time questions - is setting that to a low value going to kill my task and the block when it elapses, or just mark the block to be removed as soon as it's possible without stopping the task?

I take it you want me to edit the value of the resource_specification on the TaskRecord and then hopefully TypedDict means it's mutable in place?

Yes, the task record:

If you want to change the parsl executor (which you can do now) then no, not resource_specification, but the executor entry:

executor: str

If you want to work with htex task priorities, then yes task record, and yes something in resource_specification, but also for this option, you will need to coordinate with @ryanchard to get an appropriate Parsl development fork for this -- that facility isn't merged into master (but I hope it will be ... soon... HINT HINT RYAN)

Editing the task record, and using the task record in general, wasn't originally something API-facing - so it's not really documented for users to edit. But this is a use case I would like people to push on... because different forms of "I want to do interesting things for retry" keep coming up.

The way that draining works now:

  • draining is intended for batch jobs where you know the wall time ahead of time (not your case, but this will help you understand the idea)

  • draining is intended for a workload where you know the expected time of your tasks and the expected time is pretty consistent (also not your case)

  • draining is configured with this parameter:

drain_period : int
        The number of seconds after start when workers will begin to drain
        and then exit. Set this to a time that is slightly less than the
        maximum walltime of batch jobs to avoid killing tasks while they
        execute. For example, you could set this to the walltime minus a grace
        period for the batch job to start the workers, minus the expected
        maximum length of an individual task.
  • After a batch job (or more specifically the worker pool inside the batch job) has been running for this many seconds: it will enter drain mode.

When entering drain mode, no more tasks will be sent to the worker pool (the code running inside a batch job). The worker pool will continue to execute any tasks it already has.

When all tasks are completed on a worker in drain mode, that worker will exit.

So in your case, I think you could set drain_period to e.g. 60 seconds (or a few minutes) and that will give this behaviour:

For the first minute, the worker pool will process tasks as normal.
After the first minute, the worker pool will stop receiving new tasks.
Any tasks that the worker pool is still running will continue to run until:
i) the tasks are all complete (then the worker pool will exit)
or
ii) the workers are killed by eviction (essentially the same as wall time expiry) - in which case you'll get a WorkerLost/ManagerLost exception.

The limited documentation implies that you should set your drain period to be a bit shorter than the configured walltime. That's because the documentation is targeted at a known walltime/known tight task length setup, which is not your situation. In my suggestion above, I am suggesting that you set it low (at a few minutes level) even though your experienced walltime might be very long.

I think that means that when you have short tasks, the batch jobs will end soon after that one minute, but if a batch job is running a long task it will keep going until the batch system evicts it. I think this is roughly what you are asking for.
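
The drain behaviour described above can be illustrated with a toy timeline for a single-worker block. This is only a sketch of the description in this comment, not Parsl code: the function name, the return values, and the example durations are made up for illustration, and it ignores scheduling latency and eviction while idle. Times are in seconds, to match drain_period.

```python
def simulate_block(task_seconds, drain_period, evicted_at=None):
    """Toy model of one block with one worker.

    task_seconds: durations of the tasks waiting in the queue, in order.
    drain_period: seconds after start when the pool stops taking new tasks.
    evicted_at:   seconds after start when the batch system kills the block,
                  or None if it is never pre-empted.

    Returns (number_of_completed_tasks, how_the_block_ended).
    """
    now = 0.0
    completed = 0
    for duration in task_seconds:
        if now >= drain_period:
            # Drain mode: no new task is accepted, the idle worker exits.
            return completed, "drained"
        finish = now + duration
        if evicted_at is not None and finish > evicted_at:
            # The running task is killed with the block; on the submit side
            # this would surface as WorkerLost/ManagerLost.
            return completed, "evicted"
        # A task that is already running is never interrupted by draining.
        now = finish
        completed += 1
    return completed, "drained" if now >= drain_period else "queue_empty"


# 7-minute tasks, drain after 1 minute: one task per block.
print(simulate_block([420, 420, 420], drain_period=60))
# A 3-hour task keeps its block alive well past the drain period.
print(simulate_block([10800], drain_period=60))
# The same long task, evicted after 45 minutes.
print(simulate_block([10800], drain_period=60, evicted_at=2700))
```

In this model a block never starts a second 7-minute task, while a long task runs on until it finishes or the block is evicted, which matches the two cases described above.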

Ok, wonderful, let me try some of this and I'll report back. I'll try the executors and drain now and if it works well enough maybe Ryan's poor inbox can have a rest :D

Thanks for the help, much appreciated.