grid2op.make(...) not threadsafe?
Opened this issue · 6 comments
Environment
- Grid2op version:
1.10.2
- System:
windows 11
- Python:
3.11.9
Bug description
When using Python's multiprocessing Pool or concurrent.futures ThreadPoolExecutor, initializing more than one environment in parallel (asynchronously) can throw errors on some of the threads because the attributes of the backend (or other GridObjects) are not properly defined. I suspect this is because the threads try to initialize from the same resources on disk (i.e. the .json and chronics files), which are locked by the first thread that grabs them.
How to reproduce
import grid2op
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def make_env():
    env = grid2op.make("rte_case14_realistic")
    obs = env.reset()
    print(obs)
    return obs

futures = set()
with ThreadPoolExecutor(max_workers=mp.cpu_count() - 1) as executor:
    for i in range(mp.cpu_count() - 1):
        futures.add(executor.submit(make_env))

for item in futures:
    print(item)
This will usually throw a Backend error or TypeError on a subset of the function calls. These errors correspond to attributes of the Backend being None or -1 (i.e. the Backend was not successfully initialized).
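Note that printing a Future only shows e.g. "raised TypeError"; to see the actual traceback you have to call .result() (which re-raises the exception) or .exception(). A small self-contained sketch of how to surface the errors, using a hypothetical flaky_task as a stand-in for make_env since the real failure is intermittent:

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical stand-in for make_env(): fails for odd worker ids,
# mimicking the intermittent Backend errors described above.
def flaky_task(i):
    if i % 2:
        raise ValueError(f"backend not initialized (worker {i})")
    return f"obs_{i}"

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(flaky_task, i) for i in range(4)]

# Collect successes and failures separately; .exception() blocks
# until the future is done and returns None on success.
results, errors = [], []
for f in futures:
    exc = f.exception()
    if exc is None:
        results.append(f.result())
    else:
        errors.append(exc)

print(results)  # ['obs_0', 'obs_2']
print(errors)
```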
Current output
Some of the threads return errors:
<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66b26650 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48a90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2f690 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66c9e6d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d10 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2d110 state=finished raised TypeError>
<Future at 0x1db66e2c3d0 state=finished raised ValueError>
<Future at 0x1db6b5097d0 state=finished raised ValueError>
<Future at 0x1db6accfd50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6ac94550 state=finished returned CompleteObservation_rte_case14_realistic>
<Future at 0x1db6d581b50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d4b9d0 state=finished returned CompleteObservation_unknown>
Expected output
All threads successfully return the initial observation:
<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66b26650 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48a90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2f690 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66c9e6d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d10 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2d110 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66e2c3d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6b5097d0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6accfd50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6ac94550 state=finished returned CompleteObservation_unknown>
<Future at 0x1db6d581b50 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d48d90 state=finished returned CompleteObservation_unknown>
<Future at 0x1db66d4b9d0 state=finished returned CompleteObservation_unknown>
Temporary Fix
Put grid2op.make(...) inside a for loop with try-except (not recommended!):
import time

N_ATTEMPTS = 10
for _ in range(N_ATTEMPTS):
    try:
        env = grid2op.make(dataset=self.env_name, backend=env_params.backend(), **grid2op_params)
        break
    except Exception:  # resource is busy, wait and retry
        time.sleep(1.0)
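An alternative workaround (a sketch only, not tested against grid2op itself) is to serialize just the environment creation with a module-level lock, so no two threads ever run the factory concurrently, while everything after creation still runs in parallel. The make_env_locked wrapper and fragile_make demo below are hypothetical; for grid2op the factory would be grid2op.make(...):

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_make_lock = threading.Lock()

def make_env_locked(make_fn, *args, **kwargs):
    # Only one thread at a time may run the (non-thread-safe) factory.
    with _make_lock:
        return make_fn(*args, **kwargs)

# Demonstration with a deliberately non-thread-safe factory that mutates
# shared class-level state, mimicking grid2op's dynamically built classes.
class FakeEnv:
    n_line = None  # class attribute set during "initialization"

def fragile_make(n):
    FakeEnv.n_line = None      # reset, as a fresh init would
    time.sleep(0.001)          # widen the race window
    FakeEnv.n_line = n         # attribute only valid after init completes
    return FakeEnv.n_line      # without the lock, this could read None

with ThreadPoolExecutor(max_workers=8) as executor:
    results = list(executor.map(lambda i: make_env_locked(fragile_make, 20),
                                range(8)))

print(results)  # every call sees a fully initialized value: [20, 20, ..., 20]
```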
The Runner in Grid2Op clearly does work and supports parallel processing, so I looked inside it: it does not use grid2op.make but instead initializes the environment using init_obj_from_kwargs(...).
Line 653 in runner.py:
res = self.envClass.init_obj_from_kwargs(
    n_busbar=self._n_busbar,
    other_env_kwargs=self.other_env_kwargs,
    init_env_path=self.init_env_path,
    init_grid_path=self.init_grid_path,
    chronics_handler=chronics_handler,
    backend=backend,
    parameters=parameters,
    name=self.name_env,
    names_chronics_to_backend=self.names_chronics_to_backend,
    actionClass=self.actionClass,
    observationClass=self.observationClass,
    rewardClass=self.rewardClass,
    legalActClass=self.legalActClass,
    voltagecontrolerClass=self.voltageControlerClass,
    other_rewards=self._other_rewards,
    opponent_space_type=self._opponent_space_type,
    opponent_action_class=self.opponent_action_class,
    opponent_class=self.opponent_class,
    opponent_init_budget=self.opponent_init_budget,
    opponent_budget_per_ts=self.opponent_budget_per_ts,
    opponent_budget_class=self.opponent_budget_class,
    opponent_attack_duration=self.opponent_attack_duration,
    opponent_attack_cooldown=self.opponent_attack_cooldown,
    kwargs_opponent=self.opponent_kwargs,
    with_forecast=self.with_forecast,
    attention_budget_cls=self._attention_budget_cls,
    kwargs_attention_budget=self._kwargs_attention_budget,
    has_attention_budget=self._has_attention_budget,
    logger=self.logger,
    kwargs_observation=self._kwargs_observation,
    observation_bk_class=self._observation_bk_class,
    observation_bk_kwargs=self._observation_bk_kwargs,
    _raw_backend_class=self.backendClass,
    _read_from_local_dir=self._read_from_local_dir,
)
Hello,
First, I think the "expected output" you wrote is not correct. You wrote:
<Future at 0x1db68531fd0 state=finished returned CompleteObservation_unknown>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_unknown>
The "unknown" at the end is definitely not normal; it should be the name of the environment. If it works, it should display:
<Future at 0x1db68531fd0 state=finished returned CompleteObservation_rte_case14_realistic>
<Future at 0x1db710ad850 state=finished returned CompleteObservation_rte_case14_realistic>
Then, to address the question more specifically: if you do some things using threads, you should probably first initialize the grid (so that the classes, such as CompleteObservation_rte_case14_realistic, are initialized once when you create the initial environment), and only then do the multiprocessing.
But be aware that "multi threading" in Python does not work really well because of the GIL (global interpreter lock; feel free to google it, I will not describe it here :-) ). Maybe the concurrent library handles that for you, but if I were you I would prefer to use the multiprocessing library.
Basically, what is probably happening here is that grid2op tries to create the same class (for example Environment_rte_case14_realistic) multiple times concurrently (at the same time), which breaks some of Python's internal representation (I am not sure we should do that), and that is probably why you sometimes get errors and sometimes not.
In the Runner, I use the multiprocess library to perform asynchronous evaluation. I also use it in the BaseMultiProcessEnvironment class. I never tested it with anything else.
Finally, I use this way of doing things in the Runner because you might have a different environment class (e.g. MaskedEnvironment or TimeOutEnvironment) which cannot (yet) be created with a call to make. For now you have to do things like:
import numpy as np
import grid2op
from grid2op.Environment import MaskedEnvironment

env_name = "l2rpn_case14_sandbox"
lines_of_interest = np.array([True, True, True, True, True, True,
                              False, False, False, False, False, False,
                              False, False, False, False, False, False,
                              False, False])
env = MaskedEnvironment(grid2op.make(env_name),
                        lines_of_interest=lines_of_interest)
This way the "child" class of Environment (MaskedEnvironment in this case) can benefit from the Runner transparently, because init_obj_from_kwargs is overloaded properly :-)
Hope that helps
After looking at this, I managed to make it "work" with something like:
import grid2op
import multiprocessing as mp
from concurrent.futures import ThreadPoolExecutor

def make_init_env():
    env = grid2op.make("l2rpn_case14_sandbox")
    _ = env.reset()
    return env

def make_env(env_init):
    env = env_init.copy()
    obs = env.reset()
    print(obs)
    return obs

if __name__ == "__main__":
    env_init = make_init_env()
    futures = set()
    with ThreadPoolExecutor(max_workers=mp.cpu_count() - 1) as executor:
        for i in range(mp.cpu_count() - 1):
            futures.add(executor.submit(make_env, env_init.copy()))
    for item in futures:
        print(item.result())
Thank you Benjamin, I did not realize threading in Python was fundamentally difficult because of the Global Interpreter Lock. I usually opt for the multiprocessing library as well (though I think this issue is present there too, so your solution should also be needed there). However, the concurrent library is what Optuna uses, which is where the trouble began. Passing around an initialized environment is a bit tricky in my case since we have 3 environments (train, val, test), but I think we can make it work.
/ Xavier
If you don't do the env_init step, it does not really work (on my Linux machine).
If you don't copy the environment when calling make_env:
futures.add(executor.submit(make_env, env_init))  # DON'T DO THIS: DOES NOT WORK
it does not work either.
I don't know if you can adapt your overall code to work like this.
Anyway, my recommendation would be:
- optimize everything that can be optimized on a single core (I don't know your exact setup, but there are certainly a lot of things you can optimize in grid2op; see for example https://grid2op.readthedocs.io/en/latest/data_pipeline.html or the lightsim2grid backend (https://lightsim2grid.readthedocs.io/))
- optimize everything outside of grid2op that can be; for example, if you use RL, make sure that the time spent in your model is not higher than the time spent in grid2op (you can compare the time it takes to do, say, 1000 forward passes with your model to the time it takes grid2op to do 1000 "random" actions). With some RL frameworks you can be surprised
- only then try to do "asynchronous" things (and in this case prefer multiprocessing, not multi threading)
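The timing comparison suggested in the second point can be sketched as follows. The two stand-in functions below are placeholders: on a real setup you would replace them with env.step(env.action_space.sample()) and your model's forward pass.

```python
import time

def time_it(fn, n=1000):
    """Return total seconds to call fn() n times."""
    start = time.perf_counter()
    for _ in range(n):
        fn()
    return time.perf_counter() - start

# Hypothetical stand-ins for the environment step and the model forward.
def fake_env_step():
    sum(range(100))

def fake_model_forward():
    sum(range(1000))

t_env = time_it(fake_env_step)
t_model = time_it(fake_model_forward)
print(f"env:   {t_env:.4f}s for 1000 steps")
print(f"model: {t_model:.4f}s for 1000 forward passes")
# If the model time dominates, speeding up grid2op further will not help much.
```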
Finally do not hesitate to have a look at https://grid2op.readthedocs.io/en/master/troubleshoot.html#experimental-read-from-local-dir. It might work in your case
While the minimal example with the ThreadPoolExecutor you shared does work on my Windows machine, unfortunately it does not seem I can adapt it to work in Optuna. I don't think there is a direct solution to be found, since putting .copy() on attributes passed to Optuna's objective() function still ends up running the copy locally in each thread (rather than outside in a for loop):
import optuna
import psutil
import joblib

def objective(trial, train_env, val_env, test_env):
    train_env = train_env.copy()
    val_env = val_env.copy()
    test_env = test_env.copy()
    ...
    return 0.0

safe_objective = lambda trial: objective(trial, train_env.copy(), val_env.copy(), test_env.copy())

n_cores = psutil.cpu_count(logical=False)
with joblib.parallel_config(backend="multiprocessing", n_jobs=n_cores):
    study = optuna.create_study()
    study.optimize(safe_objective, n_trials=100, n_jobs=n_cores)
The old hack does still work, but based on your comment I did some quick speed testing, and using multiprocessing is actually not any faster than running in one thread (each episode takes roughly 10x longer per thread when spread over 10 threads). This is in part because multiple threads cannot share the single GPU on my system. Given this, I may as well focus on optimizing the single-threaded case (for example, with MultiFolderWithCache from the data_pipeline documentation you sent).
Nevertheless, for hyperparameter optimization, parallel trials still seem advantageous to me, since the hyperparameter space can then be explored much more efficiently (assuming each parallel trial is about as fast as a single trial by itself). I may return to this in the future.