workerpool

Concurrency limiting goroutine pool. Limits the concurrency of task execution, not the number of tasks queued. Never blocks submitting tasks, no matter how many tasks are queued.

This implementation builds on ideas from earlier worker-queue designs in Go.

Installation

To install this package, you need to set up your Go workspace. The simplest way to install the library is to run:

$ go get github.com/gammazero/workerpool

Example

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r // capture the loop variable for the closure
		wp.Submit(func() {
			fmt.Println("Handling request:", r)
		})
	}

	// Stop the pool and wait for all submitted tasks to finish.
	wp.StopWait()
}

A wrapper function can be used to show the start and finish time of each submitted function, as in the sketch below.
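
This is a minimal sketch of such a wrapper; the timed helper and its output format are illustrative and not part of the workerpool API.

package main

import (
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// timed wraps a task so that its start and finish times are printed.
// The name "timed" is illustrative; it is not part of the workerpool API.
func timed(name string, task func()) func() {
	return func() {
		fmt.Printf("%s started at %s\n", name, time.Now().Format(time.RFC3339))
		task()
		fmt.Printf("%s finished at %s\n", name, time.Now().Format(time.RFC3339))
	}
}

func main() {
	wp := workerpool.New(2)
	for _, r := range []string{"alpha", "beta", "gamma"} {
		r := r // capture the loop variable for the closure
		wp.Submit(timed(r, func() {
			fmt.Println("Handling request:", r)
		}))
	}
	wp.StopWait()
}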

If you need submission that handles a stopped pool gracefully, use TrySubmit and TrySubmitWait. These methods return an error instead of panicking if the worker pool has already been stopped, allowing you to handle that situation explicitly.

package main

import (
	"fmt"
	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	requests := []string{"alpha", "beta", "gamma", "delta", "epsilon"}

	for _, r := range requests {
		r := r // capture the loop variable for the closure
		if err := wp.TrySubmit(func() {
			fmt.Println("Handling request:", r)
		}); err != nil {
			fmt.Printf("Failed to submit task for request %s: %v\n", r, err)
		}
	}

	wp.StopWait()

	wp = workerpool.New(2)
	for _, r := range requests {
		r := r // capture the loop variable for the closure
		if err := wp.TrySubmitWait(func() {
			fmt.Println("Handling request with wait:", r)
		}); err != nil {
			fmt.Printf("Failed to submit and wait for task for request %s: %v\n", r, err)
		}
	}

	// Stop the second pool as well, waiting for its tasks to finish.
	wp.StopWait()
}

Usage Note

There is no upper limit on the number of tasks queued, other than the limits of system resources. If there are too many inbound tasks to even queue for pending processing, then the solution is outside the scope of workerpool: distribute the workload over multiple systems, and/or store inputs awaiting processing in intermediate storage such as a file system, distributed message queue, etc.
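
One way to detect a growing backlog from inside the program is to check the pool's waiting-queue size and divert work before the queue grows unbounded. The sketch below assumes the library's WaitingQueueSize method is available; the threshold and the "deferring" branch are placeholders for whatever overflow handling fits your system.

package main

import (
	"fmt"

	"github.com/gammazero/workerpool"
)

func main() {
	wp := workerpool.New(2)
	const maxQueued = 100 // illustrative threshold; tune for your workload

	for i := 0; i < 1000; i++ {
		i := i // capture the loop variable for the closure
		if wp.WaitingQueueSize() > maxQueued {
			// Placeholder for real overflow handling: write the input to a
			// file, distributed message queue, or other intermediate storage.
			fmt.Println("queue too long, deferring task", i)
			continue
		}
		wp.Submit(func() {
			fmt.Println("processing task", i)
		})
	}
	wp.StopWait()
}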