[Bug] Error when creating many proxy channels
liangyuRain opened this issue · 7 comments
Hi, I have encountered an error when a GPU creates many proxy channels and does mutual signal plus wait. I managed to create a minimal example, shown below. In this example, I create nchannels proxy channels between two GPUs and do a signal followed by a wait. When nchannels is 256, the program throws an error; however, with a smaller nchannels like 128, the program executes fine. In my original case, I was creating 7x14 proxy channels from one GPU to the other seven GPUs and also encountered the error, so the problem seems to depend on the total number of proxy channels created on one GPU, regardless of the number of peers. I was running on a Lambda machine with 8xA100 GPUs connected by NVSwitch.
#include <mscclpp/proxy_channel_device.hpp>

// One proxy channel per thread block: thread 0 signals the peer and then waits
// for the peer's signal on the same channel.
extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  if (tid == 0) {
    channels[bid].signal();
    channels[bid].wait();
  }
}
import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup,
                          nchannels: int, memory: cp.ndarray):
    # Create nchannels independent proxy channels to the single remote rank,
    # all referring to the same memory buffer.
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    # Pack the device handles into GPU memory and pass the pointer as the kernel argument.
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)
    params = b"" + struct.pack("P", channel_mem.data.ptr)
    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()
    nblocks = nchannels  # one thread block per proxy channel
    nthreads = 1024
    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256
    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)
    main(group, nchannels)
    del group
I put both files under mscclpp/python/test and ran them. The output is as follows:
(mscclpp) root@6c11b67f10f3:~/mscclpp-test/python# mpirun -np 2 python -m test.proxy_bug
set_mempolicy: Operation not permitted
rank 0 running kernel
rank 1 running kernel
set_mempolicy: Operation not permitted
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [226,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
Traceback (most recent call last):
File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/root/mscclpp-test/python/test/proxy_bug.py", line 57, in <module>
main(group, nchannels)
File "/root/mscclpp-test/python/test/proxy_bug.py", line 45, in main
cp.cuda.runtime.deviceSynchronize()
File "cupy_backends/cuda/api/runtime.pyx", line 377, in cupy_backends.cuda.api.runtime.deviceSynchronize
File "cupy_backends/cuda/api/runtime.pyx", line 380, in cupy_backends.cuda.api.runtime.deviceSynchronize
File "cupy_backends/cuda/api/runtime.pyx", line 144, in cupy_backends.cuda.api.runtime.check_status
cupy_backends.cuda.api.runtime.CUDARuntimeError: cudaErrorAssert: device-side assert triggered
terminate called after throwing an instance of 'mscclpp::CudaError'
what(): Call to cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream) failed. /root/mscclpp-test/src/fifo.cc:59 (Cuda failure: device-side assert triggered)
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [39,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
Traceback (most recent call last):
File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/root/miniconda3/envs/mscclpp/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/root/mscclpp-test/python/test/proxy_bug.py", line 57, in <module>
main(group, nchannels)
File "/root/mscclpp-test/python/test/proxy_bug.py", line 45, in main
cp.cuda.runtime.deviceSynchronize()
File "cupy_backends/cuda/api/runtime.pyx", line 377, in cupy_backends.cuda.api.runtime.deviceSynchronize
File "cupy_backends/cuda/api/runtime.pyx", line 380, in cupy_backends.cuda.api.runtime.deviceSynchronize
File "cupy_backends/cuda/api/runtime.pyx", line 144, in cupy_backends.cuda.api.runtime.check_status
cupy_backends.cuda.api.runtime.CUDARuntimeError: cudaErrorAssert: device-side assert triggered
terminate called after throwing an instance of 'mscclpp::CudaError'
what(): Call to cudaMemcpyAsync(pimpl->tailReplica.get(), &pimpl->hostTail, sizeof(uint64_t), cudaMemcpyHostToDevice, pimpl->stream) failed. /root/mscclpp-test/src/fifo.cc:59 (Cuda failure: device-side assert triggered)
===================================================================================
= BAD TERMINATION OF ONE OF YOUR APPLICATION PROCESSES
= PID 2232 RUNNING AT 6c11b67f10f3
= EXIT CODE: 134
= CLEANING UP REMAINING PROCESSES
= YOU CAN IGNORE THE BELOW CLEANUP MESSAGES
===================================================================================
YOUR APPLICATION TERMINATED WITH THE EXIT STRING: Aborted (signal 6)
This typically refers to a problem with your application.
Please see the FAQ page for debugging suggestions
On NVIDIA GPUs, if you use too many thread blocks, some blocks will not be launched until other thread blocks have terminated (refer: https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#hardware-implementation).
Although multiple thread blocks can be launched on a single SM, there are many restrictions, such as the number of threads and registers.
An A100 has only 108 SMs, so if you request too many thread blocks, some of them may not run, and then the wait function cannot get the results from the peers. The program crashes as expected.
Maybe you can use different threads to call signal/wait, rather than different thread blocks.
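To illustrate the occupancy argument above, here is a rough host-side sketch (assumptions: CuPy's standard device attribute MultiProcessorCount, and that __launch_bounds__(1024, 1) with 1024 threads per block allows at most one resident block of this kernel per SM):

import cupy as cp

# Sketch of the scheduling argument above; numbers are taken from this thread (A100, 256 blocks).
num_sms = cp.cuda.Device(0).attributes["MultiProcessorCount"]  # 108 on an A100
nblocks = 256
resident = min(nblocks, num_sms)   # blocks that can be running at the same time
queued = nblocks - resident        # blocks that only start after resident blocks finish
print(f"{resident} of {nblocks} blocks can run concurrently; {queued} are queued, "
      "so, per the explanation above, a resident block's wait() may depend on a "
      "peer-side block that has not been scheduled yet")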
I see. Thank you so much!
Hi, consider the following example. Only 2 thread blocks are launched, but it encounters the same error.
import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup,
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int, nchannels_per_block: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)
    params = b"" + struct.pack("P", channel_mem.data.ptr) + struct.pack("i", nchannels_per_block)
    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()
    assert nchannels % nchannels_per_block == 0
    nblocks = nchannels // nchannels_per_block
    print(f"rank {group.my_rank} nblocks {nblocks}", flush=True)
    nthreads = 1024
    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256
    nchannels_per_block = 128
    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)
    main(group, nchannels, nchannels_per_block)
    del group
#include <mscclpp/proxy_channel_device.hpp>

extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels, int nchannels_per_block) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  for (int i = tid; i < nchannels_per_block; i += blockDim.x)
    channels[i + bid * nchannels_per_block].signal();
  __syncthreads();
  for (int i = tid; i < nchannels_per_block; i += blockDim.x)
    channels[i + bid * nchannels_per_block].wait();
  __syncthreads();
}
(mscclpp) root@6c11b67f10f3:~/mscclpp-test/python# mpirun -np 2 python -m test.proxy_bug
rank 1 nblocks 2
set_mempolicy: Operation not permitted
rank 0 nblocks 2
rank 0 running kernel
rank 1 running kernel
set_mempolicy: Operation not permitted
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [0,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [1,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
/root/mscclpp-test/python/test/../../include/mscclpp/semaphore_device.hpp:30: void mscclpp::Host2DeviceSemaphoreDeviceHandle::wait(signed long): block: [0,0,0], thread: [2,0,0] Assertion `(atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId))` failed.
...
Oh, I forgot that the mscclpp proxy channel has some limitations; please check the following comments:
mscclpp/include/mscclpp/fifo_device.hpp
Lines 28 to 29 in e7d3e2d
By default, the fifo size is 128. Exceeding this number may cause unexpected behavior. You can change the code to use a larger fifo size; then this issue should be fixed.
mscclpp/include/mscclpp/fifo.hpp
Line 20 in e7d3e2d
We will update the ProxyService interface to let the user customize the fifo size.
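As a rough host-side illustration of that constraint (a sketch only; ASSUMED_FIFO_SIZE is just the 128 value quoted above, not a constant read from any mscclpp Python API):

# Sketch of the fifo constraint described above; the numbers are taken from this thread.
ASSUMED_FIFO_SIZE = 128   # default fifo size quoted above for this commit

nchannels = 256           # channels pushing triggers concurrently in the repro
if nchannels > ASSUMED_FIFO_SIZE:
    print(f"{nchannels} concurrent triggers exceed the {ASSUMED_FIFO_SIZE}-entry fifo; "
          "rebuild with a larger fifo size or issue triggers in smaller batches")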
Thank you for your response! I tried modifying the fifo size to 256. The previous example with proxy channel signal + wait now works fine. But if I do putWithSignal + wait, the same error persists. Interestingly, when I do put + signal + wait, the example works fine again. If I set nchannels to 128, then everything functions correctly in all cases. It seems that changing the fifo size is not enough when there is a data write.
I summarize the three cases in the CUDA code below. Additionally, in my original collective communication code, if I do put + signal instead of putWithSignal, the code finishes but the outcome is incorrect, so I suspect that put + signal is not working properly either.
#include <mscclpp/proxy_channel_device.hpp>

extern "C" __global__ void __launch_bounds__(1024, 1)
    proxy_bug(mscclpp::SimpleProxyChannelDeviceHandle* channels, int nchannels_per_block) {
  const int tid = threadIdx.x;
  const int bid = blockIdx.x;
  for (int i = tid; i < nchannels_per_block; i += blockDim.x) {
    // case 1: error
    channels[i + bid * nchannels_per_block].putWithSignal(0, 4);
    // case 2: finished
    // channels[i + bid * nchannels_per_block].put(0, 4);
    // channels[i + bid * nchannels_per_block].signal();
    // case 3: finished
    // channels[i + bid * nchannels_per_block].signal();
  }
  __syncthreads();
  for (int i = tid; i < nchannels_per_block; i += blockDim.x)
    channels[i + bid * nchannels_per_block].wait();
  __syncthreads();
}
import cupy as cp
from mscclpp import (
    ProxyService,
    Transport,
)
import mscclpp.comm as mscclpp_comm
from .mscclpp_mpi import MpiGroup
from mscclpp.utils import KernelBuilder
import struct
import os


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup,
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    for _ in range(nchannels):
        channels.append(group.make_proxy_channels(proxy_service, memory, connections)[remote])
    return channels


def main(group: mscclpp_comm.CommGroup, nchannels: int, nchannels_per_block: int):
    proxy_service = ProxyService()
    channels = create_proxy_channels(proxy_service, group, nchannels, cp.zeros(8, dtype=cp.int32))
    handles = [ch.device_handle().raw for ch in channels]
    channel_mem = cp.asarray(memoryview(b"".join(handles)), dtype=cp.uint8)
    params = b"" + struct.pack("P", channel_mem.data.ptr) + struct.pack("i", nchannels_per_block)
    file_dir = os.path.dirname(os.path.abspath(__file__))
    kernel = KernelBuilder(
        file="proxy_bug_kernel.cu",
        kernel_name="proxy_bug",
        file_dir=file_dir,
    ).get_compiled_kernel()
    assert nchannels % nchannels_per_block == 0
    nblocks = nchannels // nchannels_per_block
    print(f"rank {group.my_rank} nblocks {nblocks}", flush=True)
    nthreads = 1024
    proxy_service.start_proxy()
    group.barrier()
    print(f"rank {group.my_rank} running kernel", flush=True)
    kernel.launch_kernel(params, nblocks, nthreads, 0, None)
    cp.cuda.runtime.deviceSynchronize()
    print(f"rank {group.my_rank} done", flush=True)
    group.barrier()
    proxy_service.stop_proxy()


if __name__ == "__main__":
    nchannels = 256
    nchannels_per_block = 64
    mpi_group = MpiGroup([0, 1])
    group = mscclpp_comm.CommGroup(mpi_group.comm)
    main(group, nchannels, nchannels_per_block)
    del group
Okay... This problem is due to the fact that we use 8 bits for the memory handle, which means we can support at most 256 registered memories.
In your case, each channel needs two registered memories, so the total is 512, which exceeds 256. (The signal function does not use this field, so calling signal will not cause any issue.)
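As a quick back-of-the-envelope check of that limit (a sketch; the counts are taken from the explanation above):

# An 8-bit memory-ID field allows at most 2**8 = 256 registered memories, and, per the
# explanation above, each per-channel registration consumes two memories (local + remote).
MAX_REGISTERED_MEMORIES = 2 ** 8   # 256

nchannels = 256
per_channel = 2 * nchannels        # 512: one registration pair per channel -> exceeds the limit
shared = 2                         # register the shared buffer once and reuse it for every channel
print(per_channel > MAX_REGISTERED_MEMORIES)  # True  -> the failing case
print(shared > MAX_REGISTERED_MEMORIES)       # False -> the fix below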
Since you actually use the same memory region for all the channels, I added this function to comm.py:
def make_n_proxy_channels(
    self, proxy_service: ProxyService, tensor: cp.ndarray, connections: dict[int, Connection], n: int
) -> dict[int, list[SimpleProxyChannel]]:
    # Register the tensor once per connection and reuse the memory IDs for all n channels,
    # so only one pair of registered memories is consumed regardless of n.
    registered_memories = self.register_tensor_with_connections(tensor, connections)
    memory_ids = {}
    for rank in registered_memories:
        memory_ids[rank] = proxy_service.add_memory(registered_memories[rank])
    channels = {}
    for _ in range(n):
        semaphore_ids = {}
        semaphores = self.make_semaphore(connections, Host2DeviceSemaphore)
        for rank in semaphores:
            semaphore_ids[rank] = proxy_service.add_semaphore(semaphores[rank])
        for rank in semaphores:
            if rank not in channels:
                channels[rank] = []
            channels[rank].append(SimpleProxyChannel(
                proxy_service.proxy_channel(semaphore_ids[rank]), memory_ids[rank], memory_ids[self.my_rank]
            ))
    return channels
And change create_proxy_channels like this:
import itertools  # needed for itertools.chain below


def create_proxy_channels(proxy_service: ProxyService, group: mscclpp_comm.CommGroup,
                          nchannels: int, memory: cp.ndarray):
    remote = 0 if group.my_rank == 1 else 1
    connections = group.make_connection([remote], Transport.CudaIpc)
    channels = []
    channels.append(group.make_n_proxy_channels(proxy_service, memory, connections, nchannels)[remote])
    channels = list(itertools.chain(*channels))
    return channels
Then the test passed.
Also, the maximum channel number is 1024; please make sure not to exceed this number. BTW, can you tell us why you need so many channels? It seems like an unusual use case.
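A small host-side guard for that limit could look like this (a sketch; the 1024 figure is just the number quoted above, not a constant exposed by the API):

# 1024 is the per-GPU proxy-channel limit quoted above in this thread.
ASSUMED_MAX_PROXY_CHANNELS = 1024

def check_channel_count(nchannels: int) -> None:
    if nchannels > ASSUMED_MAX_PROXY_CHANNELS:
        raise ValueError(f"requested {nchannels} proxy channels; "
                         f"at most {ASSUMED_MAX_PROXY_CHANNELS} are supported per GPU")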
Thanks a lot for the solution! I am implementing a collective communication algorithm, which involves running a lot of independent broadcast spanning trees. Because these trees/flows need to be independent, they cannot share channels. For large-scale experiments, I may need a lot of proxy channels. @saeedmaleki can also provide information about the project.