riverqueue/river

Periodic Jobs question

sheldondz opened this issue · 4 comments

We are trying the new dynamic periodic jobs feature in 0.6.0 and are running into the following issues.

Schedule a job to run every day at 8 AM, based on a time zone:

	// Daily at 08:00 in the Asia/Calcutta time zone.
	schedule, err := cron.ParseStandard("CRON_TZ=Asia/Calcutta 0 8 * * *")
	if err != nil {
		r.logger.Error("failed to add calendar event", log.Error(err))
	}
	handle := r.jobManager.Client.PeriodicJobs().Add(
		river.NewPeriodicJob(
			schedule,
			func() (river.JobArgs, *river.InsertOpts) {
				return domain.CalendarEventJobArgs{
					EventType: "test event",
					EventId:   1,
				}, nil
			},
			nil,
		),
	)

After this code runs, no entry is added to the river_job table. If we set RunOnStart, we see an entry and the worker executes, but after that the periodic job does not repeat.

We also tried a simple test with

periodicJobs := []*river.PeriodicJob{
		river.NewPeriodicJob(
			river.PeriodicInterval(15*time.Second),
			func() (river.JobArgs, *river.InsertOpts) {
				return domain.CalendarEventJobArgs{}, nil
			},
			&river.PeriodicJobOpts{RunOnStart: true},
		),
	}

	riverClient, err := river.NewClient(riverpgxv5.New(s.storage.Pool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers:      workers,
		PeriodicJobs: periodicJobs,
	})
	if err != nil {
		s.logger.Error("failed to create river client", log.Error(err))
		return nil, err
	}

	// Start the client. All executed jobs will inherit from ctx:
	if err := riverClient.Start(ctx); err != nil {
		// handle error
		s.logger.Error("failed to start river client", log.Error(err))
		return nil, err
	}

Here, too, an entry is added to the DB and the worker runs immediately, but the worker does not run again after 15 seconds. Is there something I am missing here?

Here's a fully functional code sample based on your code above:

package main

import (
	"context"
	"fmt"
	"log/slog"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
)

type CalendarEventJobArgs struct{}

func (CalendarEventJobArgs) Kind() string { return "calendar_event" }

type CalendarEventWorker struct {
	// An embedded WorkerDefaults sets up default methods to fulfill the rest of
	// the Worker interface:
	river.WorkerDefaults[CalendarEventJobArgs]
}

func (w *CalendarEventWorker) Work(ctx context.Context, job *river.Job[CalendarEventJobArgs]) error {
	fmt.Printf("%s: CalendarEvented ran\n", time.Now().Format(time.DateTime))
	return nil
}

func doStart(ctx context.Context, logger *slog.Logger) (*struct{}, error) {
	dbPool, err := pgxpool.New(ctx, "postgres://localhost/river-periodic-job-test")
	if err != nil {
		return nil, err
	}

	periodicJobs := []*river.PeriodicJob{
		river.NewPeriodicJob(
			river.PeriodicInterval(15*time.Second),
			func() (river.JobArgs, *river.InsertOpts) {
				return CalendarEventJobArgs{}, nil
			},
			&river.PeriodicJobOpts{RunOnStart: true},
		),
	}

	workers := river.NewWorkers()
	// AddWorker panics if the worker is already registered or invalid:
	river.AddWorker(workers, &CalendarEventWorker{})

	riverClient, err := river.NewClient(riverpgxv5.New(dbPool), &river.Config{
		Queues: map[string]river.QueueConfig{
			river.QueueDefault: {MaxWorkers: 100},
		},
		Workers:      workers,
		PeriodicJobs: periodicJobs,
	})
	if err != nil {
		logger.Error("failed to create river client", "err", err)
		return nil, err
	}

	// Start the client. All executed jobs will inherit from ctx:
	if err := riverClient.Start(ctx); err != nil {
		// handle error
		logger.Error("failed to start river client", "err", err)
		return nil, err
	}

	return nil, nil
}

func main() {
	ctx := context.Background()
	logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

	if _, err := doStart(ctx, logger); err != nil {
		panic(err)
	}

	foreverChan := make(chan struct{})
	<-foreverChan
}

Run it, and you can see the job running every 15 seconds:

$ go run main.go
2024-05-17 11:03:22: CalendarEvented ran
2024-05-17 11:03:37: CalendarEvented ran
2024-05-17 11:03:52: CalendarEvented ran
2024-05-17 11:04:07: CalendarEvented ran
2024-05-17 11:04:22: CalendarEvented ran
2024-05-17 11:04:37: CalendarEvented ran
2024-05-17 11:04:52: CalendarEvented ran
2024-05-17 11:05:07: CalendarEvented ran
2024-05-17 11:05:22: CalendarEvented ran
2024-05-17 11:05:37: CalendarEvented ran
2024-05-17 11:05:52: CalendarEvented ran
2024-05-17 11:06:07: CalendarEvented ran
2024-05-17 11:06:22: CalendarEvented ran
2024-05-17 11:06:37: CalendarEvented ran

Here's the same thing in the database:

river-periodic-job-test=# select kind, state, finalized_at from river_job order by id;
      kind      |   state   |         finalized_at
----------------+-----------+-------------------------------
 calendar_event | completed | 2024-05-17 02:02:05.964429-07
 calendar_event | completed | 2024-05-17 02:02:20.959239-07
 calendar_event | completed | 2024-05-17 02:02:35.958579-07
 calendar_event | completed | 2024-05-17 02:02:50.956588-07
 calendar_event | completed | 2024-05-17 02:03:05.958406-07
 calendar_event | completed | 2024-05-17 02:03:22.724301-07
 calendar_event | completed | 2024-05-17 02:03:37.725189-07
 calendar_event | completed | 2024-05-17 02:03:52.723318-07
 calendar_event | completed | 2024-05-17 02:04:07.724498-07
 calendar_event | completed | 2024-05-17 02:04:22.72267-07
 calendar_event | completed | 2024-05-17 02:04:37.724326-07

Based on code not shown in your sample above:

  1. Is your job unique? If so, it may be excluded from periodic insert because a matching job is already in the database.
  2. Does your program keep running after Client.Start has been invoked? If it exits too soon, jobs won't be inserted.

@brandur Thanks for the detailed code sample. I think it's failing to run again because my job is unique. Will check this and update.

I can confirm that after removing the uniqueOpts, it works as expected. Thanks.

You got it!