COS 598D Project: Distributed Training of Language Models

Overview

This assignment familiarizes you with the basics of training language models using a small-scale toy example. You will first get hands-on experience fine-tuning a pre-trained language model, BERT (implemented in PyTorch and Huggingface Transformers), on a single machine. Then, you will implement data parallelism, the most common paradigm of distributed training, using primitives from PyTorch Distributed, and run distributed training on multiple machines. Along the way, you will learn the trade-offs among different approaches to distributed training.

Learning Outcomes

  • Load a pretrained language model from Huggingface and fine-tune it on GLUE datasets
  • Write distributed training applications with PyTorch distributed
  • Understand the trade-offs among different methods of performing distributed training
  • Describe how ML frameworks (PyTorch) interact with lower-level communication frameworks (e.g. OpenMPI, Gloo, and NCCL)

Environment Setup

You may use either GPUs or CPUs for this project. You will need up to 4 nodes that can communicate with each other through a network interface. Each node should have at least ~12 GB of RAM (for CPU training) or GPU memory (for GPU training) and at least ~10 GB of disk space.

The following setup process targets CPU-only environments. This assignment works best if you have bare-metal access to your compute nodes (i.e. you can reach the nodes directly from a terminal rather than through a Slurm scheduler). You can get access to CPU nodes on CloudLab -- for instructions on accessing CloudLab, see cloudlab.md.

  • Create a fork of this repository and clone your own fork on all machines
  • Install software dependencies via apt: sudo apt-get update ; sudo apt-get install htop dstat python3-pip
  • Download the RTE dataset, one of the datasets within GLUE. You are only required to fine-tune on RTE in this assignment. This command downloads all datasets within GLUE: cd $HOME ; mkdir glue_data ; cd cos598d ; python3 download_glue_data.py --data_dir $HOME/glue_data
  • Optional: create a virtual environment (conda/venv) to install the following dependencies
  • Install CPU-only PyTorch: pip install torch --index-url https://download.pytorch.org/whl/cpu. If you are using GPUs, install the appropriate PyTorch here.
  • Install dependencies: pip install numpy scipy scikit-learn tqdm pytorch_transformers

Part 1: Fine-tuning BERT on GLUE Datasets on a Single Node

We have provided a base script (run_glue_skeleton.py) for you to get started, which provides some model/dataset setup and training setup to fine-tune BERT-base on the RTE dataset. The base script bakes in a handful of optimizations such as mixed-precision training and gradient accumulation -- we will not be needing them, and you can ignore the related code. We also provide utils_glue.py which includes some helper functions.

Here is a command you can use to run run_glue.py:

export GLUE_DIR=$HOME/glue_data
export TASK_NAME=RTE

python3 run_glue.py \
  --model_type bert \
  --model_name_or_path bert-base-cased \
  --task_name $TASK_NAME \
  --do_train \
  --do_eval \
  --data_dir $GLUE_DIR/$TASK_NAME \
  --max_seq_length 128 \
  --per_gpu_train_batch_size 64 \
  --learning_rate 2e-5 \
  --num_train_epochs 3 \
  --output_dir /tmp/$TASK_NAME/ \
  --overwrite_output_dir

Task 1: Load the pretrained BERT-base model from Huggingface and fill in the standard training loop for a minibatch: forward pass (already implemented), loss computation (already implemented), backward pass, and parameter update (via an optimizer step). Record the loss values of the first five minibatches by printing the loss value after every iteration. Afterward, run training for 3 epochs (an epoch is a complete pass over the dataset -- when fine-tuning, we typically need only a small number of epochs) with batch size 64 and the default hyperparameters. All required code changes are marked with TODO(cos598d) comments.
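
As a minimal sketch of the four steps, the loop below trains a toy linear classifier on random data and prints the loss of the first five minibatches. The model, the optimizer (plain SGD), and the data are stand-ins chosen for illustration; they are not what the provided script uses, and only the order of the calls is meant to carry over.

```python
import torch
from torch import nn

torch.manual_seed(0)

# Toy stand-ins (assumptions): a linear classifier and one random minibatch of 64.
model = nn.Linear(4, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loss_fn = nn.CrossEntropyLoss()
inputs = torch.randn(64, 4)
labels = torch.randint(0, 2, (64,))

losses = []
for step in range(5):
    optimizer.zero_grad()           # clear gradients left over from the previous step
    logits = model(inputs)          # forward pass
    loss = loss_fn(logits, labels)  # loss computation
    loss.backward()                 # backward pass: fills param.grad
    optimizer.step()                # parameter update
    losses.append(loss.item())
    print(f"step {step}: loss = {loss.item():.4f}")
```

Here the same minibatch is reused at every step to keep the sketch short; a real loop would draw a new minibatch from a data loader each iteration.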

There are several training examples that describe these four steps. Some good resources include the PyTorch examples repository and the PyTorch tutorials. This script is also a starting point for later parts of the assignment. Familiarize yourself with the script and run training on a single machine.

Part 2: Distributed Data Parallel Training

Next, you will modify the script used in Part 1 to enable distributed data parallel training. Distributed training is performed in two main ways: i) data parallel and ii) model parallel. In data parallel training, each participating worker trains the same network, but on different data points from the dataset. After each iteration (forward and backward pass), the workers average their local gradients to arrive at a single update. In model parallel training, the model is partitioned among a number of workers. During the forward pass, each worker computes its part of the model and sends its output to the worker that holds the next partition; the backward pass proceeds in the reverse direction. Model parallelism is usually used when the network is too large to fit on a single worker. In this assignment, we focus solely on data parallel training. For more information about parallelism in ML, see the FAQs. For data parallel training, you will need to partition the data among the nodes; see the FAQs for details on how to do so.
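
The averaging step can be checked numerically on one machine. The sketch below assumes a toy least-squares model and equal-sized shards, and compares the gradient of the mean loss over a full batch with the average of the per-shard gradients. The helper name grad_of_mean_loss and the toy data are hypothetical.

```python
import torch

torch.manual_seed(0)

# Toy data (assumption): a "total batch" of 8 examples with 3 features each.
x = torch.randn(8, 3)
y = torch.randn(8, 1)
w = torch.zeros(3, 1, requires_grad=True)


def grad_of_mean_loss(x_part, y_part, weights):
    """Gradient of the mean squared error over the given examples."""
    loss = ((x_part @ weights - y_part) ** 2).mean()
    (grad,) = torch.autograd.grad(loss, weights)
    return grad


# Single-worker case: one gradient over the full batch.
full_grad = grad_of_mean_loss(x, y, w)

# Data parallel case: 4 simulated workers, each with an equal shard of 2 examples.
shard_grads = [
    grad_of_mean_loss(x_shard, y_shard, w)
    for x_shard, y_shard in zip(x.chunk(4), y.chunk(4))
]
averaged_grad = torch.stack(shard_grads).mean(dim=0)

matches = torch.allclose(full_grad, averaged_grad, atol=1e-6)
print("averaged shard gradients match full-batch gradient:", matches)
```

The equality relies on every shard having the same number of examples and on the loss being a mean over examples; with unequal shards a weighted average would be needed.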

To learn more about the communication primitives used in ML, please refer to the PyTorch documentation and the NCCL documentation. We will be using gather, scatter, and all_reduce in this assignment.

Part 2(a): Gradient Synchronization with gather and scatter

PyTorch comes with the gloo communication backend built-in. On CPU nodes, we will use this to implement gradient aggregation using the gather and scatter calls. On GPU nodes, use the nccl backend.

We will do the following:

  1. Set up PyTorch in distributed mode using PyTorch Distributed. Initialize the distributed environment using init_process_group(), which only needs to be invoked once. You are encouraged to write a small test script to make sure you can initialize a process group on four nodes before proceeding. For details of the initialization process, see the FAQs.
  2. Next, to perform gradient aggregation, you will need to read the gradients after the backward pass. PyTorch computes gradients using autograd when you call .backward() on the result of a computation graph (typically the loss). The gradient is stored in the .grad attribute of each parameter. The parameters can be accessed using APIs like model.parameters() or model.named_parameters().
  3. Finally, to perform gradient aggregation, you will need to use gather and scatter communication collectives. Specifically, worker 0 (with rank 0) in the group will gather the gradients from all the participating workers, perform element-wise averaging, and then scatter the mean vector to all workers. The workers update the grad variable with the received vector and then continue training.
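
The three steps above can be sketched as follows. This is one possible arrangement under stated assumptions, not a reference solution: the helper names (tcp_init_method, init_distributed, mean_of_tensors, sync_gradients_gather_scatter) are hypothetical, the gloo backend is assumed, and every worker is assumed to have a gradient for the same set of parameters.

```python
import torch
import torch.distributed as dist


def tcp_init_method(master_ip, master_port):
    """Build a TCP init URL such as tcp://10.10.1.1:6585."""
    if not 1023 < master_port < 65536:
        raise ValueError("use a non-privileged port (1024-65535)")
    return f"tcp://{master_ip}:{master_port}"


def init_distributed(master_ip, master_port, rank, world_size, backend="gloo"):
    """Step 1: initialize the process group; call this once per process."""
    dist.init_process_group(
        backend=backend,
        init_method=tcp_init_method(master_ip, master_port),
        rank=rank,
        world_size=world_size,
    )


def mean_of_tensors(tensors):
    """Element-wise mean of a list of same-shaped tensors."""
    return torch.stack(tensors).mean(dim=0)


def sync_gradients_gather_scatter(model, rank, world_size):
    """Steps 2 and 3: call after loss.backward() and before optimizer.step()."""
    for _, param in model.named_parameters():
        if param.grad is None:
            # Assumption: this is the same on every worker; otherwise the
            # collectives below would not line up across ranks.
            continue
        grad = param.grad
        if rank == 0:
            # Rank 0 receives one gradient per worker, averages them,
            # and sends a copy of the mean back to every worker.
            gathered = [torch.empty_like(grad) for _ in range(world_size)]
            dist.gather(grad, gather_list=gathered, dst=0)
            mean = mean_of_tensors(gathered)
            copies = [mean.clone() for _ in range(world_size)]
            dist.scatter(grad, scatter_list=copies, src=0)
        else:
            dist.gather(grad, dst=0)
            # scatter overwrites the local gradient in place with the mean.
            dist.scatter(grad, src=0)
```

Issuing one gather and one scatter per parameter keeps the sketch simple; the cost of many small collectives is one of the things the timing comparison in the report can reveal.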

Task 2(a): Implement gradient synchronization using gather and scatter in gloo or nccl. Verify that you are using the same total batch size, where total batch size = batch size on one worker * number of workers. With the same total batch size and the same seed, you should get similar loss values as in the single-node training case. Remember that you trained with a total batch size of 64 in Task 1.
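
As a small worked example of the batch size relation, the sketch below derives the per-worker batch size from the total. The function name per_worker_batch_size is hypothetical; the numbers are the ones used in this assignment (total batch size 64, up to 4 workers).

```python
def per_worker_batch_size(total_batch_size, world_size):
    """total batch size = batch size on one worker * number of workers."""
    if total_batch_size % world_size != 0:
        raise ValueError("total batch size must be divisible by the number of workers")
    return total_batch_size // world_size


# With a total batch size of 64 on 4 workers, each worker processes 16 examples.
batch_per_worker = per_worker_batch_size(64, 4)
print(batch_per_worker)
```

With 4 workers, the per-worker value of 16 is what you would pass as the per-worker batch size to keep the total at 64.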

Part 2(b): Gradient Synchronization with all_reduce

Ring AllReduce is an extremely scalable technique for performing gradient synchronization. Read here for more intuition and visualization of Ring AllReduce (and how it has been applied to distributed ML). Instead of using the gather and scatter collectives separately, you will next use the built-in all_reduce collective to sync gradients among different nodes. Again, read the gradients after the backward pass and perform all_reduce on the gradients. Note that the gloo backend does not support an averaging reduction (PyTorch's ReduceOp.AVG is only available with the NCCL backend), so use the sum operation and then obtain the average on each node by dividing the sum by the number of workers. After averaging, update the gradients of the model as in the previous part.
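
To build intuition for the ring algorithm, the following pure-Python simulation runs its two phases (reduce-scatter, then all-gather) over in-memory lists and then divides by the number of workers. It is an illustrative sketch only: it does not reflect how gloo or NCCL implement the collective, and the function names are hypothetical. In the assignment itself the same result comes from torch.distributed.all_reduce with ReduceOp.SUM followed by the division.

```python
def ring_all_reduce_sum(vectors):
    """Simulate ring all-reduce (sum) over one equal-length vector per worker."""
    n = len(vectors)
    length = len(vectors[0])
    # Split every vector into n contiguous chunks; chunk c covers bounds[c].
    bounds = [(c * length // n, (c + 1) * length // n) for c in range(n)]
    data = [list(v) for v in vectors]

    # Phase 1, reduce-scatter: in each step worker r sends one chunk to its
    # right neighbor, which adds it to its own copy of that chunk.
    for step in range(n - 1):
        messages = []
        for r in range(n):
            c = (r - step) % n
            lo, hi = bounds[c]
            messages.append(((r + 1) % n, c, data[r][lo:hi]))
        for dst, c, payload in messages:  # deliver after all sends are collected
            lo, _ = bounds[c]
            for i, value in enumerate(payload):
                data[dst][lo + i] += value

    # Phase 2, all-gather: worker r now holds the full sum of chunk (r + 1) % n
    # and the completed chunks are passed around the ring.
    for step in range(n - 1):
        messages = []
        for r in range(n):
            c = (r + 1 - step) % n
            lo, hi = bounds[c]
            messages.append(((r + 1) % n, c, data[r][lo:hi]))
        for dst, c, payload in messages:
            lo, hi = bounds[c]
            data[dst][lo:hi] = payload
    return data


def ring_all_reduce_mean(vectors):
    """Sum with the ring, then divide by the number of workers on every worker."""
    n = len(vectors)
    return [[value / n for value in summed] for summed in ring_all_reduce_sum(vectors)]


summed = ring_all_reduce_sum([[1, 2, 3, 4], [10, 20, 30, 40], [100, 200, 300, 400]])
averaged = ring_all_reduce_mean([[2, 4], [4, 8]])
```

Each worker sends and receives only one chunk per step, so the data sent per worker stays roughly constant as workers are added, which is the property that makes the ring scale.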

Task 2(b): Implement gradient synchronization using the all_reduce collective in gloo or nccl. In this case, if you have set the same random seed while using the same total batch size, you should see the same loss values as in Task 2(a).

Part 3: Distributed Data Parallel Training with PyTorch DistributedDataParallel

Now, instead of writing your own gradient synchronization, use the distributed functionality provided by PyTorch.

Task 3: Register your model with distributed data parallel and perform distributed training. Unlike in Part 2, you will not need to read the gradients for each layer as DistributedDataParallel performs these steps automatically. For more details, read here.
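
A minimal sketch of wrapping a model in DistributedDataParallel is shown below. To stay runnable on one machine it uses a single process (world size 1) on the loopback address with the gloo backend and a toy linear model. The helpers find_free_port and run_ddp_step are hypothetical, and in the assignment the rank, world size, and master address would come from the command-line arguments instead.

```python
import socket

import torch
import torch.distributed as dist
from torch import nn
from torch.nn.parallel import DistributedDataParallel


def find_free_port():
    """Ask the OS for an unused TCP port on the loopback interface."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("127.0.0.1", 0))
        return sock.getsockname()[1]


def run_ddp_step(rank, world_size, master_ip, master_port):
    """Initialize the process group, run one DDP training step, and clean up."""
    dist.init_process_group(
        backend="gloo",
        init_method=f"tcp://{master_ip}:{master_port}",
        rank=rank,
        world_size=world_size,
    )
    try:
        torch.manual_seed(0)
        # Wrapping the model is the only change; gradient synchronization
        # happens inside backward(), so no manual reads of .grad are needed.
        model = DistributedDataParallel(nn.Linear(4, 2))
        optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
        inputs = torch.randn(16, 4)
        labels = torch.randint(0, 2, (16,))

        optimizer.zero_grad()
        loss = nn.functional.cross_entropy(model(inputs), labels)
        loss.backward()
        optimizer.step()
        return loss.item()
    finally:
        dist.destroy_process_group()


# Single-process demonstration (assumption: loopback networking is available).
loss_value = run_ddp_step(rank=0, world_size=1, master_ip="127.0.0.1",
                          master_port=find_free_port())
print(f"loss after one DDP step: {loss_value:.4f}")
```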

[Optional] Part 4: Benchmarking Data Parallel Training

In this optional section, your task is to benchmark and profile the communication aspect of data parallel training and observe how application-level hyperparameters (e.g. batch size and precision level) affect system performance. In your code for task 2(b), before you perform an all_reduce, record the shape/dimension and the data type (e.g. float32) of the tensor being communicated between nodes. Given this information, you should be able to calculate the size of the tensor in bytes (size_in_bytes = num_elements * element_size). Next, record the total amount of network traffic over an epoch. The expected amount of network traffic should equal num_iterations * traffic_per_iteration due to the iterative nature of ML training. You can do this by using Linux CLI tools like dstat to generate network traffic statistics and export them into a csv file. Other similar tools include tshark, tcpdump, etc.
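
The two formulas above can be written out as a short calculation. The sketch below is pure Python with hypothetical helper names and an assumed table of element sizes; in PyTorch the same per-tensor figure is tensor.numel() * tensor.element_size(). How traffic_per_iteration relates to the tensor sizes depends on the collective algorithm, so it is left as an input here, and the iteration count and per-iteration figure in the example are placeholders rather than measurements.

```python
import math

# Bytes per element for a few common dtypes (assumed table for this sketch).
ELEMENT_SIZE_BYTES = {"float16": 2, "float32": 4, "float64": 8}


def tensor_size_in_bytes(shape, dtype="float32"):
    """size_in_bytes = num_elements * element_size."""
    return math.prod(shape) * ELEMENT_SIZE_BYTES[dtype]


def expected_traffic_bytes(num_iterations, traffic_per_iteration):
    """Expected traffic over an epoch = num_iterations * traffic_per_iteration."""
    return num_iterations * traffic_per_iteration


# Example: a 768 x 768 float32 weight matrix (768 is BERT-base's hidden size).
weight_bytes = tensor_size_in_bytes((768, 768), "float32")
print(weight_bytes)

# Placeholder numbers: 10 iterations at 1,000,000 bytes each.
epoch_bytes = expected_traffic_bytes(10, 1_000_000)
print(epoch_bytes)
```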

Task 4: Adjust the batch size (e.g. reduce it from 64 to 32 and 16) and observe how the network traffic (aggregated over an epoch) and the tensor size (for every all_reduce) change. Compare your observed traffic with the expected amount of traffic. Reason about the difference (or the lack of difference) between training runs with different batch sizes.

Deliverables

A brief report in PDF format (filename: $NetID$_$firstname$_$lastname$.pdf) and the code for each task. Please compress your source code into a .zip file and upload it to Gradescope.

In the report, include the following content:

  • Run task 1 and report the evaluation metric after every epoch.
  • Run each task for 40 iterations (40 minibatches of data). Discard the timings of the first iteration and report the average time per iteration for the remaining iterations for each task (2(a), 2(b), and 3). To time your code, see this.
  • Reason about the difference (or the lack of difference) among different setups. Feel free to refer to the PyTorch Distributed [VLDB '20] paper for more context.
  • Comment on the scalability of distributed ML based on your results.
  • [Optional] If you chose to do task 4, include a comparison of the network traffic in different setups and your reasoning of the differences.
  • Provide any implementation details.
  • Code for each task should be in different directories: task2a, task2b, and task3.
  • All your code that requires PyTorch distributed should be runnable using the following command:
python run_glue.py [other input args] --master_ip $ip_address$ --master_port $port$ --world_size 4 --local_rank $rank$
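
For the timing item in the list above, one possible way to measure per-iteration time is sketched below. The helper names are hypothetical, and step_fn stands for whatever callable runs one full training iteration. On GPUs, kernels run asynchronously, so you would likely also need to call torch.cuda.synchronize() before reading the clock.

```python
import time


def time_iterations(step_fn, num_iterations=40):
    """Run step_fn num_iterations times and return each iteration's wall time in seconds."""
    durations = []
    for _ in range(num_iterations):
        start = time.perf_counter()
        step_fn()
        durations.append(time.perf_counter() - start)
    return durations


def average_excluding_first(durations):
    """Average time per iteration, discarding the first (warm-up) iteration."""
    if len(durations) < 2:
        raise ValueError("need at least two iterations")
    rest = durations[1:]
    return sum(rest) / len(rest)


# Usage with a dummy step; replace the lambda with one real training iteration.
durations = time_iterations(lambda: sum(range(1000)), num_iterations=40)
print(f"average time per iteration: {average_excluding_first(durations):.6f} s")
```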

Common FAQs and Resources

  • What is BERT? BERT, which stands for Bidirectional Encoder Representations from Transformers, is an NLP model introduced by Google in 2018. It is an encoder-only model, and it can be fine-tuned for various language understanding tasks, such as question answering, sentiment analysis, and natural language inference. For more information, check this out: A Visual Guide to Using BERT for the First Time. If you want more insight into which parameters we are synchronizing gradients for, you can use a model profiler (e.g. PyTorch Profiler, DeepSpeed Profiler) to inspect the model. Some profilers may require a GPU to run.

  • What is GLUE? The General Language Understanding Evaluation benchmark (GLUE) is a collection of datasets used for training, evaluating, and analyzing NLP models relative to one another. The collection consists of nine difficult and diverse task datasets designed to test a model’s language understanding. For more information, check this out: GLUE Explained: Understanding BERT Through Benchmarks

  • What is fine-tuning? Fine-tuning in the context of NLP refers to the process of taking a pre-trained model (trained on a huge text corpus) and further training it on a specific dataset to tailor it for a particular task.

  • Testing programs: We suggest you first write small programs to test the functionality of the communication collectives. For example, create a tensor and send it to another node. Next, try to perform all_reduce on it. This will help you get comfortable with communication collectives.

  • Setting up distributed init: In this setup we will use the TCP init method “tcp://10.10.1.1:6585”. Instead of a shared file system, we want to use TCP to communicate. In this example, we use 10.10.1.1 as the IP address for rank 0. You can use ifconfig to find the IP address of your nodes. On CloudLab, please use the experimental network, which listens on 10.10.1.* interfaces (sending a huge amount of traffic through the other network interfaces may lead to your reservation getting terminated by the CloudLab admins). You can double-check network connectivity by pinging a node from another. The port has to be a non-privileged port, i.e. greater than 1023.

  • Data Partitioning: In the case of data parallel training, the workers train on non-overlapping data partitions. You will use the distributed sampler to distribute the data among workers. For more details, look at torch.utils.data.distributed.DistributedSampler

  • Parallelism in ML: HuggingFace has a nice writeup that summarizes the common parallelism paradigms in (large-scale) ML training.
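
To illustrate the data partitioning item above, the sketch below shows DistributedSampler handing each of 4 workers a disjoint slice of a toy 8-example dataset. The dataset and the helper indices_for_rank are hypothetical. Passing num_replicas and rank explicitly lets the sampler be inspected in a single process without initializing a process group.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# Toy dataset (assumption): 8 examples whose values equal their indices.
dataset = TensorDataset(torch.arange(8))
world_size = 4


def indices_for_rank(rank):
    """Dataset indices that the worker with this rank would train on."""
    sampler = DistributedSampler(
        dataset, num_replicas=world_size, rank=rank, shuffle=False
    )
    return list(sampler)


partitions = [indices_for_rank(rank) for rank in range(world_size)]
for rank, indices in enumerate(partitions):
    print(f"rank {rank}: {indices}")

# On a given worker, the sampler is passed to the DataLoader in place of shuffle=True.
rank0_sampler = DistributedSampler(dataset, num_replicas=world_size, rank=0, shuffle=False)
rank0_loader = DataLoader(dataset, sampler=rank0_sampler, batch_size=2)
rank0_batches = [batch[0].tolist() for batch in rank0_loader]
```

When shuffle=True is used instead, calling sampler.set_epoch(epoch) at the start of each epoch gives a different shuffle per epoch while keeping the partitions consistent across workers.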

Acknowledgements