distributed training nanoGPT on sagemaker
distributed training nanoGPT on sagemaker


  • Baby nanoGPT model
  • Distributed training on SageMaker
  • Load model and test


class Head(nn.Module):
  """One head of causal (masked) self-attention.

  Reads the module-level ``n_embed``, ``block_size`` and ``dropout`` settings.

  Args:
    head_size: output width of the key, query and value projections.
  """

  def __init__(self, head_size) -> None:
    # nn.Module has to be initialised before sub-modules or buffers are attached
    super().__init__()
    self.key = nn.Linear(n_embed, head_size, bias=False)
    self.query = nn.Linear(n_embed, head_size, bias=False)
    self.value = nn.Linear(n_embed, head_size, bias=False)
    # lower-triangular mask: position t may only attend to positions <= t
    self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Apply masked self-attention to ``x``.

    Args:
      x: tensor of shape (B, T, n_embed) with T <= block_size.

    Returns:
      Tensor of shape (B, T, head_size).
    """
    T = x.shape[1]
    k = self.key(x)   # (B, T, hs)
    q = self.query(x) # (B, T, hs)
    # compute attention scores ("affinities"), scaled by 1/sqrt(head_size)
    wei = q @ k.transpose(-2, -1) * k.shape[-1] ** -0.5 # (B, T, hs) @ (B, hs, T) -> (B, T, T)
    # hide future positions so softmax gives them zero weight
    wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
    wei = F.softmax(wei, dim=-1) # (B, T, T)
    wei = self.dropout(wei)
    # perform the weighted aggregation of the values
    v = self.value(x) # (B, T, hs)
    out = wei @ v # (B, T, T) @ (B, T, hs) -> (B, T, hs)
    return out

class FeedForward(nn.Module):
  """Position-wise feed-forward network: linear -> ReLU -> linear -> dropout.

  Reads the module-level ``dropout`` setting.

  Args:
    n_embed: width of the input and output; the hidden layer is 4x wider.
  """

  def __init__(self, n_embed) -> None:
    # nn.Module has to be initialised before sub-modules are attached
    super().__init__()
    self.net = nn.Sequential(
      nn.Linear(n_embed, 4 * n_embed),
      # without a non-linearity the two linears collapse into a single linear map
      nn.ReLU(),
      nn.Linear(4 * n_embed, n_embed),
      nn.Dropout(dropout),
    )

  def forward(self, x):
    """Apply the network to the last dimension of ``x``; the shape is unchanged."""
    return self.net(x)

class MultiHeadAttention(nn.Module):
  """Several ``Head`` modules run side by side, followed by a projection.

  Reads the module-level ``n_embed`` and ``dropout`` settings. The projection
  takes ``n_embed`` inputs, so ``num_heads * head_size`` has to equal ``n_embed``.

  Args:
    num_heads: number of attention heads to create.
    head_size: output width of each head.
  """

  def __init__(self, num_heads, head_size) -> None:
    # nn.Module has to be initialised before sub-modules are attached
    super().__init__()
    self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
    self.proj = nn.Linear(n_embed, n_embed)
    self.dropout = nn.Dropout(dropout)

  def forward(self, x):
    """Concatenate every head's output on the last dim, then project and drop out."""
    out = torch.cat([h(x) for h in self.heads], dim=-1)
    out = self.dropout(self.proj(out))
    return out

class Block(nn.Module):
  """Transformer block: self-attention then feed-forward, each with a residual.

  Layer norm is applied to the input of each sub-layer (pre-norm).

  Args:
    n_embed: embedding width.
    n_head: number of attention heads; each head gets ``n_embed // n_head`` dims.
  """

  def __init__(self, n_embed, n_head) -> None:
    # nn.Module has to be initialised before sub-modules are attached
    super().__init__()
    head_size = n_embed // n_head
    self.sa = MultiHeadAttention(n_head, head_size)
    self.ffwd = FeedForward(n_embed)
    self.ln1 = nn.LayerNorm(n_embed)
    self.ln2 = nn.LayerNorm(n_embed)

  def forward(self, x):
    """Return ``x`` after both residual sub-layers; the shape is unchanged."""
    x = x + self.sa(self.ln1(x))
    x = x + self.ffwd(self.ln2(x))
    return x

class BigramLanguageMmodel(nn.Module):
  """Character-level language model built from a stack of ``Block`` modules.

  Reads the module-level ``n_embed``, ``block_size``, ``n_head`` and ``n_layer``
  settings.

  Args:
    vocab_size: number of distinct token ids.
  """

  def __init__(self, vocab_size) -> None:
    # nn.Module has to be initialised before sub-modules are attached
    super().__init__()
    self.token_embedding_table = nn.Embedding(vocab_size, n_embed)
    # the attribute name (spelling included) is part of the state_dict keys,
    # so it is kept as-is to stay compatible with already saved checkpoints
    self.position_embedding_talbe = nn.Embedding(block_size, n_embed)
    self.blocks = nn.Sequential(*[Block(n_embed, n_head=n_head) for _ in range(n_layer)])
    self.ln_f = nn.LayerNorm(n_embed) # final layer norm
    self.lm_head = nn.Linear(n_embed, vocab_size)

  def forward(self, idx, targets=None):
    """Compute logits and, when ``targets`` is given, the cross-entropy loss.

    Args:
      idx: (B, T) tensor of token ids, with T <= block_size.
      targets: optional (B, T) tensor of token ids to predict.

    Returns:
      ``(logits, loss)``. Without targets, logits is (B, T, vocab_size) and
      loss is ``None``. With targets, logits is flattened to
      (B*T, vocab_size) and loss is the cross-entropy over all positions.
    """
    B, T = idx.shape

    # idx and targets are both (B, T) tensor of integers
    tok_emb = self.token_embedding_table(idx) # (B, T, C)
    # build the positions on the same device as the input rather than a global one
    pos_emb = self.position_embedding_talbe(torch.arange(T, device=idx.device)) # (T, C)
    x = tok_emb + pos_emb # (B, T, C)
    x = self.blocks(x) # (B, T, C)
    x = self.ln_f(x) # (B, T, C)
    logits = self.lm_head(x) # (B, T, vocab_size)

    if targets is None:
      loss = None
    else:
      B, T, C = logits.shape
      logits = logits.view(B*T, C)
      targets = targets.view(B*T)
      # softmax and loss
      loss = F.cross_entropy(logits, targets)

    return logits, loss

  def generate(self, idx, max_new_tokens):
    """Sample ``max_new_tokens`` tokens, one at a time, after the context ``idx``.

    Args:
      idx: (B, T) tensor of token ids in the current context.
      max_new_tokens: number of tokens to append.

    Returns:
      (B, T + max_new_tokens) tensor holding the context and the sampled tokens.
    """
    for _ in range(max_new_tokens):
      # crop idx to the last block_size tokens
      idx_cond = idx[:, -block_size:]
      # get the predictions
      logits, loss = self(idx_cond)
      # focus only on the last time step
      logits = logits[:, -1, :] # becomes (B, C)
      # apply softmax to get probabilities
      probs = F.softmax(logits, dim=-1) # (B, C)
      # sample from the distribution
      idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
      # append sampled index to the running sequence
      idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
    return idx

Train on Notebook

Let's set up some parameters

# --- batching ---
batch_size = 64   # number of independent sequences processed in parallel
block_size = 256  # maximum context length used for predictions

# --- training schedule ---
max_iters = 5000
eval_interval = 500
eval_iters = 200
learning_rate = 3e-4

# --- model size ---
n_embed = 384
n_head = 6
n_layer = 6
dropout = 0.2

# use the GPU when torch can see one, otherwise stay on the CPU
if torch.cuda.is_available():
  device = 'cuda'
else:
  device = 'cpu'


Read data

with open("input.txt", 'r', encoding='utf-8') as f:
  text = f.read()

# the vocabulary is every distinct character of the corpus, in sorted order
chars = sorted(set(text))
vocab_size = len(chars)

# lookup tables between characters and their integer ids
stoi = {ch: i for i, ch in enumerate(chars)}
itos = {i: ch for i, ch in enumerate(chars)}


def encode(s):
  """Take a string and return the list of integer ids of its characters."""
  return [stoi[c] for c in s]


def decode(l):
  """Take a list of integer ids and return the string they spell."""
  return ''.join(itos[i] for i in l)


# get_batch() reads train_data / val_data, so tokenise the corpus and split it:
# the first 90% is used for training, the rest for validation
data = torch.tensor(encode(text), dtype=torch.long)
n = int(0.9 * len(data))
train_data = data[:n]
val_data = data[n:]

Create a batch

def get_batch(split):
  """Draw a random batch of inputs and next-token targets.

  Uses ``train_data`` when ``split`` is 'train' and ``val_data`` otherwise.
  Returns ``(x, y)``, each with ``batch_size`` rows of ``block_size`` tokens
  on ``device``; ``y`` is ``x`` shifted one position to the right.
  """
  source = val_data if split != 'train' else train_data
  # one random start offset per sequence in the batch
  starts = torch.randint(len(source) - block_size, (batch_size,)).tolist()
  inputs, targets = [], []
  for start in starts:
    stop = start + block_size
    inputs.append(source[start:stop])
    targets.append(source[start + 1:stop + 1])
  return torch.stack(inputs).to(device), torch.stack(targets).to(device)

Estimate loss

@torch.no_grad()  # evaluation only: do not record an autograd graph
def estimate_loss():
  """Average the loss of the global ``model`` over ``eval_iters`` random batches.

  Returns:
    dict with keys 'train' and 'val', each holding the mean loss of that split.
  """
  out = {}
  was_training = model.training
  # switch dropout off while measuring, and restore the previous mode afterwards
  model.eval()
  try:
    for split in ['train', 'val']:
      losses = torch.zeros(eval_iters)
      for k in range(eval_iters):
        X, Y = get_batch(split)
        _, loss = model(X, Y)
        losses[k] = loss.item()
      out[split] = losses.mean()
  finally:
    model.train(was_training)
  return out

Train model

model = BigramLanguageMmodel(vocab_size)
model = model.to(device)

# the optimizer that actually updates the weights
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for step in range(max_iters):

  # every once in a while evaluate the loss on the train and val sets
  if step % eval_interval == 0:
    losses = estimate_loss()
    print(f"step {step}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

  # sample a batch of data
  xb, yb = get_batch("train")

  # evaluate the loss
  logits, loss = model(xb, yb)

  # optimizer step: clear old gradients, back-propagate, update the weights
  optimizer.zero_grad(set_to_none=True)
  loss.backward()
  optimizer.step()

Distributed Training

  • Upload training data to s3
  • Prepare script for distributed training

First, let's download the training data

!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

Then upload to the default sagemaker s3 bucket

aws s3 cp input.txt s3://$SAGEMAKER_BUCKET/train/input.txt

Now let's modify train.py to include the model and enable distributed training on SageMaker.

How to save model

# Move the model to the CPU, then write only its state_dict (the weights) to disk.
torch.save(model.cpu().state_dict(), "/opt/ml/model/nanoGPT.pth")

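When the model is wrapped in `DataParallel`, the forward pass returns one loss value per GPU, so we have to reduce that tensor of losses to a scalar (for example with `loss.mean()`) before calling `backward()`.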


Here is the full train.py script

def get_batch(split, train_data, val_data):
    """Draw a random batch of inputs and next-token targets.

    Args:
        split: "train" selects ``train_data``; anything else selects ``val_data``.
        train_data: token tensor used for training batches.
        val_data: token tensor used for validation batches.

    Returns:
        ``(x, y)``, each with ``batch_size`` rows of ``block_size`` tokens on
        ``device``; ``y`` is ``x`` shifted one position to the right.
    """
    source = train_data if split == "train" else val_data
    # one random start offset per sequence in the batch
    starts = torch.randint(len(source) - block_size, (batch_size,)).tolist()

    def windows(shift):
        # stack the block_size-long slice beginning `shift` tokens after each start
        return torch.stack(
            [source[s + shift : s + shift + block_size] for s in starts]
        )

    return windows(0).to(device), windows(1).to(device)

@torch.no_grad()  # evaluation only: do not record an autograd graph
def estimate_loss(model, train_data, val_data):
    """Average the loss of ``model`` over ``eval_iters`` random batches per split.

    Args:
        model: module called as ``model(X, Y)`` and returning ``(logits, loss)``.
        train_data: token tensor used for the "train" estimate.
        val_data: token tensor used for the "val" estimate.

    Returns:
        dict with keys "train" and "val", each holding the mean loss of that split.
    """
    out = {}
    was_training = model.training
    # switch dropout off while measuring, and restore the previous mode afterwards
    model.eval()
    try:
        for split in ["train", "val"]:
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split, train_data, val_data)
                _, loss = model(X, Y)
                # a DataParallel-wrapped model (as built in train()) returns one
                # loss per replica, so reduce before converting to a Python float
                losses[k] = loss.mean().item()
            out[split] = losses.mean()
    finally:
        model.train(was_training)
    return out

# train
def parse_args():
    """Parse the command-line options of the training script.

    Defaults for the host list and the current host come from the
    ``SM_HOSTS`` (a JSON list) and ``SM_CURRENT_HOST`` environment variables.

    Returns:
        argparse.Namespace with ``backend``, ``model_type``, ``hosts`` (list of
        host names) and ``current_host``.

    Raises:
        KeyError: if ``SM_HOSTS`` or ``SM_CURRENT_HOST`` is not set.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--backend", type=str, default="gloo")
    # the estimator in this tutorial also passes a `model-type` hyper-parameter
    parser.add_argument("--model-type", type=str, default="custom")
    # parse the value as JSON: `type=list` would split a string into characters
    parser.add_argument(
        "--hosts", type=json.loads, default=json.loads(os.environ["SM_HOSTS"])
    )
    parser.add_argument(
        "--current-host", type=str, default=os.environ["SM_CURRENT_HOST"]
    )
    return parser.parse_args()

def get_data(path="/opt/ml/input/data/training/input.txt"):
    """Read the text corpus and turn it into train/validation token tensors.

    Args:
        path: UTF-8 text file to read; defaults to the location used by the
            training job.

    Returns:
        ``(train_data, val_data, vocab_size, decode)`` where ``train_data`` is
        the first 90% of the encoded text, ``val_data`` the remaining 10%,
        ``vocab_size`` the number of distinct characters and ``decode`` a
        function mapping a list of token ids back to a string.
    """
    # prepare training data
    with open(path, "r", encoding="utf-8") as f:
        text = f.read()
    # char set
    chars = sorted(set(text))
    vocab_size = len(chars)
    # mapping
    stoi = {ch: i for i, ch in enumerate(chars)}
    itos = {i: ch for i, ch in enumerate(chars)}

    # encode and decode
    def encode(s):
        return [stoi[c] for c in s]

    def decode(l):
        return "".join(itos[i] for i in l)

    data = torch.tensor(encode(text), dtype=torch.long)
    # split train and val_data: the validation set is the held-out tail,
    # not a second copy of the training set
    n = int(0.9 * len(data))
    train_data = data[:n]
    val_data = data[n:]
    return train_data, val_data, vocab_size, decode

def load_model(path="./model/nanoGPT.pth"):
    """Load the object saved by the training job and return it.

    The training script saves ``model.state_dict()`` as ``nanoGPT.pth``, so the
    default path uses that file name and the returned value is that state dict.

    Args:
        path: file written by ``torch.save``.

    Returns:
        The object stored in ``path``, with tensors mapped to the CPU.
    """
    # map to CPU so the file can be read on a machine without a GPU
    return torch.load(path, map_location="cpu")

def train():
    """Train the model on this host and save its weights.

    Parses the command-line options, loads the data, initialises the process
    group, trains a ``DataParallel``-wrapped model for ``max_iters`` steps and
    finally writes the state dict to ``/opt/ml/model/nanoGPT.pth``.

    Returns:
        None
    """
    args = parse_args()
    # get data (read from the job's local input directory)
    train_data, val_data, vocab_size, decode = get_data()
    # init process group
    world_size = len(args.hosts)
    host_rank = args.hosts.index(args.current_host)
    print(f"host rank is {host_rank}")
    dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
    # device
    device = "cuda"
    # model
    # NOTE(review): DataParallel only splits a batch over the GPUs of this host;
    # confirm whether gradients are meant to be synchronised across hosts too.
    model = BigramLanguageMmodel(vocab_size=vocab_size)
    model = torch.nn.DataParallel(model)
    model = model.to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    for step in range(max_iters):
        # sample a batch of data
        xb, yb = get_batch("train", train_data, val_data)
        # evaluate the loss
        logits, loss = model(xb, yb)
        # for distributed training: the wrapped model returns one loss per GPU,
        # so reduce it to a scalar before back-propagating
        mean_loss = loss.mean()
        # optimizer step
        optimizer.zero_grad(set_to_none=True)
        mean_loss.backward()
        optimizer.step()
        if step % eval_interval == 0:
            print(f"train loss {loss} and average {mean_loss.item()}")
    # save model once, after the last step; moving the model to the CPU inside
    # the loop would break the following iterations
    try:
        torch.save(model.cpu().state_dict(), "/opt/ml/model/nanoGPT.pth")
    except (OSError, RuntimeError) as err:
        print('not able to save model', err)
    # generate not work in distributed mode
    # NOTE(review): a DataParallel wrapper exposes the original model as
    # `model.module`, so generate() would have to be called on that object.
    # print(
    #     decode(
    #         model.generate(
    #             idx=torch.zeros((1, 1), dtype=torch.long, device=device),
    #             max_new_tokens=500,
    #         )[0].tolist()
    #     )
    # )
    return None

if __name__ == "__main__":
    # load_model()
    # script entry point: run the training job
    train()

Let's create a SageMaker training job

from sagemaker.pytorch import PyTorch
from sagemaker import TrainingInput
from sagemaker import Session

from sagemaker import get_execution_role

# get bucket
session = Session()
bucket = session.default_bucket()

# estimator
# NOTE(review): the role, framework/python versions and instance settings were
# missing from this snippet; the values below are placeholders — TODO confirm
# them against your account before launching a job.
estimator = PyTorch(
    entry_point="train.py",
    role=get_execution_role(),
    framework_version="2.0.1",
    py_version="py310",
    instance_count=2,
    instance_type="ml.g4dn.12xlarge",
    sagemaker_session=session,
    # forwarded to train.py as command-line arguments
    hyperparameters={
        'backend': 'gloo',
        'model-type': 'custom',
    },
    distribution={
        # mpirun backend
        "pytorchddp": {"enabled": True}
    },
)

# fit with s3 data: the "training" channel is the folder train.py reads
# input.txt from, and s3://<bucket>/train is where the file was uploaded
estimator.fit({"training": TrainingInput(s3_data=f"s3://{bucket}/train")})

Load Model

The SageMaker training job saves the model as model.tar.gz in S3; let's download and load it

import sagemaker
from sagemaker import Session

from sagemaker.s3 import S3Downloader

bucket = Session().default_bucket()
training_job_id = "pytorch-training-2023-12-16-13-20-59-388"

# download the archive that the extraction step expects under model/
# NOTE(review): assumes the job wrote to the default output location
# s3://<bucket>/<job name>/output/model.tar.gz — TODO confirm
S3Downloader.download(
    s3_uri=f"s3://{bucket}/{training_job_id}/output/model.tar.gz",
    local_path="model",
)


# extract model.tar.gz
!tar -xvf model/model.tar.gz --directory model/

Recreate the model

Then load the weights into the model

# NOTE(review): 65 presumably matches the character vocabulary of input.txt — confirm
model = BigramLanguageMmodel(vocab_size=65)
# the weights were saved from a DataParallel-wrapped model, so wrap this one the
# same way to make the parameter names line up
model = torch.nn.DataParallel(model)

with open("./model/nanoGPT.pth", "rb") as f:
  # map to CPU first so loading also works on a machine without a GPU
  model.load_state_dict(torch.load(f, map_location="cpu"))

model = model.to(device)
# inference only from here on: switch dropout off
model.eval()

Finally we can test it

with open("input.txt", 'r', encoding='utf-8') as f:
  text = f.read()

# vocabulary: the distinct characters of the corpus, in sorted order
chars = sorted(set(text))
vocab_size = len(chars)

# lookup tables between characters and their integer ids
itos = dict(enumerate(chars))
stoi = {ch: i for i, ch in itos.items()}


def encode(s):
  """Take a string and return the list of integer ids of its characters."""
  return [stoi[c] for c in s]


def decode(l):
  """Take a list of integer ids and return the string they spell."""
  return ''.join(itos[i] for i in l)



Andrej Karpathy Let's build GPT: from scratch, in code, spelled out

Andrej Karpathy nanoGPT GitHub

Attention Is All You Need

Natural Language Processing with Transformers, Revised Edition

SageMaker Distributed Training MNIST

Distributed Training Workshop

Distributed Training Workshop GitHub

Data Parallelism Library in SageMaker

The Science Behind Amazon SM Distributed Training Engine

SMDDP Distributed Data Parallel Supported Instance

SageMaker Pytorch MNIST