tobymao/saq

Stopping Worker Cancels Running Tasks

jamesoguyer opened this issue · 4 comments

Could stopping the worker get an option to wait for all running tasks to complete rather than cancelling them? It would be useful to have a way to block new jobs from being processed while waiting for the existing jobs to finish.

```python
import asyncio
from asyncio import Task


class Worker:
    # Excerpt: only the two Worker methods relevant to the proposal are shown.

    # Proposed change: add an option to skip task.cancel() and set a flag
    # that blocks processing of new tasks.
    async def stop(self) -> None:
        """Stop the worker and cleanup."""
        self.event.set()
        all_tasks = list(self.tasks)
        self.tasks.clear()
        for task in all_tasks:
            task.cancel()
        await asyncio.gather(*all_tasks, return_exceptions=True)

    # Proposed change: check the flag set by stop() and don't schedule new tasks.
    def _process(self, previous_task: Task | None = None) -> None:
        if previous_task:
            self.tasks.discard(previous_task)

        if not self.event.is_set():
            new_task = asyncio.create_task(self.process())
            self.tasks.add(new_task)
            new_task.add_done_callback(self._process)
```
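A minimal, self-contained sketch of the proposed option, using a toy worker rather than saq's real `Worker` class. `DrainableWorker`, its `drain` flag, the `running` set, and `main` are all hypothetical. One detail to watch: some tasks in `tasks` may be idle, still waiting for a job to be dequeued. In this toy version, simply skipping `cancel()` would leave `stop()` waiting on those idle tasks indefinitely, so the sketch cancels idle tasks and only waits for tasks that are actually running a job.

```python
from __future__ import annotations

import asyncio


class DrainableWorker:
    """Toy worker (not saq's Worker) with a hypothetical drain option on stop()."""

    def __init__(self, jobs: asyncio.Queue) -> None:
        self.jobs = jobs
        self.event = asyncio.Event()  # set by stop(); blocks new processing tasks
        self.tasks: set[asyncio.Task] = set()  # all processing tasks
        self.running: set[asyncio.Task] = set()  # tasks currently executing a job
        self.completed: list[str] = []

    def start(self, concurrency: int = 2) -> None:
        for _ in range(concurrency):
            self._process()

    async def process(self) -> None:
        name, duration = await self.jobs.get()  # idle until a job arrives
        task = asyncio.current_task()
        self.running.add(task)
        try:
            await asyncio.sleep(duration)  # stands in for the real job
            self.completed.append(name)
        finally:
            self.running.discard(task)

    def _process(self, previous_task: asyncio.Task | None = None) -> None:
        if previous_task:
            self.tasks.discard(previous_task)
        if not self.event.is_set():
            new_task = asyncio.create_task(self.process())
            self.tasks.add(new_task)
            new_task.add_done_callback(self._process)

    async def stop(self, drain: bool = False) -> None:
        self.event.set()
        all_tasks = list(self.tasks)
        self.tasks.clear()
        for task in all_tasks:
            # drain=False: cancel everything (the current behavior).
            # drain=True: cancel only idle tasks and let running jobs finish.
            if not drain or task not in self.running:
                task.cancel()
        await asyncio.gather(*all_tasks, return_exceptions=True)


async def main(drain: bool) -> list[str]:
    jobs: asyncio.Queue = asyncio.Queue()
    for i in range(2):
        jobs.put_nowait((f"job{i}", 0.05))
    worker = DrainableWorker(jobs)
    worker.start(concurrency=3)  # two tasks take a job, one stays idle
    await asyncio.sleep(0.01)  # let both jobs start running
    await worker.stop(drain=drain)
    return worker.completed


if __name__ == "__main__":
    print(asyncio.run(main(drain=True)))
    print(asyncio.run(main(drain=False)))
```

With `drain=True` both in-flight jobs complete before `stop()` returns; with `drain=False` both are cancelled and nothing completes.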

I have no plans to implement this, but I'm happy to accept a well-written and tested PR. Happy to help you through it; let me know if you're interested.

I'd like to create a PR to help support this. I ran into an issue where machine updates would take down the workers and abort tasks that were in progress. I'd like to take the machines in the fleet offline one at a time, draining the jobs from each machine before taking it offline.

Yeah, it's designed to recover: a worker that shuts down cancels its jobs immediately, and they'll be picked up by other workers. This was the pattern we used.
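A plain-asyncio illustration of the "cancel immediately and let another worker pick it up" pattern, assuming the workers share one queue. This is not how saq implements recovery internally; `worker`, `demo`, and the re-queue-on-cancel logic are hypothetical and only show the idea.

```python
import asyncio


async def worker(name: str, queue: asyncio.Queue, done: list) -> None:
    while True:
        job = await queue.get()
        try:
            await asyncio.sleep(job["duration"])  # stands in for the real job
            done.append((job["id"], name))
        except asyncio.CancelledError:
            # Shut down mid-job: put the job back so another worker retries it.
            queue.put_nowait(job)
            raise


async def demo() -> list:
    queue: asyncio.Queue = asyncio.Queue()
    done: list = []
    queue.put_nowait({"id": "job-1", "duration": 0.05})
    a = asyncio.create_task(worker("a", queue, done))
    await asyncio.sleep(0.01)  # worker "a" is now mid-job
    b = asyncio.create_task(worker("b", queue, done))
    a.cancel()  # simulate machine "a" being taken offline
    await asyncio.gather(a, return_exceptions=True)
    await asyncio.sleep(0.2)  # worker "b" re-runs the job from the start
    b.cancel()
    await asyncio.gather(b, return_exceptions=True)
    return done


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [('job-1', 'b')]
```

Note that the job restarts from the beginning on the second worker, which is why this pattern relies on jobs being safe to re-run.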

Hit me up on Slack.

From what I remember, the issue with this approach is that when a machine is taken offline, it happens immediately, with no buffer time. You don't really control that on something like Kubernetes, so you want to cancel the jobs as soon as possible. If you add some kind of wait time, the machine may just shut down before you get to clean up the tasks.
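If some drain time is still wanted, one way to keep it from outliving the machine is to bound it: wait a fixed time for running tasks, then cancel whatever is left. This sketch uses only standard `asyncio` calls; `drain_then_cancel` and `demo` are hypothetical helpers, not part of saq. On Kubernetes, a pod receives SIGTERM and is killed after `terminationGracePeriodSeconds` (30 seconds by default), so `timeout` would need to stay well below that.

```python
from __future__ import annotations

import asyncio


async def drain_then_cancel(tasks: set[asyncio.Task], timeout: float) -> tuple[int, int]:
    """Wait up to `timeout` seconds for tasks to finish, then cancel the rest.

    Returns (finished, cancelled) counts.
    """
    if not tasks:
        return 0, 0  # asyncio.wait() rejects an empty collection
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()
    # Give cancelled tasks a chance to run their cleanup (e.g. re-queueing a job).
    await asyncio.gather(*pending, return_exceptions=True)
    return len(done), len(pending)


async def demo() -> tuple[int, int]:
    quick = asyncio.create_task(asyncio.sleep(0.01))  # finishes within the window
    slow = asyncio.create_task(asyncio.sleep(10))  # would outlive the machine
    return await drain_then_cancel({quick, slow}, timeout=0.1)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (1, 1)
```

The trade-off raised above still applies: anything not finished within the window is cancelled, so jobs must still tolerate being aborted.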

No change needed. I'll design all tasks that use the worker library to be atomic, so they can be re-run safely if they are aborted halfway through.
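One common way to make a task safe to re-run is to never expose partial output: write to a temporary file and atomically rename it into place. A minimal sketch, assuming the task's result is a file on a POSIX filesystem; `write_report_atomically`, `failing_lines`, and `rerun_demo` are hypothetical names, not saq APIs.

```python
from __future__ import annotations

import os
import tempfile
from collections.abc import Iterable


def write_report_atomically(path: str, lines: Iterable[str]) -> None:
    """Write `lines` to `path` so readers never see a half-written file."""
    directory = os.path.dirname(os.path.abspath(path))
    # The temp file lives in the same directory so os.replace() is a rename
    # within one filesystem, which is atomic on POSIX.
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            for line in lines:
                f.write(line + "\n")
        os.replace(tmp_path, path)
    except BaseException:
        # Aborted halfway (error or cancellation): drop the partial temp file.
        os.unlink(tmp_path)
        raise


def failing_lines() -> Iterable[str]:
    yield "partial"
    raise RuntimeError("aborted halfway")


def rerun_demo() -> tuple[str, list[str]]:
    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "report.txt")
        write_report_atomically(path, ["a", "b"])
        try:
            write_report_atomically(path, failing_lines())  # simulated abort
        except RuntimeError:
            pass
        write_report_atomically(path, ["a", "b"])  # safe re-run
        with open(path) as f:
            return f.read(), sorted(os.listdir(d))


if __name__ == "__main__":
    print(rerun_demo())  # ('a\nb\n', ['report.txt'])
```

The aborted run leaves the previous report untouched and no stray temp file, and re-running produces the same result.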