Allow `.drain()` to only await for first calling thread
Goal
TODO: Related
- Add max timeout option.
Multiple threads/promises shouldn't be held up when only one of them needs to await `pool.drain()`
- (i.e. to keep AWS Lambda awake.)
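For the max timeout option in the TODO above, one possible shape is a wrapper around whatever promise `.drain()` returns. This is only a sketch: `drainWithTimeout` and `maxMs` are hypothetical names, and resolving to `true`/`false` (instead of rejecting on timeout) is an assumption, not a decided API.

```javascript
// Hypothetical helper: wait for a drain promise, but give up after maxMs.
// Resolves to true if the drain finished first, false if the timeout won.
function drainWithTimeout(drainPromise, maxMs) {
  let timer;
  const timeout = new Promise((resolve) => {
    timer = setTimeout(() => resolve(false), maxMs);
  });
  return Promise.race([drainPromise.then(() => true), timeout])
    // Always clear the timer so it cannot keep the process alive.
    .finally(() => clearTimeout(timer));
}
```

Usage would look like `const drained = await drainWithTimeout(pool.drain(), 5000)`. Note that a timeout only stops the waiting; it does not cancel tasks that are still running.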
Idea #1
The first `.drain()` creates one unpacked promise to track the completion of tasks.
Subsequent calls to `.drain(true)` should return `Promise.resolve()` if another thread has already called `.drain()`.
Open question: if tasks get added before the drain completes, should those late-added tasks still be awaited by the original `.drain()` call?
Risk: if tasks arrive faster than they complete, the one waiting thread may never be allowed to resolve, while all subsequent `.drain()` calls fly through.
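A minimal sketch of Idea #1, under some assumptions: `SketchPool` is a made-up stand-in for the real pool, "unpacked promise" is read as a promise whose `resolve` is kept outside the executor, and the `.drain(true)` flag is not modeled. Task errors are swallowed to keep it short.

```javascript
// Hypothetical pool used only to illustrate Idea #1.
class SketchPool {
  constructor() {
    this.pending = 0;         // tasks started but not yet settled
    this.resolveDrain = null; // resolver owned by the first drain() caller
  }

  // Eagerly run the task and track when it settles (success or failure).
  add(task) {
    this.pending++;
    const done = () => {
      this.pending--;
      if (this.pending === 0 && this.resolveDrain) {
        const resolve = this.resolveDrain;
        this.resolveDrain = null; // the next drain() is a "first" caller again
        resolve();
      }
    };
    new Promise((resolve) => resolve(task())).then(done, done);
  }

  drain() {
    if (this.pending === 0) return Promise.resolve();
    // Another caller is already waiting: pass straight through.
    if (this.resolveDrain) return Promise.resolve();
    return new Promise((resolve) => {
      this.resolveDrain = resolve;
    });
  }
}
```

In this sketch, late-added tasks do extend the original wait, so it shows the starvation risk described above: the first caller resolves only when `pending` actually reaches zero.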
Idea #2
To solve for the never-ending task queue scenario, we could move the current task queue aside, so subsequent `.add()` calls go to a fresh inbox queue.
Change `.drain()` to move tasks to an internal queue with only one promise associated with it; any following calls to drain will return `Promise.resolve()`. Calling `.add` here would simply add to the existing inbound processing queue, and the next `.drain()` would create a new internal 'batch' with its own promise. Further drain calls will resolve instantly whenever there are no new tasks.
Risk: This involves a lot of async allocations, copying & mutable operations on arrays, possibly out of order. This is a stunningly difficult design (especially since we eagerly execute), and it is difficult to test in any readable, verifiable & rigorous way. To make this bulletproof, we'd need to implement locking w/ Atomics.
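To make the batch idea concrete, here is a rough sketch. `BatchingPool` and its fields are hypothetical, tasks are assumed to be functions returning promises, and task errors are swallowed for brevity.

```javascript
// Hypothetical pool used only to illustrate Idea #2.
class BatchingPool {
  constructor() {
    this.inbox = []; // settle-promises for tasks added since the last drain()
  }

  // Eagerly run the task; store a promise that resolves once it settles.
  add(task) {
    const settled = new Promise((resolve) => resolve(task()))
      .then(() => {}, () => {});
    this.inbox.push(settled);
  }

  drain() {
    // Nothing new since the previous drain(): resolve instantly.
    if (this.inbox.length === 0) return Promise.resolve();
    const batch = this.inbox; // claim everything added so far
    this.inbox = [];          // later add() calls land in a fresh inbox
    return Promise.all(batch).then(() => undefined);
  }
}
```

Within a single event loop the swap inside `drain()` runs to completion without interleaving, so this sketch uses no locking. Sharing one queue across worker threads is a different situation and is not covered here.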
Idea #3
There are more ideal algorithms for this exact scenario (N-task publishers with a single exactly-once queue.)
One such algo is a Ring Buffer. It can provide a continuous rotating list of 'slots' for inbound tasks, where freed up spots get the next work item (not 100% literally, but close enough.)
It's common in Network adapter firmware and drivers.
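For reference, a bare-bones ring buffer looks roughly like the sketch below. This is a generic fixed-capacity version, not tied to the pool; the class and method names are illustrative, and rejecting a push when full (rather than overwriting the oldest item) is one of several possible policies.

```javascript
// Generic fixed-capacity ring buffer (illustrative only).
class RingBuffer {
  constructor(capacity) {
    this.slots = new Array(capacity);
    this.capacity = capacity;
    this.head = 0; // index of the oldest item
    this.size = 0; // number of occupied slots
  }

  // Put an item in the next free slot; returns false when the buffer is full.
  push(item) {
    if (this.size === this.capacity) return false;
    const tail = (this.head + this.size) % this.capacity;
    this.slots[tail] = item;
    this.size++;
    return true;
  }

  // Remove and return the oldest item, freeing its slot for reuse.
  shift() {
    if (this.size === 0) return undefined;
    const item = this.slots[this.head];
    this.slots[this.head] = undefined;
    this.head = (this.head + 1) % this.capacity;
    this.size--;
    return item;
  }
}
```

A pool built on this would need a decision for the full case, for example making `.add()` wait until a slot frees up.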
There are other algorithms (also from the network tech space) for consuming a limited-buffered durable stream (as TCP does), as well as the lighter-weight UDP-style methods for packet aggregation/collation/streaming.
Idea #4
A possibly simpler approach could involve expanding the tracking metadata with info on which tasks each `.drain()` is responsible for (think storing array ranges). Then each `.drain()` would check for & determine the current range of inbox work and track the indices. This way each `.drain()` gets an exclusive 'lock' on a range of items. If all task indexes are already being awaited, then short-circuit the `.drain()` return value with `true` or `Promise.resolve(true)`.
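The range tracking could be sketched like this. `RangePool` and `claimedUpTo` are hypothetical names, a single contiguous "claimed up to" index stands in for full range metadata, and the task list is never trimmed, which a real implementation would need to handle.

```javascript
// Hypothetical pool used only to illustrate Idea #4.
class RangePool {
  constructor() {
    this.tasks = [];      // settle-promise per task, in insertion order
    this.claimedUpTo = 0; // every index below this is owned by some drain()
  }

  // Eagerly run the task; errors are swallowed to keep the sketch short.
  add(task) {
    const settled = new Promise((resolve) => resolve(task()))
      .then(() => {}, () => {});
    this.tasks.push(settled);
  }

  drain() {
    const start = this.claimedUpTo;
    const end = this.tasks.length;
    // All task indexes are already being awaited by earlier drain() calls.
    if (start === end) return Promise.resolve(true);
    this.claimedUpTo = end; // take an exclusive claim on [start, end)
    return Promise.all(this.tasks.slice(start, end)).then(() => true);
  }
}
```

Each caller waits only for the range it claimed, so a steady stream of new tasks cannot hold up an earlier `.drain()`.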
Idea #5
Extend `.drain()` with a sort of 'promise passing' behavior, so each `.drain()` call closes the previous promise and returns a new one, and so on. Upon completing the task queue, resolve the drain's promise (current behavior).
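One way to read 'promise passing' is sketched below, assuming "closes" means "resolves". `PassingPool` is a hypothetical name and task errors are swallowed. Note that under this reading the most recent caller is the one left waiting, which is the reverse of the first-caller-waits behavior in the other ideas.

```javascript
// Hypothetical pool used only to illustrate Idea #5.
class PassingPool {
  constructor() {
    this.pending = 0;    // tasks started but not yet settled
    this.release = null; // resolver for the most recent drain() promise
  }

  // Eagerly run the task and release the current waiter when the queue empties.
  add(task) {
    this.pending++;
    const done = () => {
      this.pending--;
      if (this.pending === 0 && this.release) {
        const release = this.release;
        this.release = null;
        release();
      }
    };
    new Promise((resolve) => resolve(task())).then(done, done);
  }

  drain() {
    // Close the previous caller's promise; the wait passes to this caller.
    if (this.release) {
      this.release();
      this.release = null;
    }
    if (this.pending === 0) return Promise.resolve();
    return new Promise((resolve) => {
      this.release = resolve;
    });
  }
}
```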
Proposed API
// on Thread #1
await pool.drain() // will resolve on empty queue.
// on Thread #2+
await pool.drain() // will resolve IMMEDIATELY as long as another thread has called `.drain()`