microsoft/PlanetaryComputerExamples

unable to initialize dask_cuda LocalCUDACluster

dylanrstewart opened this issue · 2 comments

Using the GPU-PyTorch profile, I am unable to initialize a LocalCUDACluster.

The imports themselves succeed:
from dask.distributed import Client
from dask_cuda import LocalCUDACluster

It fails when initializing the cluster:
cluster = LocalCUDACluster(threads_per_worker=4)
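
For completeness, the full cell being run (matching the Input cell in the traceback below) is:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(threads_per_worker=4)  # fails here
client = Client(cluster)
print(f"/proxy/{client.scheduler_info()['services']['dashboard']}/status")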

Full error message:

distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-o6h81bfq', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
Task exception was never retrieved
future: <Task finished name='Task-22' coro=<_wrap_awaitable() done, defined at /srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Set changed size during iteration')>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-fz13r6ye', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7f338725d970>>, <Task finished name='Task-21' coro=<SpecCluster._correct_state_internal() done, defined at /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/spec.py:325> exception=RuntimeError('Set changed size during iteration')>)
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/spec.py", line 363, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-zyrogkjh', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
Task exception was never retrieved
future: <Task finished name='Task-47' coro=<_wrap_awaitable() done, defined at /srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py:688> exception=RuntimeError('Set changed size during iteration')>
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 338, in start
    response = await self.instantiate()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 421, in instantiate
    result = await self.process.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 698, in start
    msg = await self._wait_until_connected(uid)
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 817, in _wait_until_connected
    raise msg["exception"]
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/PlanetaryComputerExamples/tutorials/dask-worker-space/worker-jpr2r_q4', purging
distributed.preloading - INFO - Import preload module: dask_cuda.initialize
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py", line 889, in run
    await worker
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1502, in start
    await self._register_with_scheduler()
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in _register_with_scheduler
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py", line 1198, in <dictcomp>
    types={k: typename(v) for k, v in self.data.items()},
  File "/srv/conda/envs/notebook/lib/python3.8/_collections_abc.py", line 743, in __iter__
    for key in self._mapping:
RuntimeError: Set changed size during iteration
distributed.nanny - ERROR - Failed while trying to start worker process: Set changed size during iteration
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
Input In [1], in <module>
      1 from dask.distributed import Client
      2 from dask_cuda import LocalCUDACluster
----> 4 cluster = LocalCUDACluster(threads_per_worker=4)
      5 client = Client(cluster)
      6 print(f"/proxy/{client.scheduler_info()['services']['dashboard']}/status")

File /srv/conda/envs/notebook/lib/python3.8/site-packages/dask_cuda/local_cuda_cluster.py:361, in LocalCUDACluster.__init__(self, CUDA_VISIBLE_DEVICES, n_workers, threads_per_worker, memory_limit, device_memory_limit, data, local_directory, protocol, enable_tcp_over_ucx, enable_infiniband, enable_nvlink, enable_rdmacm, ucx_net_devices, rmm_pool_size, rmm_managed_memory, rmm_async, rmm_log_directory, jit_unspill, log_spilling, worker_class, **kwargs)
    359 self.cuda_visible_devices = CUDA_VISIBLE_DEVICES
    360 self.scale(n_workers)
--> 361 self.sync(self._correct_state)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/cluster.py:258, in Cluster.sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    256     return future
    257 else:
--> 258     return sync(self.loop, func, *args, **kwargs)

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py:332, in sync(loop, func, callback_timeout, *args, **kwargs)
    330 if error[0]:
    331     typ, exc, tb = error[0]
--> 332     raise exc.with_traceback(tb)
    333 else:
    334     return result[0]

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/utils.py:315, in sync.<locals>.f()
    313     if callback_timeout is not None:
    314         future = asyncio.wait_for(future, callback_timeout)
--> 315     result[0] = yield future
    316 except Exception:
    317     error[0] = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/tornado/gen.py:762, in Runner.run(self)
    759 exc_info = None
    761 try:
--> 762     value = future.result()
    763 except Exception:
    764     exc_info = sys.exc_info()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/deploy/spec.py:363, in SpecCluster._correct_state_internal(self)
    361     for w in workers:
    362         w._cluster = weakref.ref(self)
--> 363         await w  # for tornado gen.coroutine support
    364 self.workers.update(dict(zip(to_open, workers)))

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py:283, in Server.__await__.<locals>._()
    277             raise TimeoutError(
    278                 "{} failed to start in {} seconds".format(
    279                     type(self).__name__, timeout
    280                 )
    281             )
    282     else:
--> 283         await self.start()
    284         self.status = Status.running
    285 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py:338, in Nanny.start(self)
    335     await self.plugin_add(plugin=plugin, name=name)
    337 logger.info("        Start Nanny at: %r", self.address)
--> 338 response = await self.instantiate()
    339 if response == Status.running:
    340     assert self.worker_address

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py:421, in Nanny.instantiate(self, comm)
    419 else:
    420     try:
--> 421         result = await self.process.start()
    422     except Exception:
    423         await self.close()

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py:698, in WorkerProcess.start(self)
    696     return self.status
    697 try:
--> 698     msg = await self._wait_until_connected(uid)
    699 except Exception:
    700     self.status = Status.failed

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py:817, in WorkerProcess._wait_until_connected(self, uid)
    813 if "exception" in msg:
    814     logger.error(
    815         "Failed while trying to start worker process: %s", msg["exception"]
    816     )
--> 817     raise msg["exception"]
    818 else:
    819     return msg

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/nanny.py:889, in run()
    885 """
    886 Try to start worker and inform parent of outcome.
    887 """
    888 try:
--> 889     await worker
    890 except Exception as e:
    891     logger.exception("Failed to start worker")

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/core.py:283, in _()
    277             raise TimeoutError(
    278                 "{} failed to start in {} seconds".format(
    279                     type(self).__name__, timeout
    280                 )
    281             )
    282     else:
--> 283         await self.start()
    284         self.status = Status.running
    285 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py:1502, in start()
   1497 await asyncio.gather(
   1498     *(self.plugin_add(plugin=plugin) for plugin in self._pending_plugins)
   1499 )
   1500 self._pending_plugins = ()
-> 1502 await self._register_with_scheduler()
   1504 self.start_periodic_callbacks()
   1505 return self

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py:1198, in _register_with_scheduler()
   1179 comm.name = "Worker->Scheduler"
   1180 comm._server = weakref.ref(self)
   1181 await comm.write(
   1182     dict(
   1183         op="register-worker",
   1184         reply=False,
   1185         address=self.contact_address,
   1186         status=self.status.name,
   1187         keys=list(self.data),
   1188         nthreads=self.nthreads,
   1189         name=self.name,
   1190         nbytes={
   1191             ts.key: ts.get_nbytes()
   1192             for ts in self.tasks.values()
   1193             # Only if the task is in memory this is a sensible
   1194             # result since otherwise it simply submits the
   1195             # default value
   1196             if ts.state == "memory"
   1197         },
-> 1198         types={k: typename(v) for k, v in self.data.items()},
   1199         now=time(),
   1200         resources=self.total_resources,
   1201         memory_limit=self.memory_limit,
   1202         local_directory=self.local_directory,
   1203         services=self.service_ports,
   1204         nanny=self.nanny,
   1205         pid=os.getpid(),
   1206         versions=get_versions(),
   1207         metrics=await self.get_metrics(),
   1208         extra=await self.get_startup_information(),
   1209     ),
   1210     serializers=["msgpack"],
   1211 )
   1212 future = comm.read(deserializers=["msgpack"])
   1214 response = await future

File /srv/conda/envs/notebook/lib/python3.8/site-packages/distributed/worker.py:1198, in <dictcomp>()
   1179 comm.name = "Worker->Scheduler"
   1180 comm._server = weakref.ref(self)
   1181 await comm.write(
   1182     dict(
   1183         op="register-worker",
   1184         reply=False,
   1185         address=self.contact_address,
   1186         status=self.status.name,
   1187         keys=list(self.data),
   1188         nthreads=self.nthreads,
   1189         name=self.name,
   1190         nbytes={
   1191             ts.key: ts.get_nbytes()
   1192             for ts in self.tasks.values()
   1193             # Only if the task is in memory this is a sensible
   1194             # result since otherwise it simply submits the
   1195             # default value
   1196             if ts.state == "memory"
   1197         },
-> 1198         types={k: typename(v) for k, v in self.data.items()},
   1199         now=time(),
   1200         resources=self.total_resources,
   1201         memory_limit=self.memory_limit,
   1202         local_directory=self.local_directory,
   1203         services=self.service_ports,
   1204         nanny=self.nanny,
   1205         pid=os.getpid(),
   1206         versions=get_versions(),
   1207         metrics=await self.get_metrics(),
   1208         extra=await self.get_startup_information(),
   1209     ),
   1210     serializers=["msgpack"],
   1211 )
   1212 future = comm.read(deserializers=["msgpack"])
   1214 response = await future

File /srv/conda/envs/notebook/lib/python3.8/_collections_abc.py:743, in __iter__()
    742 def __iter__(self):
--> 743     for key in self._mapping:
    744         yield (key, self._mapping[key])

RuntimeError: Set changed size during iteration

Hmm, strange. I am bumping the container image versions in microsoft/planetary-computer-hub#63. Once I've validated that things are OK in staging, I'll update prod as well.
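
In the meantime, a quick way to check which versions an image ships (a minimal sketch; dask, distributed, and dask_cuda all expose __version__):

import dask
import distributed
import dask_cuda

# Report the library versions installed in the current environment
print(dask.__version__, distributed.__version__, dask_cuda.__version__)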

This should be fixed by the updated images released to the hub today. LMK if you're still seeing it.
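
To double-check on the new image, the original repro should now come up cleanly; something like this (same code as above, just confirming workers register) should print a non-zero worker count:

from dask.distributed import Client
from dask_cuda import LocalCUDACluster

cluster = LocalCUDACluster(threads_per_worker=4)
client = Client(cluster)
# Workers should register without the "Set changed size during iteration" error
print(len(client.scheduler_info()["workers"]))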