Heterogeneous CUDA Cluster
jackyko1991 opened this issue · 6 comments
I am trying to build a customized cluster with multiple CPU workers and a single GPU worker.
To build this heterogeneous cluster, I have used Dask SpecCluster.
On our workstation we have 16 CPU cores, 1 GPU and 128GB RAM. We want to distribute the resources evenly with the specification:
worker | nthreads | memory | GPU |
---|---|---|---|
cpu-0 | 4 | 32GB | 0 |
cpu-1 | 4 | 32GB | 0 |
cpu-2 | 4 | 32GB | 0 |
gpu-0 | 4 | 32GB | 1 |
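The even split in the table can be sketched in plain Python. This is only an illustration: `split_resources` and its `headroom` parameter are hypothetical names, and the 0.9 default mirrors the 90% safety margin used in the script that follows. With that margin each worker gets about 28.8 GiB rather than the full 32 GB.

```python
def split_resources(cpu_count, total_memory, worker_count, headroom=0.9):
    """Divide threads and memory evenly across workers.

    headroom is the fraction of system memory handed to the workers
    (an assumed default; adjust to taste).
    """
    if worker_count < 1:
        raise ValueError("worker_count must be at least 1")
    nthreads = cpu_count // worker_count
    memory_limit = int(total_memory * headroom) // worker_count
    return nthreads, memory_limit


# 16 cores and 128 GiB shared by 3 CPU workers + 1 GPU worker
nthreads, memory_limit = split_resources(16, 128 * 1024**3, 4)
print("threads per worker:", nthreads)
print("memory per worker (GiB):", round(memory_limit / 1024**3, 2))
```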
To achieve this in dask I use the following script:
import dask
from dask.distributed import Client, Scheduler, Worker, Nanny, SpecCluster
from dask_cuda.worker_spec import worker_spec
import multiprocessing
import psutil
# gather device info
cpu_count = multiprocessing.cpu_count()
memory_count = psutil.virtual_memory().total
print("CPU count:", cpu_count)
print("System memory:",memory_count)
specs = {
    "cpu": {
        "scale": 3,
        "resources": {
        }
    },
    "gpu": {
        "scale": 1,
        "resources": {
            "CUDA_VISIBLE_DEVICES": [0]
        }
    }
}
worker_count = 0
for v in specs.values():
    worker_count += v["scale"]
nthreads = cpu_count//worker_count
memory_limit = int(memory_count*0.9)//worker_count # set to use 90% of the system memory to avoid crashing
print("number of workers:", worker_count)
print("threads per worker:", nthreads)
print("memory limit per worker:", round(memory_limit/(1024*1024*1024),2), "GB")
workers = {}
for k, v in specs.items():
    for i in range(v["scale"]):
        if "CUDA_VISIBLE_DEVICES" in v["resources"].keys():
            workers["{}-{}".format(k,i)] = worker_spec(threads_per_worker=nthreads, memory_limit=memory_limit, CUDA_VISIBLE_DEVICES=v["resources"]["CUDA_VISIBLE_DEVICES"])[0]
        else:
            workers["{}-{}".format(k,i)] = {
                "cls":Nanny,
                "options":{
                    "nthreads": nthreads,
                    "memory_limit": memory_limit
                }
            }
workers
"""
{'cpu-0': {'cls': distributed.nanny.Nanny,
'options': {'nthreads': 4, 'memory_limit': 30317854924}},
'cpu-1': {'cls': distributed.nanny.Nanny,
'options': {'nthreads': 4, 'memory_limit': 30317854924}},
'cpu-2': {'cls': distributed.nanny.Nanny,
'options': {'nthreads': 4, 'memory_limit': 30317854924}},
'gpu-0': {'cls': distributed.nanny.Nanny,
'options': {'env': {'CUDA_VISIBLE_DEVICES': '0'},
'interface': None,
'protocol': None,
'nthreads': 4,
'data': dict,
'dashboard_address': ':8787',
'plugins': [<dask_cuda.utils.CPUAffinity at 0x7f35c8ea7880>],
'silence_logs': True,
'memory_limit': 134746021888.0,
'preload': 'dask_cuda.initialize',
'preload_argv': '--create-cuda-context'}}}
"""
scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
cluster = SpecCluster(scheduler=scheduler, workers=workers)
client = Client(cluster)
client
With ordinary Dask resource management, we are supposed to use a resources annotation to select the worker:
# specify the worker for the compute process
with dask.annotate(resources={'GPU': 1}):
    res = dask.compute(*futures)
Can we get similar behavior for a CUDA worker?
I expected 4 CPU threads to have affinity to the single GPU, but it turns out that the GPU is attached to all the CPU workers. Also, the memory limit is not applied to the GPU worker.
Can dask_cuda build this type of cluster?
dask-cuda/dask_cuda/worker_spec.py
Line 95 in 590d26a
This line seems to override the user-specified per-worker memory limit. Should that be changed?
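Until that is settled, one possible workaround is to overwrite the option after `worker_spec` returns. A minimal sketch follows; `with_memory_limit` is a hypothetical helper, and the dict below is a trimmed stand-in for the `gpu-0` entry printed earlier (with the class replaced by a placeholder string) so the snippet runs without Dask installed.

```python
def with_memory_limit(spec, memory_limit):
    """Return a copy of one worker spec entry with memory_limit overridden.

    Only the outer dict and the options dict are copied, so objects stored
    in the options (plugins, classes) are shared with the original.
    """
    patched = dict(spec)
    patched["options"] = dict(spec.get("options", {}))
    patched["options"]["memory_limit"] = memory_limit
    return patched


# Stand-in for the gpu-0 entry shown above; "Nanny" is a placeholder string.
gpu_spec = {
    "cls": "Nanny",
    "options": {"nthreads": 4, "memory_limit": 134746021888.0},
}
patched = with_memory_limit(gpu_spec, 30317854924)
print(patched["options"]["memory_limit"])
```

The original entry is left untouched, which makes it easy to compare the two specs before handing the patched one to SpecCluster.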
Dask-CUDA will probably not handle this kind of automated cluster creation. Instead, @jacobtomlinson has explored inferring hardware and auto-annotating the cluster in https://github.com/jacobtomlinson/dask-agent
I also played with this idea a bit in rapidsai/cudf#11599 but have since paused these experiments.
What you have above does not seem correct -- I think you are trying to build 5 workers total: 4 CPU and 1 GPU. Is that correct?
The Jupyter widget may be incorrectly showing a GPU for non-GPU workers. Technically, though, nothing stops the regular workers from accessing the GPU, so it's not incorrect that they can see it. It might be worth setting CUDA_VISIBLE_DEVICES="" for those workers to hide the GPU from them, not that it will matter, as the annotation will steer GPU tasks to the GPU worker.
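As a rough sketch of that suggestion, the worker specs could be built so that CPU-only workers get an empty `CUDA_VISIBLE_DEVICES` while GPU workers get one device each plus a `GPU` resource. `build_worker_specs` and the `devices` key are hypothetical names for illustration. The `env` and `resources` options are the ones that already appear in the specs in this thread. The worker class is passed in as a parameter, so in real use it would be `Nanny`; a placeholder string is used here so the snippet runs without Dask. Note that a GPU worker built this way does not get the preload and CPU-affinity plugin that `worker_spec` adds.

```python
def build_worker_specs(groups, nthreads, memory_limit, worker_cls):
    """Build a SpecCluster-style workers dict from a small group description."""
    workers = {}
    for group, cfg in groups.items():
        devices = cfg.get("devices", [])  # hypothetical key: GPU ids for this group
        for i in range(cfg["scale"]):
            options = {"nthreads": nthreads, "memory_limit": memory_limit}
            if devices:
                # GPU worker: expose exactly one device and advertise a resource
                options["env"] = {"CUDA_VISIBLE_DEVICES": str(devices[i])}
                options["resources"] = {"GPU": 1}
            else:
                # CPU-only worker: hide every GPU from the process
                options["env"] = {"CUDA_VISIBLE_DEVICES": ""}
            workers[f"{group}-{i}"] = {"cls": worker_cls, "options": options}
    return workers


groups = {"cpu": {"scale": 3}, "gpu": {"scale": 1, "devices": [0]}}
# "Nanny" is a placeholder; pass distributed.Nanny when building a real cluster.
workers = build_worker_specs(groups, nthreads=4, memory_limit=30317854924, worker_cls="Nanny")
for name, spec in workers.items():
    print(name, spec["options"]["env"], spec["options"].get("resources"))
```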
@quasiben From my tests, utilizing all CPUs only requires using all the workers, whether or not they have a GPU. What I am trying to do here is give a single GPU worker affinity to a few CPU threads, so that calls to the GPU worker do not interfere with other CPU processes. This setup sometimes gives me more efficient asynchronous calls of Dask tasks.
So should that be 3 CPU workers + 1 GPU worker?
@jacobtomlinson I tested with the following attempt:
CUDA_VISIBLE_DEVICES passed to Jupyter: 2,3
ws = worker_spec(threads_per_worker=nthreads, )
ws[2]["options"]["env"]["CUDA_VISIBLE_DEVICES"] = ""
ws
"""
{2: {'cls': distributed.nanny.Nanny,
'options': {'env': {'CUDA_VISIBLE_DEVICES': ''},
'interface': None,
'protocol': None,
'nthreads': 4,
'data': dict,
'dashboard_address': ':8787',
'plugins': [<dask_cuda.utils.CPUAffinity at 0x7fb9ece55930>],
'silence_logs': True,
'memory_limit': 67592159232.0,
'preload': 'dask_cuda.initialize',
'preload_argv': '--create-cuda-context'}},
3: {'cls': distributed.nanny.Nanny,
'options': {'env': {'CUDA_VISIBLE_DEVICES': '3,2'},
'interface': None,
'protocol': None,
'nthreads': 4,
'data': dict,
'dashboard_address': ':8787',
'plugins': [<dask_cuda.utils.CPUAffinity at 0x7fb9ece55660>],
'silence_logs': True,
'memory_limit': 67592159232.0,
'preload': 'dask_cuda.initialize',
'preload_argv': '--create-cuda-context'}}}
"""
This still creates a worker bound to the GPU.
I then tried the following on a system with 4 GPUs and 20 cores:
- Set the environment variable CUDA_VISIBLE_DEVICES=2,3 when launching the notebook, to isolate the GPU processes.
- Set up 1 CUDA worker per GPU. With ordinary Dask resource management, an annotation should let one worker claim two GPUs, but that does not seem suitable for Dask-CUDA.
# Note that scale times GPUs per worker should never exceed the length of the specified CUDA_VISIBLE_DEVICES
specs = {
    "cpu": {
        "scale": 3,
        "resources": {
        }
    },
    "gpu": {
        "scale": 2,
        "CUDA_VISIBLE_DEVICES": [2,3],
        "resources": {
            "GPU": 1 # must be 1 under dask-cuda
        }
    }
}
assert specs["gpu"]["scale"]*specs["gpu"]["resources"]["GPU"] <= len(specs["gpu"]["CUDA_VISIBLE_DEVICES"]), "Number of gpu workers (scale) times GPU per worker should not exceed length of CUDA_VISIBLE_DEVICES"
worker_count = 0
for v in specs.values():
    worker_count += v["scale"]
nthreads = cpu_count//worker_count
memory_limit = int(memory_count*0.9)//worker_count # set to use 90% of the system memory to avoid crashing
print("number of workers:", worker_count)
print("threads per worker:", nthreads)
print("memory limit per worker:", round(memory_limit/(1024*1024*1024),2), "GB")
print("GPU workers:", specs["gpu"]["scale"])
print("GPU per worker:", specs["gpu"]["resources"]["GPU"])
"""
number of workers: 5
threads per worker: 4
memory limit per worker: 45.32 GB
GPU workers: 2
GPU per worker: 1
"""
workers = {}
for k, v in specs.items():
    for i in range(v["scale"]):
        if "CUDA_VISIBLE_DEVICES" in v.keys():
            CUDA_VISIBLE_DEVICES = v["CUDA_VISIBLE_DEVICES"]
            gpu_per_worker = v["resources"]["GPU"]
            assert gpu_per_worker == 1, "gpu per worker need to be 1 for dask cuda"
            ws = worker_spec(
                threads_per_worker=nthreads,
                CUDA_VISIBLE_DEVICES=CUDA_VISIBLE_DEVICES[gpu_per_worker*i:gpu_per_worker*(i+1)]
            )
            workers["{}-{}".format(k,i)] = ws[v["CUDA_VISIBLE_DEVICES"][i]]
            workers["{}-{}".format(k,i)]["options"]["resources"]={"GPU":v["resources"]["GPU"]}
            workers["{}-{}".format(k,i)]["options"]["memory_limit"]=memory_limit
        else:
            workers["{}-{}".format(k,i)] = {
                "cls":Nanny,
                "options":{
                    "nthreads": nthreads,
                    "memory_limit": memory_limit
                }
            }
scheduler = {'cls': Scheduler, 'options': {"dashboard_address": ':8787'}}
cluster = SpecCluster(scheduler=scheduler, workers=workers)
client = Client(cluster)
client
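The device bookkeeping in the script above (the assertion plus the per-worker slice of `CUDA_VISIBLE_DEVICES`) can be isolated into a small helper. This is a sketch only: `assign_devices` is a hypothetical name, and it uses the same slicing rule as the loop above.

```python
def assign_devices(visible_devices, scale, gpus_per_worker=1):
    """Split the visible GPU ids into one slice per GPU worker."""
    needed = scale * gpus_per_worker
    if needed > len(visible_devices):
        raise ValueError(
            f"{scale} workers x {gpus_per_worker} GPU(s) need {needed} devices, "
            f"but only {len(visible_devices)} are visible"
        )
    return [
        visible_devices[gpus_per_worker * i : gpus_per_worker * (i + 1)]
        for i in range(scale)
    ]


# Two GPU workers sharing CUDA_VISIBLE_DEVICES=2,3, one GPU each
assignment = assign_devices([2, 3], scale=2, gpus_per_worker=1)
print(assignment)  # [[2], [3]]
```

Raising a `ValueError` instead of using `assert` keeps the check active even when Python runs with optimizations enabled.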
Even though the workers all appear to be bound to GPUs, the Dask annotation:
# specify the worker for the compute process
with dask.annotate(resources={'GPU': 1}):
    res = dask.compute(*futures)
properly restricts the CUDA processes to the GPU workers only.
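To see why the annotation is enough, here is a toy model of resource matching. It is not Dask's scheduler code, only an illustration of the rule that a task requesting `{'GPU': 1}` can run only on workers that advertise at least that much of the resource. `eligible_workers` is a hypothetical helper.

```python
def eligible_workers(worker_resources, required):
    """Return the names of workers that advertise every required resource."""
    return [
        name
        for name, available in worker_resources.items()
        if all(available.get(key, 0) >= amount for key, amount in required.items())
    ]


# Resources as declared in the specs above: only GPU workers advertise "GPU"
worker_resources = {
    "cpu-0": {},
    "cpu-1": {},
    "cpu-2": {},
    "gpu-0": {"GPU": 1},
    "gpu-1": {"GPU": 1},
}
gpu_targets = eligible_workers(worker_resources, {"GPU": 1})
print(gpu_targets)  # ['gpu-0', 'gpu-1']
```

In this model, a task with no resource requirement is eligible on every worker, including the GPU ones.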
I wouldn't suggest having more than one GPU per worker. My point was more that the GPU listed on the non-GPU workers in the screenshots you're sharing is likely to be a UI bug.