Periodic Jobs question
sheldondz opened this issue · 4 comments
We are trying the new dynamic Periodic jobs feature in 0.6.0, and are facing the following issues
Schedule a job to run every day at 8AM, base on timezone
schedule, err := cron.ParseStandard("CRON_TZ=Asia/Calcutta 0 8 * * *")
if err != nil {
r.logger.Error("failed to add calendar event", log.Error(err))
}
handle := r.jobManager.Client.PeriodicJobs().Add(
river.NewPeriodicJob(
schedule,
func() (river.JobArgs, *river.InsertOpts) {
return domain.CalendarEventJobArgs{
EventType: "test event"
EventId: 1
}, nil
},
nil,
),
)
After this code is executed there is no entry added in the river_job table, if we set RunOnStart we see an entry and the worker also executes but after that the periodic jobs does not repeat.
We also tried a simple test with
periodicJobs := []*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(15*time.Second),
func() (river.JobArgs, *river.InsertOpts) {
return domain.CalendarEventJobArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true},
),
}
riverClient, err := river.NewClient(riverpgxv5.New(s.storage.Pool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
PeriodicJobs: periodicJobs,
})
if err != nil {
s.logger.Error("failed to create river client", log.Error(err))
return nil, err
}
// Run the client inline. All executed jobs will inherit from jobCtx:
if err := riverClient.Start(ctx); err != nil {
// handle error
s.logger.Error("failed to start river client", log.Error(err))
return nil, err
}
Here too an entry is added in the DB the worker runs immediately, after that the worker does not run again after 15 sec. Is there something i am missing here?
Here's a fully functional code sample based on your code above:
package main
import (
"context"
"fmt"
"log/slog"
"os"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverpgxv5"
)
type CalendarEventJobArgs struct{}
func (CalendarEventJobArgs) Kind() string { return "calendar_event" }
type CalendarEventWorker struct {
// An embedded WorkerDefaults sets up default methods to fulfill the rest of
// the Worker interface:
river.WorkerDefaults[CalendarEventJobArgs]
}
func (w *CalendarEventWorker) Work(ctx context.Context, job *river.Job[CalendarEventJobArgs]) error {
fmt.Printf("%s: CalendarEvented ran\n", time.Now().Format(time.DateTime))
return nil
}
func doStart(ctx context.Context, logger *slog.Logger) (*struct{}, error) {
dbPool, err := pgxpool.New(ctx, "postgres://localhost/river-periodic-job-test")
if err != nil {
return nil, err
}
periodicJobs := []*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(15*time.Second),
func() (river.JobArgs, *river.InsertOpts) {
return CalendarEventJobArgs{}, nil
},
&river.PeriodicJobOpts{RunOnStart: true},
),
}
workers := river.NewWorkers()
// AddWorker panics if the worker is already registered or invalid:
river.AddWorker(workers, &CalendarEventWorker{})
riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
Workers: workers,
PeriodicJobs: periodicJobs,
})
if err != nil {
logger.Error("failed to create river client", "err", err)
return nil, err
}
// Run the client inline. All executed jobs will inherit from jobCtx:
if err := riverClient.Start(ctx); err != nil {
// handle error
logger.Error("failed to start river client", "err", err)
return nil, err
}
return nil, nil
}
func main() {
ctx := context.Background()
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
if _, err := doStart(ctx, logger); err != nil {
panic(err)
}
foreverChan := make(chan struct{})
<-foreverChan
}
Run it, and you can see the job running every 15 seconds:
$ go run main.go
2024-05-17 11:03:22: CalendarEvented ran
2024-05-17 11:03:37: CalendarEvented ran
2024-05-17 11:03:52: CalendarEvented ran
2024-05-17 11:04:07: CalendarEvented ran
2024-05-17 11:04:22: CalendarEvented ran
2024-05-17 11:04:37: CalendarEvented ran
2024-05-17 11:04:52: CalendarEvented ran
2024-05-17 11:05:07: CalendarEvented ran
2024-05-17 11:05:22: CalendarEvented ran
2024-05-17 11:05:37: CalendarEvented ran
2024-05-17 11:05:52: CalendarEvented ran
2024-05-17 11:06:07: CalendarEvented ran
2024-05-17 11:06:22: CalendarEvented ran
2024-05-17 11:06:37: CalendarEvented ran
Here's the same thing in the database:
river-periodic-job-test=# select kind, state, finalized_at from river_job order by id;
kind | state | finalized_at
----------------+-----------+-------------------------------
calendar_event | completed | 2024-05-17 02:02:05.964429-07
calendar_event | completed | 2024-05-17 02:02:20.959239-07
calendar_event | completed | 2024-05-17 02:02:35.958579-07
calendar_event | completed | 2024-05-17 02:02:50.956588-07
calendar_event | completed | 2024-05-17 02:03:05.958406-07
calendar_event | completed | 2024-05-17 02:03:22.724301-07
calendar_event | completed | 2024-05-17 02:03:37.725189-07
calendar_event | completed | 2024-05-17 02:03:52.723318-07
calendar_event | completed | 2024-05-17 02:04:07.724498-07
calendar_event | completed | 2024-05-17 02:04:22.72267-07
calendar_event | completed | 2024-05-17 02:04:37.724326-07
Based on code not shown in your sample above:
- Is your job unique? If so, it may be excluded from periodic insert because a conforming job is already in the database.
- Does your program keep running after
Client.Start
's been invoked? If it ends too soon, obviously jobs won't be inserted.
@brandur Thanks for the detailed code sample, i think its because my job is unique its failing to run again. Will check this and update.
I can confirm that removing the uniqueOpts it works as expected. Thanks.
You got it!