agronholm/apscheduler

Worker stops if there are "too much" simultaneous jobs

mavahedinia opened this issue · 2 comments

Things to check first

  • I have checked that my issue does not already have a solution in the FAQ

  • I have searched the existing issues and didn't find my bug already reported there

  • I have checked that my bug is still present in the latest release

Version

4.0.0a4

What happened?

For my use case, I needed to have separate scheduler and worker instances (since there might be thousands of jobs to be executed in a short time and I need the system to be scalable). Everything was fine and stable until our team decided to take a stress test to see when the system breaks. Here we understood that if we throttle the number of jobs submitted to the scheduler, everything would be fine. However, if we submit like 1000 jobs at the same time to the scheduler, workers are overwhelmed and a deadlock situation is caused in the workers job processing loop; causing the worker to stop acquiring and executing jobs at all without any crashing.

I inspected the code and pinpointed the issue. It is happening inside the _process_jobs function under the scheduler (async) class (line 905 as I am reporting the issue - commit hash is f375b67). On line 938 inside the loop the worker awaits wakeup event, which itself is controlled by the job_added function defined there. this function is called only when a new job is added:

async def job_added(event: Event) -> None:
    if len(self._running_jobs) < self.max_concurrent_jobs:
        wakeup_event.set()
...
self.event_broker.subscribe(job_added, {JobAdded})

This combined with the max_concurrent_jobs constraint controlled on line 927 implies that if there are more than max_concurrent_jobs jobs in the db, the worker acquires and tries to execute them, i.e., appends them to the queue; but if there are no newer jobs scheduled after them, the wakeup_event is not set, resulting in a deadlock situation prohibiting the loop from acquiring more jobs, even if the queue is empty.

To fix that, I propose to change the structure only a little bit:

  1. changing the job_added name to check_queue_capacity
  2. Subscribe that function to both JobAdded and JobReleased events.

This way, we can ensure that if there are more jobs that the worker can handle at once, the worker gets notified after the queue is freed up.

How can we reproduce the bug?

Simply schedule around 1000 jobs, run couple of workers (less than 4) and see that some of the jobs won't be executed after a while, unless you restart the workers manually.

The proposed solution is available at PR#882

Fixed via 0596db7.