ruby-concurrency/concurrent-ruby

Unexpected pruning behaviour with consecutive task batches

joshuay03 opened this issue

* Operating system:                macOS Sequoia 15.0.1
* Ruby implementation:             ruby 3.4.0preview1
* `concurrent-ruby` version:       1.3.3
* `concurrent-ruby-ext` installed: no
* `concurrent-ruby-edge` used:     no

Cross-post of rails/rails#53211 to open up a discussion, specifically about the part of the script where a second consecutive 'batch' of work is submitted to a `Concurrent::ThreadPoolExecutor` pool and the pool is incorrectly pruned before processing has begun.
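For context, my reading of `RubyThreadPoolExecutor` in 1.3.3 is that pruning is only ever triggered from the task submission path, and that it decides which workers to stop from a list of workers that last reported themselves as ready / idle. The sketch below is a simplified paraphrase of that flow for discussion, not the exact source:

```ruby
# Simplified paraphrase of RubyThreadPoolExecutor's submission path in
# concurrent-ruby 1.3.x -- abbreviated for discussion, not the exact source.
def ns_execute(*args, &task)
  ns_assign_worker(*args, &task) || ns_enqueue(*args, &task)
  # Pruning only ever runs here, on submission, which is why the pool
  # never scales down while it sits idle (the repeated #=> 4 readings below).
  ns_prune_pool if @next_gc_time < Concurrent.monotonic_time
end

def ns_prune_pool
  now = Concurrent.monotonic_time
  # @ready lists workers by the time they last reported themselves idle.
  # A worker that was just handed a task but hasn't dequeued it yet can
  # still be listed here -- that window is the race described in the script.
  while !@ready.empty? && @pool.size > @min_length
    worker, last_message = @ready.first
    break if now - last_message <= idletime
    @ready.shift
    worker << :stop
  end
  @next_gc_time = now + @gc_interval
end
```

The full reproduction script: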

```ruby
begin
  require "concurrent"
  
  pool = Concurrent::ThreadPoolExecutor.new(
    min_threads: 1,
    max_threads: 4,
    max_queue: 0,
    idletime: 3
  )
  # First thread is lazily spawned.
  pp pool.length #=> 0
  
  work = -> { sleep 2 }
  
  # Batch (gap-less individual units) of work.
  10.times { pool << work }
  # Wait for state updates.
  sleep 0.25
  # Expected scale up.
  pp pool.length #=> 4
  # Wait for all work to be processed.
  # This is sufficient because the work is I/O-bound and runs in parallel.
  sleep 10
  pp pool.length #=> 4
  # Wait until idle time of all threads has elapsed.
  # This is sufficient; it only needs to exceed the idle time of the last busy thread.
  sleep 5
  # Not scaled down.
  # Pruning will only take place when the next unit of work is received, despite the idle time having elapsed.
  pp pool.length #=> 4

  # Wait for a while to show no change.
  sleep 20
  pp pool.length #=> 4
  
  # Another batch of work.
  10.times { pool << work }
  # Wait for state updates.
  sleep 0.25
  # This case is the most interesting, and might need to be addressed in concurrent-ruby.
  # If a batch of work comes in while the pool is scaled up, pruning runs right after each
  # assignment / enqueue. There's a race between when the ready-worker count is checked for
  # pruning and when the threads actually start processing the work, which is what updates
  # the ready count. As a result, we end up with a single thread handling all the work,
  # i.e. the pool is prematurely scaled down, and it stays that way since all units of work
  # have already been assigned / queued. (A self-contained illustration of this race
  # follows the script.)
  pp pool.length #=> 1
  # Wait for all work to be processed.
  # Work is now sequential.
  sleep 25
  pp pool.length #=> 1
  # Wait until idle time of all threads has elapsed.
  sleep 5
  pp pool.length #=> 1

  # Wait for a while to show no change.
  sleep 20
  pp pool.length #=> 1
  
  # Individual units of work, spaced apart.
  # No work will be completed by the time the last unit is added.
  1.times { pool << work }
  sleep 0.25
  1.times { pool << work }
  sleep 0.25
  1.times { pool << work }
  sleep 0.25
  1.times { pool << work }
  # Wait for state updates.
  sleep 0.25
  # Expected scale up.
  pp pool.length #=> 4
  # Wait for all work to be processed.
  sleep 10
  pp pool.length #=> 4
  # Wait until idle time of all threads has elapsed.
  sleep 5
  # Once again, the pool won't scale down until the next unit of work.
  pp pool.length #=> 4

  # Wait for a while to show no change.
  sleep 20
  pp pool.length #=> 4
  
  # Single unit of work.
  1.times { pool << work }
  # Wait for state updates.
  sleep 0.25
  # Expected scale down.
  pp pool.length #=> 1
  # Wait for all work to be processed.
  sleep 10
  pp pool.length #=> 1
  # Wait until idle time of all threads has elapsed.
  sleep 5
  pp pool.length #=> 1
end
```
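To show the problematic check-then-act pattern in isolation, here is a self-contained sketch (all names are hypothetical; this is not concurrent-ruby's code). A "pruner" that inspects an idle-worker count immediately after enqueueing a batch will usually still observe the pre-batch value, because the worker threads haven't been scheduled yet:

```ruby
# Self-contained illustration of the race: the idle count is checked
# before the workers have had a chance to dequeue the new batch.
mutex = Mutex.new
ready = 0 # number of workers currently believed to be idle
queue = Queue.new

workers = 4.times.map do
  Thread.new do
    loop do
      mutex.synchronize { ready += 1 } # park as idle
      task = queue.pop                 # blocks until work arrives
      mutex.synchronize { ready -= 1 } # only now do we stop looking idle
      break if task == :stop
      task.call
    end
  end
end

sleep 0.1 # let all four workers park as idle

10.times { queue << -> { sleep 0.5 } } # a batch arrives

# The "pruner" checks immediately after enqueueing, before the scheduler
# has woken any worker, so all four typically still look idle:
puts mutex.synchronize { ready } #=> usually 4

4.times { queue << :stop }
workers.each(&:join)
```

Read this way, a fix on the executor side would presumably need to either account for queued-but-unclaimed work when deciding whether a worker is prunable, or re-check readiness after the workers have had a chance to dequeue; I'm not sure which fits the executor's design better.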