BUG - dask integration example not working
cherusk opened this issue · 6 comments
Expected behavior
That the code sample executes on the pre-provisioned Dask cluster and that the Optuna-to-Dask distributed integration works seamlessly.
Environment
- Optuna version: 3.1.0b0
- Python version: 3.9
- OS: Debian GNU/Linux 11 (airflow packaged container)
- (Optional) Other libraries and their versions:
  - joblib == 1.2.0
  - dask == 2022.12.1
  - distributed == 2022.12.1
Error messages, stack traces, or logs
[2023-01-03, 00:00:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/root_dag.py", line 93, in run_optimization
storage = optuna.integration.DaskStorage(InMemoryStorage())
File "/home/airflow/.local/lib/python3.9/site-packages/optuna/_experimental.py", line 115, in wrapped_init
_original_init(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/optuna/integration/dask.py", line 446, in init
self.client.run_on_scheduler(_register_with_scheduler, storage=storage, name=self.name)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2740, in run_on_scheduler
return self.sync(self._run_on_scheduler, function, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
return sync(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
raise exc.with_traceback(tb)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
result = yield future
File "/home/airflow/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2691, in _run_on_scheduler
response = await self.scheduler.run_function(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1155, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 945, in send_recv
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
ModuleNotFoundError: No module named 'optuna'
Steps to reproduce
- Take the code below and rerun it.
Reproducible examples (optional)
From https://github.com/cherusk/godon/blob/master/breeder/linux_network_stack/root_dag.py#L83
# Essentially the example code
import optuna
from optuna.storages import InMemoryStorage
from optuna.integration import DaskStorage
from dask.distributed import Client
from dask.distributed import wait

def run_optimization():
    # boilerplate from https://jrbourbeau.github.io/dask-optuna/
    def objective(trial):
        x = trial.suggest_uniform("x", -10, 10)
        return (x - 2) ** 2

    with Client(address="godon_dask_scheduler_1:8786") as client:
        # Create a study using Dask-compatible storage
        storage = optuna.integration.DaskStorage(InMemoryStorage())
        study = optuna.create_study(storage=storage)
        # Optimize in parallel on the Dask cluster
        futures = [
            client.submit(study.optimize, objective, n_trials=10, pure=False)
            for _ in range(10)
        ]
        wait(futures)
        print(f"Best params: {study.best_params}")

optimization_step = run_optimization()
Additional context (optional)
The end of the trace is quite strange, because nothing was installed via conda in the Airflow container; everything comes in via pip:
https://github.com/cherusk/godon/blob/master/Dockerfile-airflow#L3
Optuna is meant to run as the backend of the godon project: https://github.com/cherusk/godon
Maybe @jrbourbeau can help, since he authored the example code?
Could you try running import optuna in your environment to make sure your env is set up correctly?
@nzw0301 yes, you can see the import also in the code snippet I provided.
But I have now also run it manually in the Airflow container:
airflow@56d394fe9f60:/opt/airflow$ python3
Python 3.9.15 (main, Oct 25 2022, 05:49:37)
[GCC 10.2.1 20210110] on linux
>>> import optuna
>>>
It succeeded, as you can see. I doubt the error stems from the Dask workers, because it already fails at the optuna.integration.DaskStorage step, which runs on the scheduler.
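For what it's worth, the traceback is consistent with a scheduler-side import failure: DaskStorage.__init__ calls client.run_on_scheduler, and distributed pickles the function on the client and unpickles it in the scheduler process. Since pickle serializes module-level functions by reference, pickle.loads must re-import the defining module on the receiving side, so the scheduler's own environment needs optuna installed. A minimal sketch of that mechanism using only stdlib modules (json.dumps stands in for the Optuna-side function):

```python
import json
import pickle

# pickle stores a module-level function as a reference
# (module name + qualified name), not as its code.
payload = pickle.dumps(json.dumps)

# Unpickling re-imports the module on the receiving side. If that
# module is missing there (as optuna was on the scheduler), this is
# where ModuleNotFoundError would be raised.
fn = pickle.loads(payload)

assert fn is json.dumps
assert fn({"x": 1}) == '{"x": 1}'
```

This is why the client importing optuna successfully does not rule out the error: the import in the trace happens in a different Python environment (/opt/conda) on the scheduler.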
The example also does not state the version requirements from the time when it worked. 😢
I've found the issue.
The installs on the scheduler and the workers were done with conda; that's where the /opt/conda paths were coming from.
File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
Installing the pre-release version of optuna on the target Dask cluster solved the issue. 😳
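For anyone hitting the same error: the fix amounts to making the scheduler and worker environments match the client. A hedged sketch, assuming pip is available in the cluster images (the version pins are taken from the environment section above):

```shell
# Run in the scheduler and worker images, matching the client env:
pip install "optuna==3.1.0b0" "dask==2022.12.1" "distributed==2022.12.1"
```

As a sanity check, distributed's client.get_versions(check=True) can be used to flag package or version mismatches between the client, scheduler, and workers before starting a study.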