How to get execution time of each job & run each job with a timeout?
Closed this issue · 0 comments
matthewoestreich commented
Putting this here for visibility...
Here is an example of how to record each job's execution duration and run each job with a timeout that automatically cancels it.
I understand this is not perfect code, so please feel free to suggest changes or supply your own improved version.
package main
import (
"context"
"fmt"
"time"
"github.com/gammazero/workerpool"
)
// JobResult is the outcome of one job: the payload the task produced plus
// bookkeeping (name, run time, error) that is only reachable through methods.
type JobResult struct {
	// Data is the task's payload. It is exported so a task can fill it in
	// with any value it likes.
	Data interface{}

	// Unexported bookkeeping fields; read them via Error, Runtime and Name.
	err     error
	runtime time.Duration
	name    string
}

// Name reports the name recorded on this result. The field is unexported and
// there is no setter, so the only way to influence it is through the Job's
// own name.
func (r *JobResult) Name() string {
	return r.name
}

// Runtime reports how long the job ran for, as recorded on this result.
func (r *JobResult) Runtime() time.Duration {
	return r.runtime
}

// Error returns the error recorded on this result, or nil if there is none.
func (r *JobResult) Error() error {
	return r.err
}

// SetError records e as this result's error, replacing any previous value.
func (r *JobResult) SetError(e error) {
	r.err = e
}
// Job describes one unit of work: a name and the function that performs it.
type Job struct {
	// Name identifies the job.
	Name string
	// Task does the actual work and returns its result. It takes no
	// arguments, so it has no way to be told to stop early.
	Task func() JobResult
}
// wrapJob adapts job into a plain func() (the shape main hands to the worker
// pool's Submit) that runs job.Task under a timeout and reports exactly one
// JobResult on resultsChan.
//
//   - If the task finishes within timeout, its own result is sent, stamped
//     with the job's name and measured run time.
//   - Otherwise a result carrying context.DeadlineExceeded is sent instead.
//
// The timeout and the run-time measurement both start when the returned
// function is invoked, not when wrapJob is called, so time a job spends
// queued does not count against it.
//
// job.Task takes no context and therefore cannot be interrupted: after a
// timeout it keeps running in its own goroutine until it returns, and its
// result is then discarded.
//
// The send on resultsChan blocks, so the channel needs either an active
// reader or enough buffer space for every submitted job.
func wrapJob(timeout time.Duration, resultsChan chan JobResult, job Job) func() {
	return func() {
		startTime := time.Now()

		// Create the deadline here rather than in wrapJob so it is tied to
		// actual execution, and always release its timer on the way out.
		ctx, cancel := context.WithTimeout(context.Background(), timeout)
		defer cancel()

		// Private hand-off channel. The buffer of one lets the task goroutine
		// deliver its result and exit even when nobody is listening any more
		// because the timeout already fired. Keeping the task goroutine away
		// from resultsChan also means a late finisher can never produce a
		// second result or send on a channel the caller has closed.
		finished := make(chan JobResult, 1)
		go func() {
			result := job.Task()
			// Name and run time are stamped here so tasks do not have to.
			result.runtime = time.Since(startTime)
			result.name = job.Name
			finished <- result
		}()

		// Whichever happens first decides the single result for this job.
		select {
		case result := <-finished:
			resultsChan <- result
		case <-ctx.Done():
			// cancel is only called on return, so reaching this branch means
			// the deadline expired and ctx.Err() is context.DeadlineExceeded.
			resultsChan <- JobResult{
				err:     ctx.Err(),
				name:    job.Name,
				runtime: time.Since(startTime),
			}
		}
	}
}
// jobs is the demo workload that main submits to the pool.
var jobs = []Job{
	{
		Name: "job1",
		// Deliberately slow: sleeping for three seconds outlasts the
		// one-second timeout configured in main, so this job is reported as
		// an error. A task body can do whatever work it likes, and it does
		// not need to set the result's name itself.
		Task: func() JobResult {
			time.Sleep(3 * time.Second)
			return JobResult{Data: map[string]string{"Whatever": "You want"}}
		},
	},
	{
		Name: "job2",
		// Fast enough to finish inside the timeout, so this job succeeds.
		Task: func() JobResult {
			time.Sleep(300 * time.Millisecond)
			return JobResult{Data: "i am a result"}
		},
	},
}
// main runs the demo: it submits every entry of jobs to a worker pool with a
// shared timeout, waits for the pool to drain, and prints one line per result
// showing its run time in milliseconds, its name, whether it succeeded, and
// either its payload or its error.
func main() {
	// One timeout shared by all jobs; pass a different value to wrapJob to
	// vary it per job.
	jobTimeout := time.Second

	// Buffered to len(jobs): results are only read after the pool has
	// stopped, so every job must be able to deposit its result without a
	// reader on the other side.
	jobResultsChannel := make(chan JobResult, len(jobs))

	numWorkers := 10
	pool := workerpool.New(numWorkers)

	// Submit each job through the timeout/timing wrapper.
	for _, job := range jobs {
		pool.Submit(wrapJob(jobTimeout, jobResultsChannel, job))
	}

	// Wait for the submitted jobs to finish, then close the channel so the
	// range loop below terminates once the buffered results are drained.
	pool.StopWait()
	close(jobResultsChannel)

	for jobResult := range jobResultsChannel {
		status := "JobSuccess"
		data := jobResult.Data
		// Always check for an error before trusting Data.
		if err := jobResult.Error(); err != nil {
			status = "JobError"
			data = err
		}
		// %v rather than %s: Data is an arbitrary interface{} value, and %s
		// would mangle non-string payloads (e.g. an int prints as "%!s(int=42)").
		fmt.Printf("[%dms] : '%s' : %s : %v\n",
			jobResult.Runtime().Milliseconds(), jobResult.Name(), status, data)
	}
}
// Output (timings are approximate):
// [303ms] : 'job2' : JobSuccess : i am a result
// [1001ms] : 'job1' : JobError : context deadline exceeded