Should Stop() be renamed or split into Close() and Wait()?
trakhimenok opened this issue · 7 comments
I stumbled upon this lib and it looks good, but I have a concern that the Stop() method could be named better.
When I read the example code I had the impression that Stop() would wait for current processes to complete, clear the task queue, and quit. But in reality it seems to wait until all tasks are completed.
Shouldn't it be named something like Close() or CloseAndWait()?
Maybe even better would be two methods: Close() to signal that there are no more tasks, and Wait() to wait for task completion.
You are correct that Stop() implies an immediate stop: clear the queue and exit after only the currently running tasks complete. However, taskQueue is cleared as fast as possible; nothing is allowed to build up there (this is a key design concept in the articles linked in the README). Everything that comes into taskQueue is immediately dispatched to an available worker or put on the waitingQueue. In fact, the taskQueue can only hold one item: https://github.com/gammazero/workerpool/blob/master/workerpool.go#L32.
So, there may be many pending tasks sitting in the waitingQueue when Stop() is called. However, as soon as the taskQueue is seen as closed, after waiting for no more than one item to be read, everything sitting in the waitingQueue is abandoned:
https://github.com/gammazero/workerpool/blob/master/workerpool.go#L138
The dispatcher then exits and signals it is done.
Only the currently running tasks are waited for, in the loop that reaps the workers as they finish:
https://github.com/gammazero/workerpool/blob/master/workerpool.go#L190
Therefore, I think Stop() does do what it implies, although I could stand to document it more clearly. What is not available is the ability to close and wait for all queued tasks, sitting in waitingQueue, to finish.
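The shutdown behavior described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not the library's code: `simulateStop`, `stopResult`, and the channel names are hypothetical, the model uses a single worker and an unbuffered `taskQueue`, and the first task is held open deliberately so that the other tasks are guaranteed to be waiting when the queue is closed.

```go
package main

import "fmt"

// stopResult reports what this simplified, hypothetical model did at shutdown.
type stopResult struct{ ran, abandoned int }

// simulateStop occupies one worker, queues `queued` more tasks, then closes
// the task queue the way the thread describes Stop() behaving.
func simulateStop(queued int) stopResult {
	taskQueue := make(chan func())  // producer -> dispatcher
	workerChan := make(chan func()) // dispatcher -> the single worker
	workerDone := make(chan struct{})
	closeSeen := make(chan int) // dispatcher reports how many tasks it dropped

	ran := 0 // written only by the worker; read after workerDone is closed
	go func() {
		defer close(workerDone)
		for task := range workerChan {
			task()
			ran++
		}
	}()

	go func() { // dispatcher: the only goroutine that touches `waiting`
		var waiting []func()
		for {
			var out chan func() // nil disables the send case below
			var next func()
			if len(waiting) > 0 {
				out, next = workerChan, waiting[0]
			}
			select {
			case task, ok := <-taskQueue:
				if !ok {
					closeSeen <- len(waiting) // queued tasks are abandoned
					close(workerChan)
					return
				}
				waiting = append(waiting, task)
			case out <- next:
				waiting = waiting[1:]
			}
		}
	}()

	started := make(chan struct{})
	release := make(chan struct{})
	taskQueue <- func() { close(started); <-release }
	<-started // the first task now occupies the only worker
	for i := 0; i < queued; i++ {
		taskQueue <- func() {}
	}
	close(taskQueue)
	abandoned := <-closeSeen
	close(release) // let the running task finish
	<-workerDone
	return stopResult{ran: ran, abandoned: abandoned}
}

func main() {
	r := simulateStop(4)
	fmt.Printf("ran %d, abandoned %d\n", r.ran, r.abandoned)
}
```

In this model only the task that was already running completes; the tasks still waiting when the queue is closed are dropped, which matches the behavior described in the comment above.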
A function to close and wait for all currently running and queued tasks to finish would be helpful. I misunderstood what Stop() does and couldn't figure out why my program wasn't processing everything that I had enqueued until a colleague showed me this ticket.
@gammazero Maybe off-topic: after a closer review of the code I don't see the purpose of taskQueue.
Wouldn't it be better if Submit() checked for available workers and, if there are none, pushed to the waitingQueue directly?
Something like:
func (p *WorkerPool) Submit(task func()) {
	if task == nil {
		return
	}
	select {
	case workerTaskChan := <-p.readyWorkers:
		// A worker is ready, so give task to worker.
		workerTaskChan <- task
	default:
		// No worker is ready, so queue the task.
		p.waitingQueue.PushBack(task)
	}
}
This should save 2 locks by bypassing taskQueue. Am I missing something?
@astec The reason for taskQueue is purely to feed the dispatcher goroutine. The dispatcher is necessary for queueing tasks when there are no available workers. What you say is correct: the locks could be saved, but then where are the tasks queued? It would mean that the worker pool would block when there are no available workers.
While that is a legitimate thing to do in certain circumstances, a primary design goal of this worker pool is to limit concurrency without ever blocking the producer. In order to never block the producer, the tasks need to be immediately offloaded to somewhere when the necessarily limited number of workers are all busy. This means to other goroutine(s), to a queue, to a database, to somewhere. Sending to an available worker or to a queue requires a critical section (mutex or goroutine). A goroutine is ideal here, since it provides the needed serialization, and it allows the work to be done concurrently, hence the dispatcher.
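The idea of a dispatcher goroutine that serializes access to the queue might be sketched as follows. This is a minimal model under assumptions, not the library's implementation: the `pool` type, `newPool`, `closeAndWait`, and `runTasks` are hypothetical names, the model uses a fixed set of workers, and, unlike Stop() as described earlier in this thread, it drains the waiting tasks on close.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical, simplified model of the dispatcher design.
type pool struct {
	taskQueue chan func() // producers hand tasks to the dispatcher here
	done      chan struct{}
}

// newPool starts the dispatcher; maxWorkers must be at least 1.
func newPool(maxWorkers int) *pool {
	p := &pool{taskQueue: make(chan func()), done: make(chan struct{})}
	go p.dispatch(maxWorkers)
	return p
}

// Submit only waits for the dispatcher to take the task, never for a worker.
func (p *pool) Submit(task func()) {
	if task != nil {
		p.taskQueue <- task
	}
}

// closeAndWait signals that no more tasks are coming and waits for the rest.
func (p *pool) closeAndWait() {
	close(p.taskQueue)
	<-p.done
}

// dispatch is the only goroutine that touches `waiting`, so no mutex is needed.
func (p *pool) dispatch(maxWorkers int) {
	defer close(p.done)
	workerChan := make(chan func())
	var wg sync.WaitGroup
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for task := range workerChan {
				task()
			}
		}()
	}
	var waiting []func()
	in := p.taskQueue
	for in != nil || len(waiting) > 0 {
		var out chan func() // nil disables the send case when nothing waits
		var next func()
		if len(waiting) > 0 {
			out, next = workerChan, waiting[0]
		}
		select {
		case task, ok := <-in:
			if !ok {
				in = nil // closed: stop receiving, keep draining `waiting`
				continue
			}
			waiting = append(waiting, task)
		case out <- next:
			waiting = waiting[1:]
		}
	}
	close(workerChan)
	wg.Wait()
}

// runTasks submits n counting tasks and returns how many completed.
func runTasks(maxWorkers, n int) int {
	p := newPool(maxWorkers)
	var completed int64
	for i := 0; i < n; i++ {
		p.Submit(func() { atomic.AddInt64(&completed, 1) })
	}
	p.closeAndWait()
	return int(atomic.LoadInt64(&completed))
}

func main() {
	fmt.Println("completed:", runTasks(2, 100))
}
```

Because the dispatcher is always ready either to receive a new task or to hand one to a free worker, the producer is held up only for the handoff itself, and the queue needs no lock since a single goroutine owns it.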
@aevernon There are a couple of ways this can be handled.
- Workerpool itself can implement a StopWait() function that waits for all queued tasks to finish.
- This can be handled by the application using Workerpool, by using a sync.WaitGroup to wait until all tasks are finished before shutting down the workerpool.
Consider your use case, and if you think that workerpool would benefit from a StopWait(), then open a new issue. I have tried to keep workerpool as simple as possible, but this may be a good addition.
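The application-side option mentioned above, waiting on a sync.WaitGroup before shutting the pool down, might look roughly like this. It is a sketch under assumptions: `submitter`, `semPool`, `submitAndWait`, and `countCompleted` are hypothetical names, and `semPool` is only a stand-in so the example runs with the standard library. With the real library, the value passed in would be the pool itself, and the application would call Stop() after `submitAndWait` returns.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// submitter is the only behavior this sketch needs from a worker pool.
type submitter interface {
	Submit(task func())
}

// semPool is a hypothetical stand-in pool that limits concurrency with a
// buffered channel used as a semaphore.
type semPool struct{ slots chan struct{} }

func (p semPool) Submit(task func()) {
	go func() {
		p.slots <- struct{}{}        // acquire a slot
		defer func() { <-p.slots }() // release it when the task returns
		task()
	}()
}

// submitAndWait wraps each task so the WaitGroup tracks its completion, then
// blocks until every submitted task has finished.
func submitAndWait(p submitter, tasks []func()) {
	var wg sync.WaitGroup
	wg.Add(len(tasks))
	for _, task := range tasks {
		task := task // capture the per-iteration value for older Go versions
		p.Submit(func() {
			defer wg.Done()
			task()
		})
	}
	wg.Wait()
	// At this point it is safe to shut the pool down: nothing is still queued.
}

// countCompleted runs n counting tasks with at most `limit` running at once.
func countCompleted(n, limit int) int {
	var done int64
	tasks := make([]func(), n)
	for i := range tasks {
		tasks[i] = func() { atomic.AddInt64(&done, 1) }
	}
	submitAndWait(semPool{slots: make(chan struct{}, limit)}, tasks)
	return int(atomic.LoadInt64(&done))
}

func main() {
	fmt.Println("completed:", countCompleted(50, 4))
}
```

The trade-off is that every caller has to remember to wrap its tasks, whereas a StopWait() inside the pool would make the behavior available to all users in one place.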
@gammazero After reading the description of the deque package I see the reasons now (to provide thread safety when pushing to waitingQueue). Thanks for the explanation.