How to use DDP in pytorch operator?
SeibertronSS opened this issue · 3 comments
I have a pytorch DDP program as follows:
import argparse
from tqdm import tqdm
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
class ToyModel(nn.Module):
    """Small LeNet-style CNN for 10-class 32x32 RGB images (e.g. CIFAR-10).

    Input:  (batch, 3, 32, 32) float tensor.
    Output: (batch, 10) unnormalized class logits.
    """

    def __init__(self):
        super(ToyModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        # 16 channels * 5 * 5 spatial — the feature-map size after two
        # conv(5)+pool(2) stages applied to a 32x32 input.
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # flatten(1) keeps the batch dimension intact; the original
        # view(-1, 16 * 5 * 5) would silently fold a wrong-sized input into
        # a different batch size instead of raising an error.
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.fc3(x)
def get_dataset():
    """Build a CIFAR-10 training DataLoader sharded across DDP ranks.

    Returns a DataLoader whose DistributedSampler hands each process a
    disjoint slice of the dataset; requires an initialized process group.
    """
    normalize = torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    transform = torchvision.transforms.Compose(
        [torchvision.transforms.ToTensor(), normalize]
    )
    dataset = torchvision.datasets.CIFAR10(
        root='./data', train=True, download=True, transform=transform
    )
    sampler = torch.utils.data.distributed.DistributedSampler(dataset)
    return torch.utils.data.DataLoader(
        dataset, batch_size=16, num_workers=2, sampler=sampler
    )
# ---- DDP training entry point: one process per GPU ----
import os

parser = argparse.ArgumentParser()
# torch.distributed.launch passes --local_rank on the command line; its
# successor torchrun sets the LOCAL_RANK env var instead. Defaulting to the
# env var keeps the script working under both launchers.
parser.add_argument("--local_rank",
                    default=int(os.environ.get("LOCAL_RANK", -1)), type=int)
FLAGS = parser.parse_args()
local_rank = FLAGS.local_rank

torch.cuda.set_device(local_rank)
# MASTER_ADDR / MASTER_PORT / RANK / WORLD_SIZE are read from the
# environment (populated by the launcher or the PyTorch operator).
dist.init_process_group(backend='nccl')

trainloader = get_dataset()
model = ToyModel().to(local_rank)

# Only rank 0 loads the checkpoint; DDP broadcasts rank 0's parameters to
# every other rank when it wraps the model below.
ckpt_path = None
if dist.get_rank() == 0 and ckpt_path is not None:
    model.load_state_dict(torch.load(ckpt_path))
model = DDP(model, device_ids=[local_rank], output_device=local_rank)

optimizer = torch.optim.SGD(model.parameters(), lr=0.001)
loss_func = nn.CrossEntropyLoss().to(local_rank)

model.train()
iterator = tqdm(range(100))
for epoch in iterator:
    # Re-seed the sampler so each epoch sees a different shuffle; without
    # this every epoch reuses the epoch-0 ordering.
    trainloader.sampler.set_epoch(epoch)
    for data, label in trainloader:
        data, label = data.to(local_rank), label.to(local_rank)
        optimizer.zero_grad()
        prediction = model(data)
        loss = loss_func(prediction, label)
        loss.backward()
        # .item() extracts a detached Python float; formatting the tensor
        # itself would keep a reference to the graph's output tensor alive.
        iterator.desc = "loss = %0.3f" % loss.item()
        optimizer.step()
    # Save the unwrapped module's weights from rank 0 only, once per epoch
    # (NOTE(review): the pasted source lost its indentation — per-epoch
    # saving matches the standard DDP tutorial this code follows).
    if dist.get_rank() == 0:
        torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
When I run on the host, I execute `python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 --master_addr $my_address --master_port $my_port main.py`
on both the master and the worker. If the PyTorch operator is used, is there no need to specify `master_addr`
and `master_port`
?
Not required. The variables will be auto-populated by the operator:
pytorch-operator/pkg/controller.v1/pytorch/pod.go
Lines 259 to 276 in a502590
Thank you very much!
I have a pytorch DDP program as follows:
import argparse from tqdm import tqdm import torch import torchvision import torch.nn as nn import torch.nn.functional as F import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP class ToyModel(nn.Module): def __init__(self): super(ToyModel, self).__init__() self.conv1 = nn.Conv2d(3, 6, 5) self.pool = nn.MaxPool2d(2, 2) self.conv2 = nn.Conv2d(6, 16, 5) self.fc1 = nn.Linear(16 * 5 * 5, 120) self.fc2 = nn.Linear(120, 84) self.fc3 = nn.Linear(84, 10) def forward(self, x): x = self.pool(F.relu(self.conv1(x))) x = self.pool(F.relu(self.conv2(x))) x = x.view(-1, 16 * 5 * 5) x = F.relu(self.fc1(x)) x = F.relu(self.fc2(x)) x = self.fc3(x) return x def get_dataset(): transform = torchvision.transforms.Compose([ torchvision.transforms.ToTensor(), torchvision.transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) ]) my_trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform) train_sampler = torch.utils.data.distributed.DistributedSampler(my_trainset) trainloader = torch.utils.data.DataLoader(my_trainset, batch_size=16, num_workers=2, sampler=train_sampler) return trainloader parser = argparse.ArgumentParser() parser.add_argument("--local_rank", default=-1, type=int) FLAGS = parser.parse_args() local_rank = FLAGS.local_rank torch.cuda.set_device(local_rank) dist.init_process_group(backend='nccl') trainloader = get_dataset() model = ToyModel().to(local_rank) ckpt_path = None if dist.get_rank() == 0 and ckpt_path is not None: model.load_state_dict(torch.load(ckpt_path)) model = DDP(model, device_ids=[local_rank], output_device=local_rank) optimizer = torch.optim.SGD(model.parameters(), lr=0.001) loss_func = nn.CrossEntropyLoss().to(local_rank) model.train() iterator = tqdm(range(100)) for epoch in iterator: trainloader.sampler.set_epoch(epoch) for data, label in trainloader: data, label = data.to(local_rank), label.to(local_rank) optimizer.zero_grad() prediction = model(data) loss = 
loss_func(prediction, label) loss.backward() iterator.desc = "loss = %0.3f" % loss optimizer.step() if dist.get_rank() == 0: torch.save(model.module.state_dict(), "%d.ckpt" % epoch)
When I run on the host, I execute
python -m torch.distributed.launch --nnodes=2 --node_rank=0 --nproc_per_node 8 --master_addr $my_address --master_port $my_port main.py
on both the master and the worker. If the PyTorch operator is used, is there no need to specify `master_addr`
and `master_port`
?
I want to know if I want to use two nodes how to change the code?