`Scheduler was unaware of this worker` message during worker shutdown
@bnaul reported seeing frequent messages like `Scheduler was unaware of this worker 'tcp://10.124.34.24:45585'. Shutting down` on an adaptive cluster with 2k workers. I think this message is likely to show up somewhat often during normal worker shutdown, due to the inconsistency in how the scheduler defines closed workers: #6390. It's not actually a sign of anything being wrong in this case, but it is noisy and misleading.
Here's a possible flow for a worker being closed:
- Scheduler calls `remove_worker` on the worker and instantly removes its state tracking that worker's existence. It also queues an `{"op": "close"}` message to send to that worker (but does not send it yet).
- Before that `{"op": "close"}` message has reached the worker, the worker sends another heartbeat (or potentially, a heartbeat was even already on the wire before the scheduler ran `remove_worker`).
- Scheduler receives the heartbeat.
- Scheduler thinks the worker doesn't exist, so it replies with `{"status": "missing"}`.
- The worker is already shutting down, so the `{"status": "missing"}` response doesn't actually change anything worker-side (it still shuts down as usual).
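To make the ordering concrete, here is a minimal toy model of that flow. It does not use the real `distributed` classes: `ToyScheduler`, its `outbox` queue, the example address, and `simulate_race` are all hypothetical names made up for illustration, and the `{"status": "OK"}` reply is a simplification of what a real heartbeat response contains.

```python
from collections import deque


class ToyScheduler:
    """Hypothetical stand-in for the scheduler; not the real distributed API."""

    def __init__(self):
        self.workers = {}      # address -> state the scheduler tracks
        self.outbox = deque()  # (address, message) pairs queued but not yet sent

    def add_worker(self, address):
        self.workers[address] = {"status": "running"}

    def remove_worker(self, address):
        # The tracking state is dropped immediately...
        del self.workers[address]
        # ...while the close message is only queued, not delivered.
        self.outbox.append((address, {"op": "close"}))

    def heartbeat_worker(self, address):
        # A heartbeat from an address the scheduler no longer tracks
        # gets the "missing" reply.
        if address not in self.workers:
            return {"status": "missing"}
        return {"status": "OK"}


def simulate_race():
    scheduler = ToyScheduler()
    address = "tcp://10.0.0.1:1234"  # made-up address
    scheduler.add_worker(address)

    scheduler.remove_worker(address)
    # The close message has not reached the worker yet...
    assert scheduler.outbox[0] == (address, {"op": "close"})
    # ...so the worker sends one more heartbeat.
    return scheduler.heartbeat_worker(address)


if __name__ == "__main__":
    print(simulate_race())
```

In this sketch the late heartbeat always sees the `"missing"` reply, because nothing on the scheduler side records that the worker was asked to close rather than never known.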
Eliminating this message on the worker side would be easy:
diff --git a/distributed/worker.py b/distributed/worker.py
index 86e1fba5..1173abc3 100644
--- a/distributed/worker.py
+++ b/distributed/worker.py
@@ -1177,12 +1177,13 @@ class Worker(BaseWorker, ServerNode):
             self._update_latency(end - start)
             if response["status"] == "missing":
-                # Scheduler thought we left. Reconnection is not supported, so just shut down.
-                logger.error(
-                    f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
-                )
-                # Something is out of sync; have the nanny restart us if possible.
-                await self.close(nanny=False)
+                if self.status not in (Status.closing, Status.closed):
+                    # Scheduler thought we left. Reconnection is not supported, so just shut down.
+                    logger.error(
+                        f"Scheduler was unaware of this worker {self.address!r}. Shutting down."
+                    )
+                    # Something is out of sync; have the nanny restart us if possible.
+                    await self.close(nanny=False)
                 return
             self.scheduler_delay = response["time"] - middle
On the scheduler side though, it's reflective of a larger problem around how we represent the state of a closed worker: #6390.
The problem is that the scheduler puts things in a "closed" state (by deleting `self.workers[address]`) while the worker may still be alive and connected. In the case of a clean shutdown, it would probably make sense if worker closure followed a request-response:
- Scheduler asks worker to close. It keeps `workers[address]` around, but the `WorkerState.status` is set to `closing`, and any tasks it's running or storing are transitioned off immediately (like currently happens).
- Worker responds with a confirmation of the closure, which would be guaranteed to be its last message to the scheduler.
- Scheduler removes `workers[address]`.
This confirmation wouldn't have to be a message or RPC, per se; it could simply take the form of the worker closing the batched stream to the scheduler. The point is, though, that there needs to be some way of representing the gray area where we've asked a worker to close, but haven't confirmed that it's gone yet.
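A rough sketch of what that request-response closure could look like, again as a toy model and not the real scheduler code. `TwoPhaseScheduler`, `request_close`, `confirm_closed`, and the `{"status": "closing"}` heartbeat reply are all assumptions invented for this example; the proposal above doesn't specify how a heartbeat from a closing worker should be answered.

```python
from enum import Enum


class Status(Enum):
    # Toy enum for this sketch only; not distributed's Status.
    running = "running"
    closing = "closing"


class TwoPhaseScheduler:
    """Hypothetical scheduler that keeps a closing worker until it confirms."""

    def __init__(self):
        self.workers = {}  # address -> Status

    def add_worker(self, address):
        self.workers[address] = Status.running

    def request_close(self, address):
        # Step 1: keep the entry, but mark it as closing. Moving tasks off
        # the worker would also happen here (omitted in this sketch).
        self.workers[address] = Status.closing
        return {"op": "close"}

    def heartbeat_worker(self, address):
        status = self.workers.get(address)
        if status is None:
            return {"status": "missing"}
        if status is Status.closing:
            # Assumed reply: the worker is known and expected to go away,
            # so nothing needs to be logged as an error.
            return {"status": "closing"}
        return {"status": "OK"}

    def confirm_closed(self, address):
        # Steps 2 and 3: the worker's confirmation (for example, its
        # batched stream closing) is what finally removes the entry.
        self.workers.pop(address, None)
```

With this shape, a heartbeat that arrives between `request_close` and `confirm_closed` can be told apart from one sent by a worker the scheduler has never heard of, which is the gray area described above.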