gammazero/workerpool

How to get execution time of each job & run each job with a timeout?

Closed this issue · 0 comments

Putting this here for visibility...

Here is an example of how to record each job's execution duration and run each job with a timeout that cancels automatically.

I understand this is not perfect code, so please feel free to suggest changes or to supply your own improved version.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/gammazero/workerpool"
)

// JobResult carries the outcome of a single job.
type JobResult struct {
	// Data is the job's payload. It is exported so a Task can set it to
	// whatever value it produces.
	Data interface{}

	// The fields below are unexported; callers read them through the
	// accessor methods rather than touching them directly.
	err     error         // error recorded for the job, nil if none
	runtime time.Duration // how long the job ran
	name    string        // name of the job that produced this result
}

// Name reports the job name stored on the result. The field is unexported
// so that the name is not set through the result itself.
func (r *JobResult) Name() string { return r.name }

// Runtime reports the execution time recorded on the result.
func (r *JobResult) Runtime() time.Duration { return r.runtime }

// Error reports the error recorded on the result, or nil if there is none.
func (r *JobResult) Error() error { return r.err }

// SetError records e as the result's error, replacing any previous value.
func (r *JobResult) SetError(e error) { r.err = e }

// Job describes one unit of work to be run.
type Job struct {
	// Name identifies the job; wrapJob copies it onto the job's JobResult.
	Name string
	// Task performs the work and returns its result.
	Task func() JobResult
}

// wrapJob adapts job into a plain func() that can be handed to a worker pool.
//
// When the returned function runs it executes job.Task, waits at most timeout
// for it to finish, and sends exactly one JobResult on resultsChan:
//
//   - if the task finishes in time, the task's own result, with its name set
//     to job.Name and its runtime set to the measured execution time;
//   - otherwise a result whose error is context.DeadlineExceeded, carrying
//     job.Name and the time waited.
//
// The timeout is measured from the moment the returned function starts
// running, not from the moment wrapJob is called, so time a job spends queued
// behind other work does not count against it.
func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
	return func() {
		start := time.Now()

		// Create the deadline here, when a worker actually picks the job up.
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		// Task takes no context, so a running task cannot be interrupted.
		// It runs in its own goroutine and reports on a private channel.
		// The channel has capacity 1 so the goroutine can always deliver
		// its result and exit, even if we stopped listening after a timeout.
		finished := make(chan JobResult, 1)
		go func() {
			res := job.Task()

			// Name and runtime are set only after the task has returned.
			res.runtime = time.Since(start)
			res.name = job.Name
			finished <- res
		}()

		// Only this function sends on resultsChan, and it sends exactly once,
		// so a late task result can never be delivered alongside a timeout.
		select {
		case res := <-finished:
			resultsChan <- res
		case <-ctx.Done():
			// cancel is only deferred, so Done can fire here only because
			// the deadline passed; ctx.Err() is context.DeadlineExceeded.
			resultsChan <- JobResult{
				err:     ctx.Err(),
				name:    job.Name,
				runtime: time.Since(start),
			}
		}
	}
}

// jobs is the demo workload: one job that is slower than the timeout main
// configures, and one that finishes well within it.
var jobs = []Job{
	{
		Name: "job1",
		Task: func() JobResult {
			// Deliberately slow: sleeping for 3 seconds exceeds the 1 second
			// timeout set in main, so this job is reported as an error.
			// Any other work could go here instead.
			time.Sleep(3 * time.Second)

			// The name is not set here; only Data is filled in.
			return JobResult{Data: map[string]string{"Whatever": "You want"}}
		},
	},
	{
		Name: "job2",
		Task: func() JobResult {
			// Fast enough to succeed.
			time.Sleep(300 * time.Millisecond)
			return JobResult{Data: "i am a result"}
		},
	},
}

// main submits every entry in jobs to a worker pool, waits for the pool to
// finish, and prints one line per result with its runtime, name, status and
// payload (or error).
func main() {
	// Timeout applied to every job; it could also be chosen per job.
	jobTimeout := time.Second

	// Buffered to len(jobs) so a result per job can be sent before anything
	// is receiving: results are only read after the pool has stopped.
	jobResultsChannel := make(chan JobResult, len(jobs))

	numWorkers := 10
	pool := workerpool.New(numWorkers)

	// Each job is wrapped so that it is timed and subject to the timeout.
	for _, job := range jobs {
		pool.Submit(wrapJob(jobTimeout, jobResultsChannel, job))
	}

	// Wait for the submitted jobs, then close the channel so the range loop
	// below terminates once the buffered results are drained.
	pool.StopWait()
	close(jobResultsChannel)

	for jobResult := range jobResultsChannel {
		runTime := int(jobResult.Runtime() / time.Millisecond)

		status := "JobSuccess"
		data := jobResult.Data
		if err := jobResult.Error(); err != nil { // always check for errors
			status = "JobError"
			data = err
		}

		// Data is an interface{}, so it is printed with %v: %s would garble
		// payloads that are not strings, errors or Stringers.
		fmt.Printf("[%dms] : '%s' : %s : %v\n", runTime, jobResult.Name(), status, data)
	}
}
// Output:
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded