dask/dask-kubernetes

Autoscaling removes workers in other worker groups besides default


Describe the issue:
Per https://kubernetes.dask.org/en/latest/operator_resources.html#daskautoscaler, only the default worker group should autoscale. But I'm observing other worker groups also getting scaled down at the end of a computation.

Minimal Complete Verifiable Example:

from dask_kubernetes.operator import KubeCluster

# Enable HTTP API
env = dict(DASK_DISTRIBUTED__SCHEDULER__HTTP__ROUTES='["distributed.http.scheduler.prometheus", "distributed.http.scheduler.info", "distributed.http.scheduler.json", "distributed.http.health", "distributed.http.proxy", "distributed.http.statics", "distributed.http.scheduler.api"]')

cluster = KubeCluster(name="foo", image="ghcr.io/dask/dask:latest", env=env)
cluster.add_worker_group(name="highmem", n_workers=2, resources={"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}, env=env)
cluster.adapt(minimum=0, maximum=1000)

import distributed
client = distributed.Client(cluster)

import time
def wait(i):
    time.sleep(1)
    return i
futures = [client.submit(wait, i) for i in range(10000)]

Obviously I don't see the exact same behavior every time, but I consistently see one or two of the highmem workers get retired by the scheduler:

[dask-kubernetes-operator-74cc5c8855-zc7nm] [2024-10-25 19:27:26,748] kopf.objects  [INFO    ] [default/foo-default] Workers to close: [..., 'foo-highmem-worker-ca6c5d33ea', ...]
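
A quick way I check which workers survive once the futures finish (standard distributed client API; I'm assuming the usual fields in client.scheduler_info(), and that worker names follow the <cluster>-<group>-worker-<id> pattern seen above):

# List the names of the workers the scheduler still knows about; retired
# highmem workers will be missing from this set.
remaining = sorted(w["name"] for w in client.scheduler_info()["workers"].values())
print(remaining)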

@jacobtomlinson happy to help debug further, but I would need some pointers to where the decision about which workers to close is being made.
Environment:

  • Dask version: 2024.9.1 for the above example but I saw the same on 2024.10.0
  • Python version: 3.10.12
  • Operating System: Linux

It looks like there is something funny going on with the logic.

When autoscaling we first get a DaskWorkerGroup object that represents the default group.

worker_group = await DaskWorkerGroup.get(
    f"{spec['cluster']}-default", namespace=namespace
)

Then we ask the scheduler for its desired number of workers.

desired_workers = await get_desired_workers(
    scheduler_service_name=f"{spec['cluster']}-scheduler",
    namespace=namespace,
)

Then we scale the default worker group down to that number of workers.

await worker_group.scale(desired_workers)

My guess here is that the scheduler wants to remove the highmem workers, but we aren't correctly filtering those out when calculating the number to scale the default group to.
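
To illustrate the kind of filtering I mean (just a sketch, not the operator's actual code; the default_group_only helper and the name prefix are assumptions based on the worker names in the logs above):

def default_group_only(workers_to_close, cluster_name):
    # Keep only workers from the default group, judging by the
    # "<cluster>-default-worker-<id>" naming convention in the logs.
    prefix = f"{cluster_name}-default-worker-"
    return [name for name in workers_to_close if name.startswith(prefix)]

default_group_only(["foo-default-worker-abc123", "foo-highmem-worker-ca6c5d33ea"], "foo")
# -> ["foo-default-worker-abc123"]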

Can I check that you're definitely seeing those workers being removed? Or is it just saying it wants to remove them but never actually removing them?

For example, say you have two highmem workers and five default workers, and the scheduler wants to remove the two highmem ones. I think it would probably log that it wants to remove them, but then actually remove two of the default group workers instead. That's a bug we definitely need to sort out.

@jacobtomlinson I can definitely confirm that those workers are gone; the reason I started digging into this in the first place was that my highmem worker group kept getting 💥'd. In the example I posted, one of the two highmem workers remains and the other does get killed (in a larger cluster they will all get killed eventually). The logs from the worker that goes down are:

2024-10-25T19:26:32.113873080Z 2024-10-25 19:26:32,113 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.24.4.223:41261'
2024-10-25T19:26:33.634302690Z 2024-10-25 19:26:33,633 - distributed.worker - INFO - Start worker at: tcp://10.24.4.223:37205
2024-10-25T19:26:33.634354188Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Listening to: tcp://10.24.4.223:37205
2024-10-25T19:26:33.634362236Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Worker name: foo-highmem-worker-ca6c5d33ea
2024-10-25T19:26:33.634388998Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - dashboard at: 10.24.4.223:8788
2024-10-25T19:26:33.634425753Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Waiting to connect to: tcp://foo-scheduler.default.svc.cluster.local:8786
2024-10-25T19:26:33.634439967Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - -------------------------------------------------
2024-10-25T19:26:33.634668676Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Threads: 16
2024-10-25T19:26:33.634693077Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Memory: 62.80 GiB
2024-10-25T19:26:33.634704451Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - Local Directory: /tmp/dask-scratch-space/worker-qx4hmixf
2024-10-25T19:26:33.634854505Z 2024-10-25 19:26:33,634 - distributed.worker - INFO - -------------------------------------------------
2024-10-25T19:26:35.388612135Z 2024-10-25 19:26:35,388 - distributed.worker - INFO - Starting Worker plugin shuffle
2024-10-25T19:26:35.389244830Z 2024-10-25 19:26:35,388 - distributed.worker - INFO - Registered to: tcp://foo-scheduler.default.svc.cluster.local:8786
2024-10-25T19:26:35.389267232Z 2024-10-25 19:26:35,389 - distributed.worker - INFO - -------------------------------------------------
2024-10-25T19:26:35.390102584Z 2024-10-25 19:26:35,389 - distributed.core - INFO - Starting established connection to tcp://foo-scheduler.default.svc.cluster.local:8786
INFO 2024-10-25T19:27:26Z Stopping container worker
2024-10-25T19:27:26.839548644Z 2024-10-25 19:27:26,839 - distributed._signals - INFO - Received signal SIGTERM (15)
2024-10-25T19:27:26.839847574Z 2024-10-25 19:27:26,839 - distributed.nanny - INFO - Closing Nanny at 'tcp://10.24.4.223:41261'. Reason: signal-15
2024-10-25T19:27:26.840066612Z 2024-10-25 19:27:26,839 - distributed.nanny - INFO - Nanny asking worker to close. Reason: signal-15
2024-10-25T19:27:26.864512187Z 2024-10-25 19:27:26,864 - distributed.nanny - INFO - Worker process 28 was killed by signal 15
2024-10-25T19:27:26.867337341Z 2024-10-25 19:27:26,867 - distributed.dask_worker - INFO - End worker

The "Scheduler was unaware of this worker" thing I have seen before and posted about in dask/distributed#6961, @gjoseph92's theory (as I understand it) was that this was a race condition around closing and just represents a noisy unnecessary log, the time does line up with when the scheduler was considering closing it so that does seem probably true here as well...but if there's any kind of more verbose logging/tracing that would shed light on the specific decision path, let me know and I will re-run the experiment

Yeah I think that makes sense. The scheduler tells the worker to close. We need to update the retire-workers logic to exclude workers that aren't in the default group.

For now I am working around this with

from distributed.diagnostics.plugin import SchedulerPlugin


class WorkerNameFilterPlugin(SchedulerPlugin):
    """Workaround for https://github.com/dask/dask-kubernetes/issues/914."""

    def __init__(self):
        self.name = "worker-name-filter"
        self.idempotent = True

    def valid_workers_downscaling(self, scheduler, workers):
        # Only allow workers whose name marks them as part of the default
        # group to be considered for downscaling.
        return [worker for worker in workers if "default" in worker.name]

which does seem to prevent the highmem workers from being downscaled.
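
In case it's useful to anyone else, I register it on the client from the example above (assuming a recent distributed where Client.register_plugin exists; older versions have register_scheduler_plugin instead):

plugin = WorkerNameFilterPlugin()
client.register_plugin(plugin)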