optuna/optuna-examples

BUG - dask integration example not working

cherusk opened this issue · 6 comments

Expected behavior

The code sample executes on the pre-provisioned Dask cluster, and the Optuna integration with dask.distributed works seamlessly.

Environment

  • Optuna version:
  • Python version: python3.9
  • OS: Debian GNU/Linux 11 (airflow packaged container)
  • (Optional) Other libraries and their versions:
    optuna == 3.1.0b0
    joblib == 1.2.0
    dask == 2022.12.1
    distributed == 2022.12.1

Error messages, stack traces, or logs

[2023-01-03, 00:00:14 UTC] {taskinstance.py:1851} ERROR - Task failed with exception
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/decorators/base.py", line 188, in execute
return_value = super().execute(context)
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.9/site-packages/airflow/operators/python.py", line 193, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/opt/airflow/dags/root_dag.py", line 93, in run_optimization
storage = optuna.integration.DaskStorage(InMemoryStorage())
File "/home/airflow/.local/lib/python3.9/site-packages/optuna/_experimental.py", line 115, in wrapped_init
_original_init(self, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/optuna/integration/dask.py", line 446, in __init__
self.client.run_on_scheduler(_register_with_scheduler, storage=storage, name=self.name)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2740, in run_on_scheduler
return self.sync(self._run_on_scheduler, function, *args, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 339, in sync
return sync(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 406, in sync
raise exc.with_traceback(tb)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/utils.py", line 379, in f
result = yield future
File "/home/airflow/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/client.py", line 2691, in _run_on_scheduler
response = await self.scheduler.run_function(
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 1155, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/home/airflow/.local/lib/python3.9/site-packages/distributed/core.py", line 945, in send_recv
raise exc.with_traceback(tb)
File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads
ModuleNotFoundError: No module named 'optuna'
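The traceback bottoms out in `distributed/protocol/pickle.py`, which points at the mechanics of the failure: pickle serializes classes and functions by reference (module name plus attribute name), and `loads()` re-imports that module on the receiving interpreter. `DaskStorage` ships Optuna objects to the scheduler, so the scheduler's Python must itself be able to import `optuna`. A minimal, self-contained illustration (the module name is made up to force the error):

```python
import pickle

# A hand-crafted protocol-0 pickle whose GLOBAL opcode ('c') references a
# module that does not exist. Unpickling re-imports the named module, which
# reproduces the exact failure mode seen on the scheduler.
try:
    pickle.loads(b"cno_such_module\nThing\n.")
except ModuleNotFoundError as exc:
    print(exc)  # No module named 'no_such_module'
```

The same thing happens on the scheduler when it unpickles `_register_with_scheduler` and its `storage` argument: the first `import optuna` in its own environment fails, regardless of what is installed on the client side.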


Steps to reproduce

  1. Run the code sample below against a pre-provisioned Dask cluster.

Reproducible examples (optional)

From https://github.com/cherusk/godon/blob/master/breeder/linux_network_stack/root_dag.py#L83

# Essentially the example code

import optuna
from optuna.storages import InMemoryStorage
from optuna.integration import DaskStorage
from dask.distributed import Client
from dask.distributed import wait


def run_optimization():
    # boilerplate from https://jrbourbeau.github.io/dask-optuna/

    def objective(trial):
        x = trial.suggest_uniform("x", -10, 10)
        return (x - 2) ** 2

    with Client(address="godon_dask_scheduler_1:8786") as client:
        # Create a study using Dask-compatible storage
        storage = optuna.integration.DaskStorage(InMemoryStorage())
        study = optuna.create_study(storage=storage)
        # Optimize in parallel on your Dask cluster
        futures = [
            client.submit(study.optimize, objective, n_trials=10, pure=False)
            for _ in range(10)
        ]
        wait(futures)
        print(f"Best params: {study.best_params}")


optimization_step = run_optimization()

Additional context (optional)

The end of the trace is quite strange, because on the Airflow side nothing was installed via conda. Everything comes in via pip:
https://github.com/cherusk/godon/blob/master/Dockerfile-airflow#L3

Optuna is meant to run as the backend of the godon project: https://github.com/cherusk/godon

Maybe @jrbourbeau can help, since he authored the example code!?

Could you try `import optuna` in your environment to make sure your env is set up correctly?

@nzw0301 yes, you can see the import also in the code snippet I provided.

But I ran it manually now also in the airflow container:

airflow@56d394fe9f60:/opt/airflow$ python3
Python 3.9.15 (main, Oct 25 2022, 05:49:37)
[GCC 10.2.1 20210110] on linux
>>> import optuna
>>>

It succeeded, as you can see. I doubt the error stems from the Dask workers, because it already fails at the `optuna.integration.DaskStorage` step.
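One way to narrow this down is to ask the scheduler and the workers which interpreter they run and whether they can import `optuna`, and compare that against the client. A minimal sketch; the `client.run*` calls at the bottom assume a reachable cluster and are therefore shown commented out:

```python
import importlib.util
import sys


def describe_env():
    # Report the interpreter path and whether optuna is importable.
    # Send this function to the cluster via client.run_on_scheduler /
    # client.run to compare the remote environments against the local one.
    return {
        "executable": sys.executable,
        "optuna_available": importlib.util.find_spec("optuna") is not None,
    }


print(describe_env())  # local (airflow container) environment

# With a connected dask.distributed Client (address as in the example):
# client.run_on_scheduler(describe_env)   # scheduler environment
# client.run(describe_env)                # one result per worker
```

If the `executable` paths differ (e.g. `/opt/conda/bin/python3` on the cluster vs. a `/usr/local` path locally), the client and the cluster are running different Python installations.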

The example also does not state the version requirements from the point in time when it worked. 😢

I've found the issue.

The installs on the scheduler and the workers were done with conda; that is where the `/opt/conda` paths were coming from:

File "/opt/conda/lib/python3.9/site-packages/distributed/core.py", line 820, in _handle_comm
File "/opt/conda/lib/python3.9/site-packages/distributed/worker.py", line 3217, in run
File "/opt/conda/lib/python3.9/site-packages/distributed/protocol/pickle.py", line 73, in loads

Installing the pre-release version of optuna on the target dask cluster solved the issue. 😳
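For anyone hitting the same mismatch: the fix amounts to installing the same pre-release Optuna into the cluster's Python on the scheduler and on every worker. A sketch, assuming the `/opt/conda` environment shown in the traceback (paths and version are taken from this issue, not a general recipe):

```shell
# Run on the scheduler and on each Dask worker:
/opt/conda/bin/pip install "optuna==3.1.0b0"

# Then verify the version matches the client environment:
/opt/conda/bin/python -c "import optuna; print(optuna.__version__)"
```

Keeping the client, scheduler, and worker environments in lockstep (same packages, same versions) is a general requirement for dask.distributed, since objects are pickled on one side and unpickled on the other.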