rapidsai/dask-cuda

Problem in LocalCUDACluster creating worker

MRTater opened this issue · 6 comments

When I create LocalCUDACluster(CUDA_VISIBLE_DEVICES="1,2"), I put print(f'num of gpus: {torch.cuda.device_count()}, now is running on: {torch.cuda.current_device()}') inside a custom deserializer, and the output is

num of gpus: 2, now is running on: 0
num of gpus: 2, now is running on: 0

The deserializer should be called by a worker and therefore inherit its environment variables, so this output suggests the worker's environment has CUDA_VISIBLE_DEVICES="1,2". I also tried printing the number of GPUs inside the function that I submit to the workers, and the result is the same.

But according to the documentation, "This assigns a different CUDA_VISIBLE_DEVICES environment variable to each Dask worker process.", only one GPU should be visible to each worker in this case, right? CUDA_VISIBLE_DEVICES='1' for worker 1 and CUDA_VISIBLE_DEVICES='2' for worker 2. I am not sure if this is a bug or a feature.

dask-cuda==23.4.0

That is all correct. When you specify CUDA_VISIBLE_DEVICES=N to a process, where N is the GPU index, that GPU is always seen by that process as GPU 0; this is what allows all processes to work without having to specify a device for each call. You can't see the GPU's "real index" when relying on CUDA_VISIBLE_DEVICES, which is what Dask-CUDA does, so you'll always see GPU 0.
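As a minimal standalone illustration (not Dask-CUDA itself, and assuming the machine has a physical GPU with index 2), setting the variable before CUDA is initialized hides all other devices and renumbers the remaining one as device 0:

import os

# Must be set before the CUDA context is created, i.e. before the first
# torch.cuda call in this process.
os.environ["CUDA_VISIBLE_DEVICES"] = "2"  # expose only physical GPU 2

import torch

# The single visible GPU is renumbered as device 0 inside this process.
print(torch.cuda.device_count())    # -> 1
print(torch.cuda.current_device())  # -> 0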

But my problem is not about GPU 0; it is about torch.cuda.device_count(). That call shows that 2 GPUs are visible to the worker process, which does not make much sense to me. Shouldn't each worker process only see 1 GPU?

I don't know much about how PyTorch is used in conjunction with Dask, but it apparently relies on evaluating CUDA_VISIBLE_DEVICES first and, if that is undefined, falling back to NVML. The fact that it still shows 2 devices makes me think PyTorch is somehow spawned in separate processes from the Dask-CUDA workers -- if they were the same processes, PyTorch would see the same CUDA_VISIBLE_DEVICES variable -- or that you're calling torch.cuda.device_count() from the main (client) process, where CUDA_VISIBLE_DEVICES is undefined. Could you confirm which process you're querying that from?
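One way to check (a rough sketch, assuming a LocalCUDACluster similar to yours rather than your exact setup) is to compare what the worker processes report via Client.run with what the client process itself sees:

import os

from dask_cuda import LocalCUDACluster
from distributed import Client

if __name__ == "__main__":
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="1,2")
    client = Client(cluster)

    # Runs inside every worker process; each worker reports its own environment.
    print(client.run(lambda: os.environ.get("CUDA_VISIBLE_DEVICES")))

    # Runs in the client (main) process, where the variable may be unset.
    print(os.environ.get("CUDA_VISIBLE_DEVICES"))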

I tried torch.cuda.device_count() inside the function that is mapped to the workers, and it also gives me 2. Also, the deserializer should be spawned by the workers at least once, so I am sure that I am querying the worker process. Here is a snapshot of my testing code:

import torch
from dask_cuda import LocalCUDACluster
from distributed import Client


def deserialize_torch_Tensor(header, frames):

    print(f'num of gpus using: {torch.cuda.device_count()}, now is running on: {torch.cuda.current_device()}')
    ...

def main():
    # Create a LocalCUDACluster and a Client
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES='0,1')
    client = Client(cluster)

    # Define a function to increase each element in a tensor by one
    def increment(x):
        x = torch.as_tensor(x).cuda()
        print(f'graph is on: {x.get_device()}, num of gpus using: {torch.cuda.device_count()}, now is running on: {torch.cuda.current_device()}')
        return x + 1

    # Create two 1D tensors
    x = torch.tensor([1, 2, 3])
    y = torch.tensor([4, 5, 6])

    # Scatter the tensors to the workers
    x_future, y_future = client.scatter([x, y])

    # Map the increment function to the tensors
    futures = client.map(increment, [x_future, y_future])

    # Gather the results
    results = client.gather(futures)

    # Print the results
    print(results)


if __name__ == "__main__":
    main()

And the output:

(env)root:$ python test.py 
2023-05-15 09:01:13,331 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-05-15 09:01:13,331 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2023-05-15 09:01:13,399 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-05-15 09:01:13,399 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
num of gpus using: 2, now is running on: 0
num of gpus using: 2, now is running on: 0
graph is on: 0, num of gpus using: 2, now is running on: 0
graph is on: 0, num of gpus using: 2, now is running on: 0
num of gpus using: 8, now is running on: 0
num of gpus using: 8, now is running on: 0
[tensor([2, 3, 4], device='cuda:0'), tensor([5, 6, 7], device='cuda:0')]

But you gave me a good hint; maybe I need to investigate PyTorch as well.

Sorry, this was my mistake. I forgot that the way CUDA_VISIBLE_DEVICES is set up does not purely limit visibility to that one device; instead, the devices are cycled so that each worker's assigned device is always the first one in its CUDA_VISIBLE_DEVICES. Consider the following:

import os
import cupy
import dask_cuda
from dask_cuda import LocalCUDACluster
from distributed import Client


if __name__ == "__main__":
    cluster = LocalCUDACluster(CUDA_VISIBLE_DEVICES="0,1,2")
    client = Client(cluster)

    def increment(x):
        CUDA_VISIBLE_DEVICES = os.environ["CUDA_VISIBLE_DEVICES"]
        print(f"[{os.getpid()}] {CUDA_VISIBLE_DEVICES=}, {x.device=}, {dask_cuda.utils.get_n_gpus()=}", flush=True)
        return x + 1

    x = cupy.arange(3)
    y = cupy.arange(3)
    z = cupy.arange(3)

    scattered = client.scatter([x, y, z])

    futures = client.map(increment, scattered)

    results = client.gather(futures)
    print(results)

The output is then:

[12552] CUDA_VISIBLE_DEVICES='2,0,1', x.device=<CUDA Device 0>, dask_cuda.utils.get_n_gpus()=3
[12549] CUDA_VISIBLE_DEVICES='1,2,0', x.device=<CUDA Device 0>, dask_cuda.utils.get_n_gpus()=3
[12545] CUDA_VISIBLE_DEVICES='0,1,2', x.device=<CUDA Device 0>, dask_cuda.utils.get_n_gpus()=3
[array([1, 2, 3]), array([1, 2, 3]), array([1, 2, 3])]

Notice how the first GPU in each worker is always the active one, but the others are still listed so that CUDA IPC communication can still find its peers on legacy systems.
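For reference, here is a toy sketch (a hypothetical helper, not Dask-CUDA's actual internals) of the rotation that produces the per-worker strings shown above:

# Hypothetical helper: rotate the device list so that worker i's assigned
# GPU comes first, matching the CUDA_VISIBLE_DEVICES values printed above.
def rotated_visible_devices(worker_index, devices):
    n = len(devices)
    return ",".join(str(devices[(worker_index + j) % n]) for j in range(n))

for i in range(3):
    print(rotated_visible_devices(i, [0, 1, 2]))
# 0,1,2
# 1,2,0
# 2,0,1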

I hope this clarifies things.

Thanks! That answers my question. This issue can be closed.