wehs7661/ensemble_md

Reducing the computational cost for the new implementation of EEXE with MPI-enabled GROMACS

wehs7661 opened this issue · 11 comments

In the commit c661fb8, we have enabled the use of MPI-enabled GROMACS and disabled the use of thread-MPI GROMACS. However, as discussed in issue #10 , in the original implementation, the execution of multiple grompp commands was parallelized by mpi4py using the conditional statement if rank < self.n_sim:, while in the new implementation that allows MPI-enabled GROMACS, the GROMACS tpr files are generated serially using the following lines in the function run_grompp:

# Run the GROMACS grompp commands in parallel
for i in range(self.n_sim):
    print(f'Generating the TPR file for replica {i} ...')
    returncode, stdout, stderr = self.run_gmx_cmd(args_list[i])
    if returncode != 0:
        print(f'Error:\n{stderr}')
        sys.exit(returncode)

I tested the code with a fixed-weight EEXE simulation for the anthracene system with nst_sim=1250 and n_sim on Bridges-2 with 64 cores requested and runtime_args={'-ntomp': '1'} and n_proc=64. As a result, 50 iterations took 327 seconds. This is much longer than the original implementation that used mpi4py to parallelize the GROMACS grompp commands. Specifically, using the new implementation, 20000 iterations would take approximately 327 * 400 seconds, which is around 36 hours, much longer than 13 hours required to finish the same simulation using the original implementation.

In light of this, we should figure out a way to parallelize the GROMACS grompp commands without introducing any noticeable overhead and without using mpi4py (to prevent nested MPI calls). Also, we should try to identify if there is any other source that contributed to higher computational costs in the new implementation of EEXE.

In the commit 344afa6, I replaced the for loop mentioned in the comment above with the lines below, which used ThreadPoolExecutor in concurrent.futures.

# Use ThreadPoolExecutor to run the grompp commands in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=self.n_sim) as executor:
    futures = [executor.submit(self.run_gmx_cmd, args) for args in args_list]

    for i, future in enumerate(futures):
        try:
            returncode, stdout, stderr = future.result()
            if returncode != 0:
                print(f'Error:\n{stderr}')
                sys.exit(returncode)
            else:
                print(f'Generating the TPR file for replica {i} ...')
        except Exception as exc:
            print(f'An error occurred: {exc}')
print('Finished generating all TPR files.')  # Synchronization point

As a result, completing 50 iterations took around 270 seconds, which is around 15% faster than the serial generation of tpr files above, but still much slower than the original implementation. I also tried using ProcessPoolExecutor instead of ThreadPoolExecutor (by simply replacing ThreadPoolExecutor with ProcessPoolExecutor in the code above), but the performance was basically the same. (ProcessPoolExecutor was slightly slower.)

Notably, here is a summary about the two kinds of executor in concurrent.futures (provided by GPT-4):

  • ThreadPoolExecutor: This executor uses threading to execute calls asynchronously. Due to Python's Global Interpreter Lock (GIL), threads in Python are best suited for I/O-bound tasks, where the program spends most of its time waiting for input or output operations to complete.
  • ProcessPoolExecutor: This executor uses multiprocessing to execute calls asynchronously. Multiprocessing creates separate processes which each have their own Python interpreter and memory space, allowing them to execute simultaneously on separate CPUs or cores and bypassing the GIL. This makes multiprocessing best suited for CPU-bound tasks, where the program spends most of its time doing computations.

In the commit 22528e5, I tried the following, which uses subprocess to spawn process in parallel:

    def run_grompp(self, n, swap_pattern):
        """
        Prepares TPR files for the simulation ensemble using the GROMACS :code:`grompp` command.

        Parameters
        ----------
        n : int
            The iteration index (starting from 0).
        swap_pattern : list
            A list generated by :obj:`.get_swapping_pattern`. It represents how the replicas should be swapped.
        """
        commands = []
        for i in range(self.n_sim):
            arguments = []

            # See if there is a need to use mpirun or mpiexec
            arguments.extend([self.mpi_cli, self.mpi_arg, '1', self.gmx_executable, 'grompp'])

            # Input files
            mdp = f"sim_{i}/iteration_{n}/{self.mdp.split('/')[-1]}"
            if n == 0:
                gro = f"{self.gro}"
            else:
                gro = f"sim_{swap_pattern[i]}/iteration_{n-1}/confout.gro"  # This effectively swap out GRO files
            top = f"{self.top}"

            # Add input file arguments
            arguments.extend(["-f", mdp, "-c", gro, "-p", top])

            # Add output file arguments
            arguments.extend([
                "-o", f"sim_{i}/iteration_{n}/sys_EE.tpr",
                "-po", f"sim_{i}/iteration_{n}/mdout.mdp"
            ])

            # Add additional arguments if any
            if self.grompp_args is not None:
                # Turn the dictionary into a list with the keys alternating with values
                add_args = [elem for pair in self.grompp_args.items() for elem in pair]
                arguments.extend(add_args)

            # Construct the command string
            command = " ".join(arguments)
            commands.append(command)

        # Spawning processes in parallel
        processes = []
        for command in commands:
            process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            processes.append(process)

        # Wait for all processes to finish
        for process in processes:
            process.wait()

        print('Finished generating all TPR files')  # Synchronization point

I also tried the following in the commit bb2d53c:

from subprocess import Popen, PIPE

def run_grompp(self, n, swap_pattern):
    # ... (rest of the function) ...

    # Run the GROMACS grompp commands in parallel
    processes = [Popen(args, stdout=PIPE, stderr=PIPE) for args in args_list]
    for i, process in enumerate(processes):
        stdout, stderr = process.communicate()
        if process.returncode != 0:
            print(f'Error on replica {i}:\n{stderr.decode()}')
            sys.exit(process.returncode)

As a result, both of them took around 330 seconds.

In the commit a2ab912, I tried using multiprocessing.Pool:

from multiprocessing import Pool

def run_grompp(self, n, swap_pattern):
    # ... (rest of the function) ...

    # Run the GROMACS grompp commands in parallel
    with Pool(self.n_sim) as pool:
        results = pool.map(self.run_gmx_cmd, args_list)

    for i, (returncode, stdout, stderr) in enumerate(results):
        if returncode != 0:
            print(f'Error on replica {i}:\n{stderr}')
            sys.exit(returncode)

As a result, it took around 313 seconds to finish 50 iterations.

In commit 581b081, I tried using asyncio to parallelize the grompp commands:

import asyncio

async def async_run_gmx_cmd(self, args):
    process = await asyncio.create_subprocess_exec(*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await process.communicate()
    return process.returncode, stdout, stderr

def run_grompp(self, n, swap_pattern):
    # ... (rest of the function) ...

    # Run the GROMACS grompp commands in parallel
    loop = asyncio.get_event_loop()
    tasks = [self.async_run_gmx_cmd(args) for args in args_list]
    results = loop.run_until_complete(asyncio.gather(*tasks))

    for i, (returncode, stdout, stderr) in enumerate(results):
        if returncode != 0:
            print(f'Error on replica {i}:\n{stderr.decode()}')
            sys.exit(returncode)

As a result, it took 305 seconds to finish all 50 iterations.

In the commit 1340f40, I tried joblib:

from joblib import Parallel, delayed

def run_grompp(self, n, swap_pattern):
    # ... (rest of the function) ...

    # Run the GROMACS grompp commands in parallel
    results = Parallel(n_jobs=self.n_sim)(delayed(self.run_gmx_cmd)(args) for args in args_list)

    for i, (returncode, stdout, stderr) in enumerate(results):
        if returncode != 0:
            print(f'Error on replica {i}:\n{stderr}')
            sys.exit(returncode)

As a result, it took around 414 seconds to finish 50 iterations, which is the slowest amount all options. This is not surprising, though. See the note (summarized from GPT-4) in the next comment.

  • multiprocessing.Pool: This module is ideal for CPU-bound tasks because it takes advantage of multiple CPU cores and bypasses the Global Interpreter Lock (GIL). It creates a pool of worker processes and enables one to parallelize the execution of a function across multiple input values. In our case, this would be the generation and execution of the GROMACS grompp commands. The Pool class also provides methods to control the granularity of parallelism and handle any exceptions raised by the worker process.
  • concurrent.futures.ProcessPoolExecutor: Similar to multiprocessing.Pool in that it is also suitable for CPU-bound tasks. It is a slightly higher-level interface and works better with the 'with' context manager, which can automatically manage resources. As of Python 3.7, concurrent.futures is generally recommended over multiprocessing for new code because it has a simpler, more Pythonic API.
  • concurrent.futures.ThreadPoolExecutor: This method is suitable for I/O-bound tasks and can improve performance when the program is network-limited or file I/O-limited, rather than being CPU-limited. However, Python's Global Interpreter Lock (GIL) may limit the effectiveness of threading for CPU-bound tasks.
  • subprocess: The subprocess module allows one to spawn new processes, connect to their input/output/error pipes, and obtain their return codes. However, it's relatively low-level, and managing multiple subprocesses (especially error handling) can be tricky. Also, inter-process communication can be slower than inter-thread communication.
  • asyncio: This is an asynchronous I/O framework using coroutines and is excellent for high-performance network and disk I/O. But it's unlikely to speed up our use case, since the grompp command is likely CPU-bound.
  • joblib: This library is mainly developed for machine learning in Python and supports pipelining and provides an easy-to-use interface for parallelizing tasks. However, it's less efficient for tasks that are short-lived and better suited to long-running tasks.

I additionally tried using subprocess.Popen as follows, which is very similar to some of the options used above

from subprocess import Popen, DEVNULL

def run_grompp(self, n, swap_pattern):
    # ... (rest of the function) ...

    # Run the GROMACS grompp commands in parallel
    processes = [Popen(args, stdout=DEVNULL, stderr=DEVNULL) for args in args_list]
    for i, process in enumerate(processes):
        process.wait()
        if process.returncode != 0:
            print(f'Error on replica {i}:\nProcess returned non-zero exit code.')
            sys.exit(process.returncode)

It took around 270 seconds to finish 50 iterations with this approach. This was attempted in commit 1fc0463.

Okay, according to more tests that I performed later, I found that the issue might be that the start time of the mdrun command with the -multidir flag is longer, which we will investigate later.

Here is a summary of the results from the additional tests, which all completed 50 iterations of 1250-step EXE simulations of the anthracene system. All the tests were based on the commit 1fc0463.

  • A line-by-line profiling test for the main function in run_EEXE.py showed that 98.8% of the computational cost came from the function run_EEXE,
  • A further breakdown of the computational cost of the function run_EEXE showed that 15.5% of its computational cost came from run_grompp and the other 84.5% came from run_mdrun.
  • Both functions run_grompp and run_mdrun relies on the use of the function run_gmx_cmd, which launches a GROMACS command using subprocess.run, where 100% of its computational cost comes from.
  • In another test, it was shown that for run_mdrun, 100% of its computational cost came from run_gmx_cmd. For run_grompp, 98.3% of its computational cost came from process.wait(), and only 1.7% came from subprocess.Popen.
  • Some notes about subprocess.Popen and process.wait (summarized from GPT-4):
    • The Popen constructor from the subprocess module in Python launches a new process, but it does not wait for the process to complete. Popen immediately returns a Popen object that one can use to interact with the process, while the actual command starts executing in the background. Therefore, the time spent in the Popen call itself is typically very short, because it's just starting a new process and not waiting for it to complete.
    • The call to wait() blocks (i.e., pauses your Python script) until the process is done. If the script reaches wait() while the process is still running, the script will stop and wait. This is why most of the time is spent in wait(): in our case, it includes the time it takes for the GROMACS grompp command to run.
  • I also timed the execution of run_grompp in a test completing 50 iterations, and the wall time for generating tpr files ranged from 0.910 to 1.076 seconds, with a mean of 0.992 seconds.
  • I also tried wrapping process.wait() in run_grompp with time.time():
    for i, process in enumerate(processes):
        t1 = time.time()
        process.wait()
        t2 = time.time()
        print('wait time: ', t2 - t1)
    
    For all 50 iterations, the first out of 4 printed wall times always accounted for >99.9% of the wall time of completing 4 grompp commands in parallel (the wall time of finishing run_grompp). This shows that the 4 grompp commands were launched nearly simultaneously and ended nearly at the same time.
  • In one test, I timed the function run_mdrun in a test completing 50 iterations, and the wall time ranged from 4.055 to 4.922 seconds, with a mean of 4.309 seconds. However, from the 200 log files generated in the test, the wall time for finishing 1 mdrun command ranged from 1.119 to 1.821 seconds, with a mean of 1.285 seconds. This shows that the flag -multidir is actually not efficient in parallelizing simulations, at least in our case. Still, it is faster than just running simulations in parallel, as the mean of the summed wall times (of 4 replicas) is 5.140 seconds, which is longer than 4.309 seconds.
  • I later compared the timestamp when the code reaches run_mdrun and the timestep in the log file indicating the start of the simulation. As a result, the difference between the two was always at least a few seconds. Assuming that there is no significant delay between the invocation of subprocess.run() in Python and the actual execution of the mdrun command in the operating system, this difference likely comes from the start time. (As can be seen from md.cpp, the timestamp printed in the log file "Started mdrun on rank ..." is not exactly the timestep when the mdrun command is executed, but when the simulation starts.
  • The wall time for finishing 50 iterations ranged from 270 seconds up to almost 400 seconds, so most of the approaches attempted above are likely to have similar performances.

I created a PR corresponding to this issue. The following work/discussions will be logged in the PR.

We later confirmed that the start time for mdrun commands when -multidir was used could be much longer than the case where it is not used. This can be easily verified with the following lines:

echo "$(date +"%T")"
mpirun -np 64 gmx_mpi mdrun -s sys_EE.tpr -ntomp 1 -multidir sim_0/iteration_0 sim_1/iteration_0 sim_2/iteration_0 sim_3/iteration_0
echo "$(date +"%T")"

Specifically, the difference between the timestamps can be several seconds longer than the wall time shown at the end of the log time, which does not include the start time.

Without using the flag -multdir, another possible way to parallelize MPI-enabled GROMACS mdrun commands is to use the same way we parallelized the GROMACS grompp commands as discussed above. However, preliminary tests showed that the computational cost was still quite high. This is probably because that launching multiple commands like mpirun -np 16 gmx_mpi mdrun ... simultaneously does not necessarily guarantee that all these commands use a different set of processors, leading to suboptimal performance.

Also, all the approaches proposed above for parallelizing GROMACS commands do not work for multiple nodes, in which case using MPI-enabled GROMACS does not really bring any advantages. At the end of the day, the goal is to run EEXE on multiple nodes using MPI-enabled GROMACS, if possible.

Given that the current implementation that allows MPI-enabled GROMACS (1) incurs higher computational cost than the original implementation (2) and is not closer to the scenario where the EEXE might be the most useful, we decided to fall back to the original implementation of EEXE that ONLY works with thread-MPI GROMACS. Changes necessary for this decision have been made mainly in the commit 4c73f03.

I'm closing this issue since this is no longer relevant in the short term.

Here is a summary of ideas and efforts relevant to the issue.

  • Our ultimate goal is to enable EEXE simulations across multiple nodes using MPI-enabled GROMACS.
  • To work with multiple nodes, the most straightforward and seemingly the only solution is to use mpi4py to launch subprocess calls. Python libraries attempted above, including concurrent, multiprocessing, subprocess, asyncio, joblib do not seem to work with multiple nodes, so approaches proposed above to parallelize GROMACS commands would not work in our case. Also, we will not rely on gmxapi or scale-ms in the near future unless they are shown to have minimal overhead for parallelizing simulations.
  • To use mpi4py to launch subprocess calls of MPI-enabled GROMACS commands, we will need to figure out how to avoid nested MPI calls, which is the current biggest bottleneck. If there is any way to not call MPI.Init() like mpi4py does, we might be able to use the flag -multidir to circumvent the problem, although the start time for mdrun commands with -multidir could introduce overhead.

I'll re-open the issue if new possible workarounds are proposed.