A distributed and reliable database backed, job execution framework
go get -u github.com/simpleframeworks/jobsd
- A db table is a queue.
- Every "job run" has an associated db row.
- Instances in a cluster compete to acquire and run a job (without locking).
- A worker pool runs jobs.
- A new reoccurring scheduled "job run" is created after a "job run" is complete.
- The db queue and the local JobsD instance queue are periodically synchronized.
Announce the time every minute on the minute.
jd := jobsd.New(db) // Create a JobsD service instance
// Register a Job that announces the time
jd.RegisterJob("Announce", func(name string) error {
fmt.Printf("Hi %s! The date/time is %s.\n", name, time.Now().Format(time.RFC1123))
return nil
})
// Register a schedule that tells JobsD when to trigger next
jd.RegisterSchedule("OnTheMin", func(now time.Time) time.Time {
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute()+1, 0, 0, now.Location())
})
jd.Up() // Bring up the JobsD service
// Create and schedule the job "Announce" to run "OnTheMin"
jd.CreateRun("Announce", "Simon").Schedule("OnTheMin").Run()
<-time.After(2*time.Minute) // Should really wait for OS kill signal here
jb.Down() // Shutdown the JobsD service instance, wait for running jobs to complete, record stats, and tidy up
A runnable example can be found in the examples folder. Just run it go run main.go
from the directory.
The characteristics of a job is as follows:
- Jobs are just funcs
- Jobs must return an error
- Jobs can have a number of params
- All job params must be serializable with gob encoding
- Across a cluster all jobs should be named the same and have the same implementation.
- Not all jobs need to implemented across the cluster (facilitates new code and new jobs)
- All jobs need to be registered before the instance
Up()
func is called - The first argument can optional be of the type
jobsd.RunInfo
- RunInfo contains a
Cancel
channel for graceful shutdown / timeout amongst other things
- RunInfo contains a
jobFunc1 := func() error {
//DO SOME STUFF
return nil
}
jobFunc2 := func(name string, age int) error {
//DO SOME STUFF
return nil
}
jd.RegisterJob("job1", jobFunc1)
jd.RegisterJob("job2", jobFunc2)
A schedule is a simple function that takes in the current time and returns the next scheduled time.
- Schedules must be registered before the
Up()
func is called
afterASecond := func(now time.Time) time.Time {
return now.Add(time.Second)
}
onTheMin := func(now time.Time) time.Time {
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute()+1, 0, 0, now.Location())
})
onTheHour := func(now time.Time) time.Time {
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, now.Location())
})
jd.RegisterSchedule("afterASecond", afterASecond)
jd.RegisterSchedule("onTheMin", onTheMin)
jd.RegisterSchedule("onTheHour", onTheHour)
A job run is an instance of a job to be executed. A job must be registered first before creating a job run using jd.CreateRun("job1", args...)
jobFunc := func(txt string) error {
fmt.Printf("Hello %s!", txt)
return nil
}
scheduleFunc := func(now time.Time) time.Time {
return now.Add(time.Second)
}
jd := New(db)
jd.RegisterJob("job1", jobFunc)
jd.RegisterSchedule("schedule1", scheduleFunc)
jd.Up()
jd.CreateRun("job1", "World A").Run() // Run job1 once immediately
jd.CreateRun("job1", "World B").RunDelayed(time.Second) // Run job1 once after one second
jd.CreateRun("job1", "World C").Schedule("schedule1").Limit(2).Run() // Run job1 every second twice
jd.CreateRun("job1", "World D").Schedule("schedule1").Limit(2).RunAfter(time.Second) // After one second schedule job1 to run twice
// Runs only one "GlobalUniqueJob1" job at a time, across a cluster of JobsD instances
jd.CreateRun("job1", "World E").Unique("GlobalUniqueJob1").Run()
// Runs and schedules only one "GlobalUniqueJob2" job at a time, across a cluster of JobsD instances. Runs only twice.
jd.CreateRun("job1", "World F").Schedule("schedule1").Limit(2).Unique("GlobalUniqueJob2").Run()
<-time.After(10 * time.Second)
jd.Down() // Make sure to shutdown to cleanup and record stats
id, err := jd.CreateRun("job1", "World A").Run()
checkError(err)
runState := jd.GetRunState(id) // Get the run state of the job.
spew.Dump(runState.OriginID)
spew.Dump(runState.Name)
spew.Dump(runState.Job)
spew.Dump(runState.Schedule)
spew.Dump(runState.RunSuccessCount)
spew.Dump(runState.RunStartedAt)
spew.Dump(runState.RunStartedBy)
spew.Dump(runState.RunCompletedAt)
spew.Dump(runState.RunCompletedError)
spew.Dump(runState.RetriesOnErrorCount)
spew.Dump(runState.RetriesOnTimeoutCount)
spew.Dump(runState.CreatedAt)
spew.Dump(runState.CreatedBy)
err = runState.Refresh() // Refreshes the run state.
checkError(err)
jd := New(db)
jd.WorkerNum(10) // Set the number of workers to run the jobs
jd.PollInterval(10*time.Second) // The time between checks for new jobs across the cluster
jd.PollLimit(100) // The number of jobs to retrieve across the cluster at once
A job func needs to return an error
. If an error is returned a job can be retried. You can set how many times a retry is attempted
Error retries instance defaults
jd.RetriesErrorLimit(3) // How many times to retry a job when an error is returned (-1 = unlimited)
Error retries can be set on the Job
jd.RegisterJob("job1", jobFunc).RetriesErrorLimit(2) // -1 = unlimited
Error retries can be set on the Job Run
jd.CreateRun("job1", "World A").RetriesErrorLimit(2).Run() // -1 = unlimited
IMPORTANT: Timeouts will not kill running jobs, they will keep running. In order to cancel a running job on time out, add the jobsd.RunInfo
as the first argument in your job and use the jobsd.RunInfo.Cancel
channel (see example below).
Timeouts instance defaults
jd.RunTimeout(30*time.Minute) // How long before retrying a job (0 disables time outs)
jd.RetriesTimeoutLimit(3) // How many times to retry a job when it times out (-1 = unlimited)
jd.TimeoutCheck(10*time.Second) // The time between checks for jobs that have timed out (or crashed) on other nodes in the cluster
Timeouts can be set on the Job
jd.RegisterJob("job1", jobFunc).RunTimeout(10*time.Minute).RetriesTimeoutLimit(2)
// RunTimeout set to 0 disables time outs
// RetriesTimeoutLimit set to -1 = unlimited
Timeouts can be set on a Job Run
jd.CreateRun("job1", "World A").RunTimeout(1*time.Minute).RetriesTimeoutLimit(5).Run()
// RunTimeout set to 0 disables time outs
// RetriesTimeoutLimit set to -1 = unlimited
Create a job like the following
jobFunc := func(info RunInfo) error {
select {
case <-time.After(timeout + 10*time.Second):
fmt.Println("Did some work")
case <-info.Cancel:
fmt.Println("Job canceled")
}
return nil
}
jd.RegisterJob("CancelableJob", jobFunc)
PostgreSQL, SQLite, and MySQL are supported out of the box.
host := os.Getenv("JOBSD_PG_HOST")
port := os.Getenv("JOBSD_PG_PORT")
dbname := os.Getenv("JOBSD_PG_DB")
user := os.Getenv("JOBSD_PG_USER")
password := os.Getenv("JOBSD_PG_PASSWORD")
dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable", host, user, password, dbname, port)
db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
jd := New(db)
host := os.Getenv("JOBSD_MY_HOST")
port := os.Getenv("JOBSD_MY_PORT")
dbname := os.Getenv("JOBSD_MY_DB")
user := os.Getenv("JOBSD_MY_USER")
password := os.Getenv("JOBSD_MY_PASSWORD")
dsn := fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", user, password, host, port, dbname)
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{
Logger: logc.NewGormLogger(logger),
})
checkError(err)
jd := New(db)
db, err0 := gorm.Open(sqlite.Open("file::memory:"), &gorm.Config{
Logger: logc.NewGormLogger(logger),
})
sqlDB, err := db.DB()
checkError(err)
// SQLLite does not work with concurrent connections
sqlDB.SetMaxIdleConns(1)
sqlDB.SetMaxOpenConns(1)
jd := New(db)
Auto migrations create the DB tables and structure required for JobsD. It is run when starting JobsD Up()
. Auto migrations are only required the first time JobsD runs so it can be disabled using the following method.
jd.AutoMigrate(false)
// Register Jobs and Schedules etc...
jd.Up()
A logger can be supplied. The logger must implement the logc interface
jd := New(db)
jd.Logger(logger)