本文对 PyTorch 中的 DistributedDataParallel(DDP)及混合精度模块的使用方式进行讲解。
关于 DDP 的原理及其相较于 DataParallel(DP)的优势,前人之述备矣,本文不再花费大量篇幅。
更新:添加 torchrun 启动方式示例
本文参考了大量知乎文章、PyTorch文档及ChatGPT的回答,最主要参考的是这篇文章。在这些基础上,结合个人项目中使用的情况,追求给出一个贴近 DL 项目现实且简洁、高效、可拓展的示例。
本文的代码针对单机多卡的情况,使用 nccl 后端,并通过 env 进行初始化。全部代码在这里。带有注释的行将着重讲解。
首先,给出不使用 DDP 和 混合精度加速的代码。完整程序在这里。
看看程序的入口:执行main
函数并计时。
if __name__ == '__main__':
args = prepare() ###
time_start = time.time()
main(args)
time_elapsed = time.time() - time_start
print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
第二行的prepare
函数用于获取命令行参数:
def prepare():
parser = argparse.ArgumentParser()
parser.add_argument('--gpu', default='0')
parser.add_argument('-e',
'--epochs',
default=3,
type=int,
metavar='N',
help='number of total epochs to run')
parser.add_argument('-b',
'--batch_size',
default=32,
type=int,
metavar='N',
help='number of batchsize')
args = parser.parse_args()
return args
在main
函数中,首先通过parse_args
获得一些训练相关的命令行参数,然后设定模型、损失函数、优化器、数据集。接着依次进行训练、测试,并保存模型的state_dict
。
def main(args):
model = ConvNet().cuda() ###
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_dloader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=args.batch_size,
shuffle=True,
num_workers=0,
pin_memory=True,
)
test_dataset = torchvision.datasets.MNIST(root='./data',
train=False,
transform=transforms.ToTensor(),
download=True)
test_dloader = torch.utils.data.DataLoader(
dataset=test_dataset,
batch_size=args.batch_size,
shuffle=True,
num_workers=2,
pin_memory=True,
)
for epoch in range(args.epochs):
print(f'begin training of epoch {epoch + 1}/{args.epochs}')
train(model, train_dloader, criterion, optimizer) ###
print(f'begin testing')
test(model, test_dloader) ###
torch.save({'model': model.state_dict()}, 'origin_checkpoint.pt')
上面第一行使用的模型为一个简单的 CNN:
import torch.nn as nn
class ConvNet(nn.Module):
def __init__(self, num_classes=10):
super(ConvNet, self).__init__()
self.layer1 = nn.Sequential(
nn.Conv2d(1, 16, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(16), nn.ReLU(), nn.MaxPool2d(kernel_size=2,
stride=2))
self.layer2 = nn.Sequential(
nn.Conv2d(16, 32, kernel_size=5, stride=1, padding=2),
nn.BatchNorm2d(32), nn.ReLU(), nn.MaxPool2d(kernel_size=2,
stride=2))
self.fc = nn.Linear(7 * 7 * 32, num_classes)
def forward(self, x):
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
训练使用的train
函数:
def train(model, train_dloader, criterion, optimizer):
model.train()
for images, labels in train_dloader:
images = images.cuda()
labels = labels.cuda()
outputs = model(images)
loss = criterion(outputs, labels)
optimizer.zero_grad()
loss.backward()
optimizer.step()
测试使用的test
函数:
def test(model, test_dloader):
model.eval()
size = torch.tensor(0.).cuda()
correct = torch.tensor(0.).cuda()
for images, labels in test_dloader:
images = images.cuda()
labels = labels.cuda()
with torch.no_grad():
outputs = model(images)
size += images.size(0)
correct += (outputs.argmax(1) == labels).type(torch.float).sum()
acc = correct / size
print(f'Accuracy is {acc:.2%}')
最后,启动命令如下:
python origin_main.py --gpu 0
输出的结果:
begin training of epoch 1/3
begin training of epoch 2/3
begin training of epoch 3/3
begin testing
Accuracy is 91.55%
time elapsed: 22.72 seconds
在介绍完原型后,以下对代码进行改造,以使用DDP。完整程序在这里。
首先,我们在if __name__ == '__main__'
中启动 DDP:
if __name__ == '__main__':
args = prepare() ###
time_start = time.time()
mp.spawn(main, args=(args, ), nprocs=torch.cuda.device_count()) #import torch.multiprocessing as mp
time_elapsed = time.time() - time_start
print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
spawn
函数的主要参数包括以下几个:
fn
,即上面传入的main
函数。每个线程将执行一次该函数args
,即fn
所需的参数。传给fn
的参数必须写成元组的形式,哪怕像上面一样只有一个nprocs
启动的进程数,将其设置为world_size
即可。不传默认为1,与world_size
不一致会导致进程等待同步而一直停滞。
在prepare
函数里面,也进行了一些 DDP 的配置:
def prepare():
parser = argparse.ArgumentParser()
parser.add_argument('--gpu', default='0,1')
parser.add_argument('-e',
'--epochs',
default=3,
type=int,
metavar='N',
help='number of total epochs to run')
parser.add_argument('-b',
'--batch_size',
default=32,
type=int,
metavar='N',
help='number of batchsize')
args = parser.parse_args()
# 下面几行是新加的,用于启动多进程 DDP。使用 torchrun 启动时只需要设置使用的 GPU
os.environ['MASTER_ADDR'] = 'localhost' # 0号机器的 IP
os.environ['MASTER_PORT'] = '19198' # 0号机器的可用端口,随便选一个没被占用的
os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu # 使用哪些 GPU
world_size = torch.cuda.device_count() # 就是上一行使用的 GPU 数量
os.environ['WORLD_SIZE'] = str(world_size)
return args
再来看看main
函数里面添加了什么。首先是其添加一个额外的参数local_rank
(在mp.spawn
里面不用传,会自动分配)
def main(local_rank, args):
init_ddp(local_rank) ### 进程初始化
model = ConvNet().cuda() ### 模型的 forward 方法变了
model = nn.SyncBatchNorm.convert_sync_batchnorm(model) ### 转换模型的 BN 层
model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank]) ### 套 DDP
criterion = nn.CrossEntropyLoss().cuda()
optimizer = torch.optim.SGD(model.parameters(), 1e-4)
scaler = GradScaler() ### 用于混合精度训练
train_dataset = torchvision.datasets.MNIST(root='./data',
train=True,
transform=transforms.ToTensor(),
download=True)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset) ### 用于在 DDP 环境下采样
g = get_ddp_generator() ###
train_dloader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=args.batch_size,
shuffle=False,
num_workers=4,
pin_memory=True,
sampler=train_sampler,
generator=g) ### 添加额外的 generator
test_dataset = torchvision.datasets.MNIST(root='./data',
train=False,
transform=transforms.ToTensor(),
download=True)
test_sampler = torch.utils.data.distributed.DistributedSampler(
test_dataset) ### 用于在 DDP 环境下采样
test_dloader = torch.utils.data.DataLoader(dataset=test_dataset,
batch_size=args.batch_size,
shuffle=False,
num_workers=2,
pin_memory=True,
sampler=test_sampler)
for epoch in range(args.epochs):
if local_rank == 0: ### 防止每个进程都输出一次
print(f'begin training of epoch {epoch + 1}/{args.epochs}')
train_dloader.sampler.set_epoch(epoch) ### 防止采样出 bug
train(model, train_dloader, criterion, optimizer, scaler)
if local_rank == 0:
print(f'begin testing')
test(model, test_dloader)
if local_rank == 0: ### 防止每个进程都保存一次
torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')
dist.destroy_process_group() ### 最后摧毁进程,和 init_process_group 相对
首先,根据用init_ddp
函数对模型进行初始化。这里我们使用 nccl 后端,并用 env 作为初始化方法:
def init_ddp(local_rank):
# 有了这一句之后,在转换device的时候直接使用 a=a.cuda()即可,否则要用a=a.cuda(local_rank)
torch.cuda.set_device(local_rank)
os.environ['RANK'] = str(local_rank)
dist.init_process_group(backend='nccl', init_method='env://')
在完成了该初始化后,可以很轻松地在需要时获得local_rank
、world_size
,而不需要作为额外参数从main
中一层一层往下传。
import torch.distributed as dist
local_rank = dist.get_rank()
world_size = dist.get_world_size()
比如需要print
, log
, save_state_dict
时,由于多个进程拥有相同的副本,故只需要一个进程执行即可,比如:
if local_rank == 0:
print(f'begin testing')
if local_rank == 0: ### 防止每个进程都保存一次
torch.save({'model': model.state_dict(), 'scaler': scaler.state_dict()}, 'ddp_checkpoint.pt')
为了加速推理,我们在模型的forward
方法里套一个torch.cuda.amp.autocast()
:
使得forward
函数变为:
def forward(self, x):
with torch.cuda.amp.autocast(): # 混合精度,加速推理
out = self.layer1(x)
out = self.layer2(out)
out = out.reshape(out.size(0), -1)
out = self.fc(out)
return out
autocast 也可以在推理的时候再套,但是在这里套最方便,而且适用于所有情况。
在模型改变之后,使用convert_sync_batchnorm
和DistributedDataParallel
对模型进行包装。
创建 scaler,用于训练时对 loss 进行 scale:
from torch.cuda.amp import GradScaler
scaler = GradScaler() ### 用于混合精度训练
训练时,需要使用 DDP 的sampler,并且在num_workers > 1
时需要传入generator
,否则对于同一个worker,所有进程的augmentation相同,减弱训练的随机性。详细分析参见这篇文章。
def get_ddp_generator(seed=3407):
local_rank = dist.get_rank()
g = torch.Generator()
g.manual_seed(seed + local_rank)
return g
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset) ### 用于在 DDP 环境下采样
g = get_ddp_generator() ###
train_dloader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=args.batch_size,
shuffle=False, ### shuffle 通过 sampler 完成
num_workers=4,
pin_memory=True,
sampler=train_sampler,
generator=g) ### 添加额外的 generator
并且在多个epoch
的训练时,需要设置train_dloader.sampler.set_epoch(epoch)
。
下面来看看train
函数。
def train(model, train_dloader, criterion, optimizer, scaler):
model.train()
for images, labels in train_dloader:
images = images.cuda()
labels = labels.cuda()
outputs = model(images)
loss = criterion(outputs, labels)
optimizer.zero_grad()
scaler.scale(loss).backward() ###
scaler.step(optimizer) ###
scaler.update() ###
最后三行发生了改变。相较于原始的loss.backward
、optimizer.step()
,这里通过scaler
对梯度进行缩放,防止由于使用混合精度导致损失下溢,并且对scaler
自身的状态进行更新呢。如果有多个loss
,它们也使用同一个scaler
。如果需要保存模型的state_dict
并且在后续继续训练(比如预训练-微调模式),最好连带scaler
的状态一起保留,并在后续的微调过程中和模型的参数异同加载。
测试时,需要将多个进程的数据reduce
到一张卡上。注意,在test
函数的外面加上if local_rank == 0
,否则多个进程会彼此等待而陷入死锁。
def test(model, test_dloader):
local_rank = dist.get_rank()
model.eval()
size = torch.tensor(0.).cuda()
correct = torch.tensor(0.).cuda()
for images, labels in test_dloader:
images = images.cuda()
labels = labels.cuda()
with torch.no_grad():
outputs = model(images)
size += images.size(0)
correct += (outputs.argmax(1) == labels).type(torch.float).sum()
dist.reduce(size, 0, op=dist.ReduceOp.SUM) ###
dist.reduce(correct, 0, op=dist.ReduceOp.SUM) ###
if local_rank == 0:
acc = correct / size
print(f'Accuracy is {acc:.2%}')
注释的两行即为所需添加的reduce
操作。
至此,添加的代码讲解完毕。
启动的方式变化不大:
python ddp_main.py --gpu 0,1
相应的结果:
begin training of epoch 1/3
begin training of epoch 2/3
begin training of epoch 3/3
begin testing
Accuracy is 89.21%
time elapsed: 30.82 seconds
上述是通过mp.spawn
启动。mp
模块对multiprocessing
库进行封装,并没有特定针对DDP
。我们还可以通过官方推荐的torchrun
进行启动。完整的程序在这里。
相比mp.spawn
启动,torchrun
自动控制一些环境变量的设置,因而更为方便。我们只需要设置os.environ['CUDA_VISIBLE_DEVICES']
即可(不设置默认为该机器上的所有GPU),而无需设置os.environ['MASTER_ADDR']
等。此外,main
函数不再需要local_rank
参数。程序入口变为:
if __name__ == '__main__':
args = prepare()
time_start = time.time()
main(args)
time_elapsed = time.time() - time_start
local_rank = int(os.environ['LOCAL_RANK'])
if local_rank == 0:
print(f'\ntime elapsed: {time_elapsed:.2f} seconds')
运行脚本的命令由python
变为了torchrun
,如下:
torchrun --standalone --nproc_per_node=2 ddp_main_torchrun.py --gpu 0,1
其中,nproc_per_node
表示进程数,将其设置为使用的GPU数量即可。
在写完 DDP 的代码之后,最好检查一遍,否则很容易因为漏了什么而出现莫名奇妙的错误,比如程序卡着不动了,也不报错)
大致需要检查:
- DDP 初始化有没有完成,包括
if __name__ == '__main__'
里和main
函数里的。退出main
函数时摧毁进程。 - 模型的封装,包括autocast,BN 层的转化和 DDP 封装
- 指定
train_dloader
的sampler
、generator
和shuffle
,并且在每个epoch
设置sampler
,测试集、验证集同理。 - 训练时使用
scaler
对loss
进行scale
- 对于
print
、log
、save
等操作,仅在一个线程上进行。 - 测试时进行
reduce
多个线程大致相当于增大了相应倍数的batch_size
,最好相应地调一调batch_size
和学习率。本文没有进行调节,导致测试获得的准确率有一些差别。
模型较小时速度差别不大,反而DDP与混合精度可能因为一些初始化和精度转换耗费额外时间而更慢。在模型较大时,DDP + 混合精度的速度要明显高于常规,且能降低显存占用。