Distributed Learning Anomaly

This repository records the history of tracking down distributed deep learning locality anomalies.

Quick Link

1-intel-mpi

NOTE

If Docker is run with the privileged flag and InfiniBand, the communication becomes slower.
Why? Maybe because no proper InfiniBand driver is installed.

1. What is Distributed Learning?

Deep Neural Networks (DNNs) have dramatically improved the state-of-the-art for many problems
that machine learning (ML) and artificial intelligence (A.I.) community dealt with for decades,
including speech recognition, machine translation, object identification, ...

-- Salem Alqahtani, Murat Demirbas, “Performance Analysis and Comparison of Distributed Machine Learning Systems,” ICCCN, 2019.

Deep Learning with DNNs is very hungry for data, and of course, dealing with large amounts of data requires huge computing power. Therefore, to bring more computing power to bear on a single DNN, Distributed Learning arrived.

In Distributed Learning, multiple workers train together: each worker runs training on its own chunk of data (data parallelism) and obtains a local gradient. These local gradients are then combined by sum or average to produce the global gradient.
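
As a concrete (if simplified) picture of this data-parallel pattern, here is a minimal MPI sketch of my own, not taken from any repository mentioned below: each worker computes a local gradient, and the workers average it with MPI_Allreduce.

// grad_avg.cc: minimal sketch of data-parallel gradient averaging (illustration only).
#include <mpi.h>
#include <vector>
#include <cstdio>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  // Each worker computes a local gradient on its own data chunk.
  // Here the gradient is faked with the rank value as a stand-in.
  std::vector<float> local_grad(4, static_cast<float>(rank));
  std::vector<float> global_grad(4, 0.0f);

  // Sum the local gradients across all workers ...
  MPI_Allreduce(local_grad.data(), global_grad.data(),
                static_cast<int>(local_grad.size()),
                MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);

  // ... and divide by the number of workers to get the average (global) gradient.
  for (float& g : global_grad) g /= static_cast<float>(size);

  if (rank == 0) std::printf("global_grad[0] = %f\n", global_grad[0]);

  MPI_Finalize();
  return 0;
}

It would be compiled with an MPI C++ wrapper (for example mpicxx grad_avg.cc -o grad_avg) and launched with mpirun -n <workers> ./grad_avg.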

2. Is it strictly epoch-based?

Logging session hooks are called after each run, but a run is not the same thing as an epoch.
Therefore, even if after_run detects that the epoch value is >= 10,
it is not exactly epoch == 10 but something like 10.02.

However, it looks like there is nothing to worry about, because the epoch value is defined by
the global step, the global batch size, and the number of records,
so the epoch values that after_run prints should be consistent between runs.
Just in case, record the starting epoch value and starting time, and when the epoch value becomes equal to or greater than the target value,
print the differences in both epoch and time, as well as the epoch difference divided by the time difference.
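
A minimal sketch of that bookkeeping, written in C++ purely for illustration (the variable names and example constants are my own, not taken from the training scripts): the epoch value is derived from the global step, the global batch size, and the number of records, and the rate is the epoch difference divided by the time difference.

// epoch_rate.cc: sketch of the epoch / throughput bookkeeping described above (illustration only).
#include <chrono>
#include <cstdio>

// Epoch value derived from global step, global batch size, and number of records.
double epoch_of(long long global_step, long long global_batch_size, long long num_records) {
  return static_cast<double>(global_step) * static_cast<double>(global_batch_size)
         / static_cast<double>(num_records);
}

int main() {
  // Hypothetical example values; the real values come from the running job.
  const long long global_batch_size = 256;
  const long long num_records = 1281167;
  const double target_epoch = 10.0;

  // Record the starting epoch value and the starting time.
  long long global_step = 0;
  double start_epoch = epoch_of(global_step, global_batch_size, num_records);
  auto start_time = std::chrono::steady_clock::now();

  // Later, after_run observes an epoch value >= the target (e.g. 10.02, not exactly 10).
  global_step = 50100;  // stand-in for the global step read at that point
  double end_epoch = epoch_of(global_step, global_batch_size, num_records);
  auto end_time = std::chrono::steady_clock::now();

  if (end_epoch >= target_epoch) {
    double seconds = std::chrono::duration<double>(end_time - start_time).count();
    std::printf("epoch diff: %.4f, time diff: %.2f s, epochs/sec: %.6f\n",
                end_epoch - start_epoch, seconds, (end_epoch - start_epoch) / seconds);
  }
  return 0;
}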

3. Configure TensorFlow not to save logs in /tmp

There is a suspicion that writing large files results in the anomalies.

Even though log_dir is not set, checkpoints are recorded while training.
Inside the Python script /workspace/nvidia-examples/cnn/nvutils/runner.py, within the train method,
set the classifier's config-related member variables to None.

Also, TensorFlow uses a temporary model directory to write logs.
This can be avoided by editing this file:

/usr/local/lib/python3.6/dist-packages/tensorflow_estimator/python/estimator/estimator.py


At line 1821, edit the lines like this:

  elif getattr(config, 'model_dir', None) is None:
    pass
    #model_dir = tempfile.mkdtemp()
    #logging.warning('Using temporary folder as model directory: %s', model_dir)
    #config = run_config.RunConfig.replace(config, model_dir=model_dir)

After this edit, I observed that the temporary folder is no longer created inside /tmp. However, will it really stop writing files just by letting model_dir be None?

4. Eliminate network part

/usr/local/lib/python3.6/dist-packages/horovod/tensorflow/__init__.py
The above Python script implements how ring-allreduce works.
For example, see line 311.

5. baidu-allreduce

I found an interesting repository on GitHub; the link is here.
The repository was made by Baidu, the company which implemented the ring-allreduce algorithm.
The repository is written in C++, and it briefly demonstrates how the algorithm works.

I have run the code with two locality environments, 4-2-2 and 4-3-1 (processes per node).
The three nodes are all in the same rack, so they share the same farthest network distance.
Also, the node with the most processes has 4 in both cases, which means the nodes will have almost equal loads.

image

Since ring-allreduce should be insensitive to the number of nodes or the process distribution across nodes, the result is quite astonishing.
It was seen that the node with only one process is the source of the problem.
The implementation of scatter-reduce (the first of the two phases of ring-allreduce) is as follows:

for (int i = 0; i < size - 1; i++) {
  int recv_chunk = (rank - i - 1 + size) % size;
  int send_chunk = (rank - i + size) % size;
  float* segment_send = &(output[segment_ends[send_chunk] -
                              segment_sizes[send_chunk]]);

  MPI_Irecv(buffer, segment_sizes[recv_chunk],
          datatype, recv_from, 0, MPI_COMM_WORLD, &recv_req);

  MPI_Send(segment_send, segment_sizes[send_chunk],
          MPI_FLOAT, send_to, 0, MPI_COMM_WORLD);

  float *segment_update = &(output[segment_ends[recv_chunk] -
                                    segment_sizes[recv_chunk]]);

  // Wait for recv to complete before reduction
  MPI_Wait(&recv_req, &recv_status);

  reduce(segment_update, buffer, segment_sizes[recv_chunk]);
}
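
For reference, the second phase (the allgather) walks the same ring but only copies segments instead of reducing them. The following is my own sketch of that phase in the same style; the variable names mirror the loop above rather than quoting the repository verbatim.

// Allgather phase: in each step, forward the chunk that is already fully
// reduced, so after size - 1 steps every rank holds the complete result.
for (int i = 0; i < size - 1; i++) {
  int send_chunk = (rank - i + 1 + size) % size;
  int recv_chunk = (rank - i + size) % size;

  float* segment_send = &(output[segment_ends[send_chunk] -
                                 segment_sizes[send_chunk]]);
  float* segment_recv = &(output[segment_ends[recv_chunk] -
                                 segment_sizes[recv_chunk]]);

  // The received segment is already final, so it can be written straight into
  // output; no temporary buffer and no reduce() call are needed here.
  MPI_Sendrecv(segment_send, segment_sizes[send_chunk], datatype, send_to, 0,
               segment_recv, segment_sizes[recv_chunk], datatype, recv_from, 0,
               MPI_COMM_WORLD, &recv_status);
}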

This is merely a demonstration of ring-allreduce,
and I do not know whether Horovod applied changes to the implementation,
but Horovod mentioned that they replaced the implementation with NCCL and made several API improvements.
Take a look at section 4, Installing Horovod, in this report written by the Horovod team.
Right now, I could not find any code actually implemented with MPI APIs inside the Horovod repository.

However, Baidu also has a repository built with TensorFlow and ring-allreduce.
Its ring-allreduce implementation is almost identical. I think I can make use of it.

The issue might come from the fact that the send and receive done by a single thread are not concurrent.
Without supporting hardware, MPI non-blocking APIs do not receive or send data in the background.
It is like the TCP protocol, where two processes must handshake to transport data;
the same goes for MPI, whose APIs need the CPU's attention.
Therefore, even if a process calls MPI_Irecv, then MPI_Send, and then waits on the MPI_Irecv,
it performs the send and the receive sequentially.

Even if this is true for MPI, I do not know whether the same goes for other APIs, such as NCCL.
I need to take a look at those other APIs.

6. How to Resolve

Two resolutions came to mind:

  1. Implement an asynchronous, background API.
  2. Run the send and receive functions on separate threads.

The former is of course a lot more difficult than the latter.

Run send and receive functions on separate threads.

I can create two child threads and have them do the send and the receive concurrently.
Actually, there is no need for two child threads: I can create
one child thread, have it run either the send or the receive, and let the main thread
handle the remaining one, as sketched below.
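
Here is a minimal sketch of that idea; this is my own illustration rather than a patch to baidu-allreduce, and it assumes the MPI library is initialized with MPI_THREAD_MULTIPLE (discussed next). A child thread blocks on the receive while the main thread performs the send.

// two_thread_exchange.cc: sketch of doing send and receive on separate threads.
#include <mpi.h>
#include <thread>
#include <vector>
#include <cstdio>

int main(int argc, char** argv) {
  int provided = 0;
  MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
  if (provided < MPI_THREAD_MULTIPLE) {
    std::printf("MPI_THREAD_MULTIPLE not supported (got %d)\n", provided);
    MPI_Abort(MPI_COMM_WORLD, 1);
  }

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int count = 1 << 20;
  std::vector<float> send_buf(count, static_cast<float>(rank));
  std::vector<float> recv_buf(count, 0.0f);

  int send_to = (rank + 1) % size;
  int recv_from = (rank - 1 + size) % size;

  // The child thread blocks on the receive while the main thread sends,
  // so the two directions can (in principle) make progress concurrently.
  std::thread receiver([&] {
    MPI_Recv(recv_buf.data(), count, MPI_FLOAT, recv_from, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  });

  MPI_Send(send_buf.data(), count, MPI_FLOAT, send_to, 0, MPI_COMM_WORLD);
  receiver.join();

  MPI_Finalize();
  return 0;
}

It would be compiled with something like mpicxx -std=c++11 -pthread two_thread_exchange.cc, assuming an MPI build with full thread support.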

But the question is whether child threads can call MPI APIs at all.
Fortunately, MPI does support this, but the documentation says that multiple-thread support is only lightly tested.
Check out the link.

Also, MPI_THREAD_MULTIPLE support must have been enabled at configure time.
Run the following command and see whether the installed Open MPI supports it:

ompi_info | grep -i thread

However, this is not a viable solution; I was wrong.
Internally, MPI already progresses non-blocking communications while performing blocking communications.

7. Why?

Here I list possible factors that might be the root of this anomaly.

1. NUMA

Since the environments are NUMA architectures, NUMA could cause this kind of anomaly.
To rule this out, we have to check,
by executing numactl --show, whether the policy is default or bind:
default allocates from the same node first, while bind allocates from the same node only.

Also, check the process distribution when starting the MPI job;
look at the --bind-to and --map-by options, for example as shown below.
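
For example (my own guess at a reasonable launch, reusing the same test binary as in the Intel MPI section below), an Open MPI run that maps ranks across NUMA nodes and pins each rank to a core could look like:

mpirun -n 8 --map-by numa --bind-to core ./allreduce-test cpu

Adding --report-bindings makes Open MPI print the resulting bindings, which makes it easy to confirm the actual process distribution.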

2. Rank Shared by Threads

This is the most plausible factor among the candidates.
There are many papers that describe the speed limit caused by a rank being shared by multiple threads.
The rank acts like a resource: it is a shared object that includes the network resources, and it is the locking unit
that ensures the FIFO order of MPI calls issued by multiple threads.
Thus, without independent contexts and ranks, the threads must communicate under mutual exclusion.

image

The proposed solution to this problem is creating endpoints, which are a sort of rank.
By assigning an endpoint to each thread, the threads can communicate without mutual exclusion.

However, even though the first proposal was made against the MPI 3 standard in 2015,
it has still not been standardized as of now, March 2021.

Luckily, Intel MPI has had multiple-endpoints support since 2019, with the psm2 library.
I have to look into it.

image

1. Intel MPI

To download and install Intel MPI, execute:

wget https://registrationcenter-download.intel.com/akdlm/irc_nas/17427/l_HPCKit_p_2021.1.0.2684_offline.sh

and then,

bash l_HPCKit_p_2021.1.0.2684_offline.sh

The MPI will be installed at /opt/intel/oneapi/mpi/2021.1.1. But before moving on, I need to set environment variables.

source /opt/intel/oneapi/setvars.sh

Also, I need to change the library kind to release_mt because multiple-endpoint support is only available with that library.

source /opt/intel/oneapi/mpi/2021.1.1/env/vars.sh -i_mpi_library_kind=release_mt

Set I_MPI_THREAD_SPLIT to 1 to activate multiple endpoints.

export I_MPI_THREAD_SPLIT=1

The default maximum number of threads per rank is determined by omp_get_max_threads().
If that is too large, the MPI program might fail to run.
Override the default value with the following, changing the 2 to the desired number of threads per rank:

export I_MPI_THREAD_MAX=2

The mlx provider does not support the multi-EP feature; see here.
Use TCP instead. I still don't get why PSM2 emits errors.

export I_MPI_OFI_PROVIDER=TCP

The Intel MPI compiler wrapper for C++ is `mpiicpc`,
and the launcher is just `mpirun`.

As for mpirun problems, it seems like Open MPI and Intel MPI conflict with each other.
See this Intel MPI link.

If debugging output is needed, set this:

export I_MPI_DEBUG=5

Then run with the following command:

mpirun -n 6 -machine m42 -iface enp216s0 ./allreduce-test cpu

Remember to specify the network interface; otherwise it will fail to connect to remote machines.

I had better enable the privileged flag when running Docker, to give the container InfiniBand awareness.

8. Horovod

In Horovod, the actual MPI_Allreduce happens in

horovod/horovod/common/ops/mpi_operations.cc