materialsproject/reaction-network

ray cannot parallelise the job among mutiple CPUs on HPC cluster

jx-fan opened this issue · 12 comments

Hi Matt,

The workflow I have tested on our HPC cluster is essentially the same as you showed in the demo notebook, where I first query a list of ComputedStructureEntry from MP and then try to find (rn.find_pathways) and solve the paths (PathwaySolver) in this chemsys.

What I have observed from my test is that the walltimes of finishing the identical task remain roughly the same no matter how many CPU resources are requested via the scheduler (PBS in our case) in each job.

$ grep -E "NCPUs Requested|Walltime Used" jobscript_*.o*
jobscript_48cpu.o40992939:   NCPUs Requested:    48                     NCPUs Used: 48
jobscript_48cpu.o40992939:   Walltime requested: 00:30:00            Walltime Used: 00:02:10
jobscript_4cpu.o41017010:   NCPUs Requested:    4                      NCPUs Used: 4
jobscript_4cpu.o41017010:   Walltime requested: 00:30:00            Walltime Used: 00:02:28
jobscript_8cpu.o40992505:   NCPUs Requested:    8                      NCPUs Used: 8
jobscript_8cpu.o40992505:   Walltime requested: 00:30:00            Walltime Used: 00:02:07

Therefore I suspected this series of tests were only run on a single CPU. However in the job running log, it outputs the following message:

Ray is not initialized. Trying with environment variables!
Could not identify existing Ray instance. Creating a new one...
[{'NodeID': 'd6984294b967643fb7492da2ea7a714f04f9f58ba14c101e0e7d26d0', 'Alive': True, 'NodeManagerAddress': '10.6.1.20', 'NodeManagerHostname': 'gadi-cpu-clx-0020.gadi.nci.org.au', 'NodeManagerPort': 36315, 'ObjectManagerPort': 34555, 'ObjectStoreSocketName': '/jobfs/40992939.gadi-pbs/ray/session_2022-05-23_12-05-33_014570_1927314/sockets/plasma_store', 'RayletSocketName': '/jobfs/40992939.gadi-pbs/ray/session_2022-05-23_12-05-33_014570_1927314/sockets/raylet', 'MetricsExportPort': 58565, 'alive': True, 'Resources': {'memory': 121874636186.0, 'CPU': 96.0, 'node:10.6.1.20': 1.0, 'object_store_memory': 56517701222.0}}]

which seems to be able to acquire the node information and initialise ray as on my local PC.

We did compile graphic-tool ourselves with the help from sys admin instead of using conda. It is mainly due to the restriction of number of files we have on our supercomputer which would've been easily exceeded via conda install.

Please let me know if you need more information to identify the problem here and thank you very much for your consideration.

Best Regards,
Jiaxin

The python script for testing is shown below:

import json
from rxn_network.enumerators.basic import BasicEnumerator
from rxn_network.costs.softplus import Softplus
from rxn_network.entries.entry_set import GibbsEntrySet
from rxn_network.network.network import ReactionNetwork
from rxn_network.reactions.computed import ComputedReaction
from rxn_network.pathways.solver import PathwaySolver
from pymatgen.entries.computed_entries import ComputedStructureEntry


# Load ComputedStructureEntry
print("Load data...")
with open("computed_str_entries_Mn_O_N_H_2021_11_10.json","r") as f:
    entries = json.load(f)

# Build reaction network
temp = 300+273 # in Kelvin
entries = [ComputedStructureEntry.from_dict(e) for e in entries]
entry_set = GibbsEntrySet.from_entries(entries, temp, include_nist_data=True)
filtered_entry_set = entry_set.filter_by_stability(0.01)
filtered_entry_set.add(entry_set.get_min_entry_by_formula("H3N")) # make sure H3N in ReactionNetwork
filtered_entry_set.add(entry_set.get_min_entry_by_formula("Mn4N")) # make sure Mn4N in ReactionNetwork
be = BasicEnumerator()
cf = Softplus(temp)
rn = ReactionNetwork(filtered_entry_set, [be], cf)
rn.build()

# Find pathways
rn.set_precursors([entry_set.get_min_entry_by_formula("Mn4N"),
                   entry_set.get_min_entry_by_formula("H2O")
                  ])
paths = rn.find_pathways(["H3N"])
product_entries = []
for i in ["H3N", "MnO", "H2"]:
    product_entries.append(entry_set.get_min_entry_by_formula(i))
net_rxn = ComputedReaction.balance(rn.precursors, product_entries)

# Solve paths
ps = PathwaySolver(rn.entries, paths, cf)
balanced_paths = ps.solve(net_rxn, max_num_combos=4,
                          intermediate_rxn_energy_cutoff=0.0,
                          use_minimize_enumerator=True,
                          filter_interdependent=True)

# Print out paths
for idx, path in enumerate(balanced_paths):
    print(f"Path {idx+1}", "\n")
    print(path)
    print("\n")

Hi @jx-fan,

Thank you for raising an issue and providing your input script -- this is very helpful! Sorry to hear about your struggles with speeding up the calculations.

I can confirm that I can successfully run your code on my laptop and the total time is ~ 2mins for me as well.

I don't think this is necessarily a problem with your Ray installation. In your particular case, Ray is only being used for a very small part of the total operations -- specifically, the reaction enumeration which is carried out when you call rn.build(). On my laptop, this step takes about 3 seconds, so not much to parallelize here. Ray becomes more helpful for me when I am doing very large systems (i.e., reaction enumeration in a system with 20-30 elements). Hence I am not surprised that you are only seeing a small speedup, as well as diminishing return as you add CPUs. (2:28 --> 2:07 --> 2:10 for 4, 8, 48 CPUs respectively)

There is one other place where parallelization has been added at the moment. That is at end when you call ps.solve(). This function uses numba to process chunks of arrays in parallel using their JIT compilation. I am surprised, however, that you don't see faster performance with more CPUs here.

I will look into whether there is an issue with speedup from implementing numba here and get back to you. Hopefully this answer helps in the meantime. Good luck with your calculations!

FYI -- I am going to release a new version of the package today. I am not sure if it will necessarily change anything, but I did add a mode for turning on/off parallelization on the enumerator. You just have to initialize as BasicEnumerator(parallel=False)

Hi @jx-fan,

Thank you for raising an issue and providing your input script -- this is very helpful! Sorry to hear about your struggles with speeding up the calculations.

I can confirm that I can successfully run your code on my laptop and the total time is ~ 2mins for me as well.

I don't think this is necessarily a problem with your Ray installation. In your particular case, Ray is only being used for a very small part of the total operations -- specifically, the reaction enumeration which is carried out when you call rn.build(). On my laptop, this step takes about 3 seconds, so not much to parallelize here. Ray becomes more helpful for me when I am doing very large systems (i.e., reaction enumeration in a system with 20-30 elements). Hence I am not surprised that you are only seeing a small speedup, as well as diminishing return as you add CPUs. (2:28 --> 2:07 --> 2:10 for 4, 8, 48 CPUs respectively)

There is one other place where parallelization has been added at the moment. That is at end when you call ps.solve(). This function uses numba to process chunks of arrays in parallel using their JIT compilation. I am surprised, however, that you don't see faster performance with more CPUs here.

I will look into whether there is an issue with speedup from implementing numba here and get back to you. Hopefully this answer helps in the meantime. Good luck with your calculations!

Thanks for the quick response and your insights in this regard. I kind of mixed up these two together in my first post but now it definitely makes sense to me that it is the ps.solve() limits the parallel performance. So on my side I did run a couple of more jobs and received the following results:

$ tail -n 2 run_*.log
==> run_48cpu.log <==
Build reaction network:11.280963897705078
Solve path:499.15575528144836

==> run_4cpu.log <==
Build reaction network:23.104422330856323
Solve path:533.4506351947784

==> run_8cpu.log <==
Build reaction network:14.6209077835083
Solve path:490.0730323791504

As you can see the amount of time spent on building the network actually halved with more CPUs involved but not much improvement in the path solving part.

Regards, Jiaxin

FYI -- I am going to release a new version of the package today. I am not sure if it will necessarily change anything, but I did add a mode for turning on/off parallelization on the enumerator. You just have to initialize as BasicEnumerator(parallel=False)

That's fantastic! Can't wait to give it a try. Just out of curiosity — since numba is already used to manage the parallelism in the code, any chance your team might want to implement the GPU support in the future?:laughing:

As you can see the amount of time spent on building the network actually halved with more CPUs involved but not much improvement in the path solving part.

This is definitely an issue I was not aware of -- I am not sure why there would only be a minor speedup with more CPUs for the path solving. Perhaps this is an issue with the part of the code that does not use numba. I'll have to do some profiling to check. Thanks for making me aware of this!

any chance your team might want to implement the GPU support in the future?😆

I would love to include support for GPUs, but as I am a 1-man team on this project, it's not the biggest priority at the moment. However, with the GPU support we now have at NERSC that may change later this year...

Perhaps this is an issue with the part of the code that does not use numba. I'll have to do some profiling to check.

It turned out you are absolutely correct. From my profiling results, the functions that used up the most walltime when running with 48 cpus are:

ncalls tottime percall cumtime percall filename:lineno(function)
79647308 168.981 0 318.241 0 solver.py:201(_build_idx_vector)
49369200/16395048 99.28 0 212.251 0 {built-in method numpy.core._multiarray_umath.implement_array_function}
79647308 66.501 0 89.543 0 solver.py:216(<listcomp>)
16173661 47.781 0 61.573 0 shape_base.py:81(atleast_2d)
80353952 42.491 0 42.491 0 {built-in method numpy.zeros}
350066905/350046085 27.329 0 29.844 0 {method 'get' of 'dict' objects
16173661 25.202 0 343.443 0 solver.py:149(<listcomp>)
37 22.936 0.62 22.936 0.62 solver.py:286(balance_path_arrays)

Whereas the time spent on _build_idx_vector and balance_path_arrays are 183.778 and 64.31 seconds when using 2 cpus.

My understanding is the nested for loop to generate comp_matrices as the input for balance_path_arrays is the actual hotspot rather than execution of the function (which is the part parallelised by numba njit). Even though the amount of time to run balance_path_arrays decreased from 64s to 23s with the increasing CPU cores, the time to create this numpy matrix remains almost unchanged. However my attempt to merge the matrix construction into the balance_path_arrays was failed due to its dependency on the ComputedReaction class.

Therefore, do you have any insight if we could parallelise this matrix generation part as well?

Hi @jx-fan,

Thank you so much for doing this profiling! This helps a lot.

Based on your suggestion, I made a first attempt at parallelizing the creation of the comp_matrices. Like in other parts of the package, I decided to use ray again as it helps with sharing big read-only objects across the workers. Pathway solving seems significantly faster now. I don't have an exact estimate, but it seems to about 4-5x faster on my laptop. The only issue I see at the moment is some spilling of the plasma memory Ray uses (this may not be a huge issue, but kind of expected given the large data sizes).

Would you be interested in testing out the new code? All of the changes are in the dev branch of this repository. I also sped up the reaction enumerators -- so hopefully the entire process is much faster for you now :)

Hi @mattmcdermott,

I have tried to run the new code on our cluster in the last two weeks and did find out some interesting points:

  1. comp_matrices now can utilise the CPU resources across all nodes and can be calculated much more efficiently (thank you for your excellent work!).
  2. The way to start a ray cluster varies across different schedulers and computing node architectures. I feel the current initialize_ray() is specifically tailored for the slurm submission script (https://github.com/NERSC/slurm-ray-cluster/blob/master/submit-ray-cluster.sbatch) which sets ip_head and redis_password as environment variables. If the two variables can't be found, ray is initialised without address="auto", which would unfortuately fail to connect the worker nodes to the head node. 471c0a3
  3. In PathwaySolver.solve(), num_cpus is defined as num_cpus= ray.nodes()[0]["Resources"]["CPU"]. However from my test, it only counts for the number of cpus on a single node. In comparison, num_cpus = ray.cluster_resources()["CPU"] can return the total number of cpus in a multi-node calculation and therefore invokes more ray remote processes. 471c0a3
  4. From my most recent profiling data, due to the restriction of numba.njit that can only run on the head node, balance_path_arrays seems to become the new bottleneck that slows down the overall runtime quite significantly, especially with large systems that contain ~100 rxns and 5 rxn combos. I also encountered some memory issue with processing this big numpy array that would easily exceed the limit on the head node (190G, 48 CPUs in my case).
  5. Lastly, I also discovered the generation of num_rxns and batch_size is sometimes inconsistent when I rerun the same job. It is because the same reaction might be represented differently each time (e.g. hydrogen combustion rxn can be created as either 0.5 O2 + H2 -> H2O or 2 H2 + O2 -> 2 H2O). Therefore the additional rxns appended from intermediate_rxns can be different each time. I added an option to sort the reactants and products and it seems to help solve this problem. ec97ffe

Hope this feedback could provide you with some ideas about how to improve the code in the next step.:smile:

Hi @jx-fan,

Thank you once again for all this great feedback; really nice to have this demonstration of your use case and I appreciate you providing some commits for your change suggestions.

Unfortunately I am about to leave today for a backpacking trip with limited internet access, so I won't be able to address any of these items until I get back to the office mid-August (starting week of Aug 14). I'll make a note to re-visit then. Thank you again for your feedback on the code and hope things are going well!

Hi @jx-fan,

I think I've addressed most of these issues (except for number 4, which will take a little bit more optimization... need to look more into using numba and ray together). The other fixes should be already implemented in the dev branch:
https://github.com/GENESIS-EFRC/reaction-network/tree/dev

I will make a PR once I have had the chance to address some other things. Thank you again for your help catching these issues and sharing them here :)

Note that for number 5, I have actually implemented duplicate reaction checking in the ReactionSet class. https://github.com/GENESIS-EFRC/reaction-network/blob/2d34c9311b1495c962398aeb3dd65ba07a48c507/src/rxn_network/reactions/reaction_set.py#L228

This works by seeing if one reaction has coefficients that are a multiple of the other reaction's coefficients. I previously had this implemented in the __eq__ method for BasicReaction, but I decided to move this out of the class to help with some speed issues, especially for set creation.

Closing this issue with previously addressed fixes. Will continue to work on optimizing ray parallelization.