microsoft/mscclpp

[Bug] Error when creating many proxy channels

liangyuRain opened this issue · 7 comments

Hi, I have encountered an error when a gpu creates many proxy channels and do mutual signal plus wait. I managed to create a minimal example shown below. In this example, I create nchannels proxy channels between two gpus and do a signal and then wait. When nchannels is 256, the program throws error; however, with smaller nchannels like 128, the program executes fine. In my original case, I was creating 7x14 proxy channels from one gpu to the other seven. I also encountered error, so the problem seems to be the total number of proxy channels created on one gpu, regardless of the number of peers. I was running on a Lambda machine with 8xA100 GPUs connected by NVSwitch.

#include <mscclpp/proxy_channel_device.hpp>

extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  if (tid == 0) {
    channels[bid].signal();
    channels[bid].wait();
  }
}
import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup, 
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)

    params = b"" + struct.pack("P", channel_mem.data.ptr)

    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()

    nblocks = nchannels
    nthreads = 1024

    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256

    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)

    main(group, nchannels)

    del group

I put both files under mscclpp/python/test and run. The output is as follows:

(mscclpp) root@6c11b67f10f3:~/mscclpp-test/python# mpirun -np 2 python -m test.proxy_bug
set_mempolicy: Operation not permitted
rank 0 running kernel
rank 1 running kernel
set_mempolicy: Operation not permitted
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [226,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
Traceback (most recent call last):
  File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/root/mscclpp-test/python/test/proxy_bug.py", line 57, in <module>
    main(group, nchannels)
  File "/root/mscclpp-test/python/test/proxy_bug.py", line 45, in main
    cp.cuda.runtime.deviceSynchronize()
  File "cupy_backends/cuda/api/runtime.pyx", line 377, in cupy_backends.cuda.api.runtime.deviceSynchronize
  File "cupy_backends/cuda/api/runtime.pyx", line 380, in cupy_backends.cuda.api.runtime.deviceSynchronize
  File "cupy_backends/cuda/api/runtime.pyx", line 144, in cupy_backends.cuda.api.runtime.check_status
cupy_backends.cuda.api.runtime.CUDARuntimeError: cudaErrorAssert: device-side assert triggered
terminate called after throwing an instance of 'mscclpp::CudaError'
  what():  Call to cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream) failed. /root/mscclpp-test/src/fifo.cc:59 (Cuda failure: device-side assert triggered)
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [39,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
Traceback (most recent call last):
  File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 194, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "/root/mscclpp-test/python/test/proxy_bug.py", line 57, in <module>
    main(group, nchannels)
  File "/root/mscclpp-test/python/test/proxy_bug.py", line 45, in main
    cp.cuda.runtime.deviceSynchronize()
  File "cupy_backends/cuda/api/runtime.pyx", line 377, in cupy_backends.cuda.api.runtime.deviceSynchronize
  File "cupy_backends/cuda/api/runtime.pyx", line 380, in cupy_backends.cuda.api.runtime.deviceSynchronize
  File "cupy_backends/cuda/api/runtime.pyx", line 144, in cupy_backends.cuda.api.runtime.check_status
cupy_backends.cuda.api.runtime.CUDARuntimeError: cudaErrorAssert: device-side assert triggered
terminate called after throwing an instance of 'mscclpp::CudaError'
  what():  Call to cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream) failed. /root/mscclpp-test/src/fifo.cc:59 (Cuda failure: device-side assert triggered)

===================================================================================
=   BAD TERMINATION OF ONE OF YOUR APPLICATION PROCESSES
=   PID 2232 RUNNING AT 6c11b67f10f3
=   EXIT CODE: 134
=   CLEANING UP REMAINING PROCESSES
=   YOU CAN IGNORE THE BELOW CLEANUP MESSAGES
===================================================================================
YOUR APPLICATION TERMINATED WITH THE EXIT STRING: Aborted (signal 6)
This typically refers to a problem with your application.
Please see the FAQ page for debugging suggestions

For Nvidia GPU, if you use too many thread blocks, some blocks will not be launched until other thread blocks terminated. (refer: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#hardware-implementation).
Though multi thread blocks can be launched in a single SM, but there are many restrictions, such as #threads, #registers.
For A100, if only has 108 SMs, if you request too many #thread_blocks, some thread blocks may not run and the wait function cannot get the results from peers. The program crash as expected.

Maybe you can use different thread to call singal/wait, not using different thread block.

I see. Thank you so much!

Hi, consider the following example. Only 2 threadblocks are launched, but it encounters the same error.

import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup, 
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int, nchannels_per_block: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)

    params = b"" + struct.pack("P", channel_mem.data.ptr) + struct.pack("i", nchannels_per_block)

    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()

    assert nchannels % nchannels_per_block == 0
    nblocks = nchannels // nchannels_per_block
    print(f"rank {group.my_rank} nblocks {nblocks}", flush=True)
    nthreads = 1024

    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256
    nchannels_per_block = 128

    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)

    main(group, nchannels, nchannels_per_block)

    del group
#include <mscclpp/proxy_channel_device.hpp>

extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels, int nchannels_per_block) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  for (int i = tid; i < nchannels_per_block; i += blockDim.x) 
      channels[i + bid * nchannels_per_block].signal();
  __syncthreads();
  for (int i = tid; i < nchannels_per_block; i += blockDim.x) 
      channels[i + bid * nchannels_per_block].wait();
  __syncthreads();
}
(mscclpp) root@6c11b67f10f3:~/mscclpp-test/python# mpirun -np 2 python -m test.proxy_bug
rank 1 nblocks 2
set_mempolicy: Operation not permitted
rank 0 nblocks 2
rank 0 running kernel
rank 1 running kernel
set_mempolicy: Operation not permitted
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [1,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [2,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
...

Oh I forgot mscclpp proxy channel has some limitations: please check following comments:

/// A concurrent FIFO where multiple device threads (the number of threads should not exceed the fifo size) can push
/// work elements and a single host proxy thread consumes them.

By default, the fifo size is 128. Exceed this number may bring some unexpected behaviors.
You can change the code to use larger fifo size. Then this issue should be fixed.

Fifo(int size = 128);

We will update the ProxyService interface to let user customize the fifo size

Thank you for your response! I tried modifying the fifo size to 256. The previous example with proxy channel signal+wait works fine. But if I do putWithSignal+wait, the same error persists. Interestingly, when I do put+signal+wait, the example works fine again. If I set nchannels to 128, then everything functions correctly in all cases. It seems that changing fifo size is not enough when there is data write.

I summarize the three cases in the cuda code below. Additionally, in my original collective communication code, if I do put+signal instead of putWithSignal, then the code finishes but the outcome is incorrect, so I suspect that put+signal is not working properly either.

#include <mscclpp/proxy_channel_device.hpp>

extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels, int nchannels_per_block) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  for (int i = tid; i < nchannels_per_block; i += blockDim.x) {
    // case 1: error
    channels[i + bid * nchannels_per_block].putWithSignal(0, 4);

    // case 2: finished
    // channels[i + bid * nchannels_per_block].put(0, 4);
    // channels[i + bid * nchannels_per_block].signal();

    // case 3: finished
    // channels[i + bid * nchannels_per_block].signal();
  }
  __syncthreads();
  for (int i = tid; i < nchannels_per_block; i += blockDim.x) 
      channels[i + bid * nchannels_per_block].wait();
  __syncthreads();
}
import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup, 
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int, nchannels_per_block: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)

    params = b"" + struct.pack("P", channel_mem.data.ptr) + struct.pack("i", nchannels_per_block)

    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()

    assert nchannels % nchannels_per_block == 0
    nblocks = nchannels // nchannels_per_block
    print(f"rank {group.my_rank} nblocks {nblocks}", flush=True)
    nthreads = 1024

    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256
    nchannels_per_block = 64

    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)

    main(group, nchannels, nchannels_per_block)

    del group

Okay... This problem due to we use 8 bit for memory handle. This means we can up to support 256 registered memories.

#define MSCCLPP_BITS_REGMEM_HANDLE 8

In your case, for each channel need two registered memories. So the total number is 512, exceed 256. (signal function does not use this field, so call signal will not cause any issue)

Since you actually use the same memory region for different channels. I add this function in comm.py

    def make_n_proxy_channels(
        self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection], n: int
    ) -> dict[int, SimpleProxyChannel]:
        registered_memories = self.register_tensor_with_connections(tensor, connections)
        memory_ids = {}
        for rank in registered_memories:
            memory_ids[rank] = proxy_service.add_memory(registered_memories[rank])
        channels = {}
        for _ in range(n):
            semaphore_ids = {}
            semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
            for rank in semaphores:
                semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank])
            for rank in semaphores:
                if rank not in channels:
                    channels[rank] = []
                channels[rank].append(SimpleProxyChannel(
                    proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank]
                ))
        return channels

And change create_proxy_channels like this:

def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup,
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    channels.append(group.make_n_proxy_channels(proxy_service, memory, connections, nchannels)[remote])
    channels = list(itertools.chain(*channels))
    return channels

Then the test passed,
Also the max channel number is 1024, please make sure not exceed this number. BTW, can you tell us why you need so many channels? Seems it's not a normal use case.

Thanks a lot for the solution! I am implementing a collective communication algorithm, which involves running a lot of independent broadcast spanning trees. Because these trees/flows need to be independent, they cannot share channels. For large-scale experiments, I may need a lot of proxy channels. @saeedmaleki can also provide information about the project.