riverqueue/river

Periodic river job `scheduled_at` does not match timestamp returned from schedule func

dhermes opened this issue · 11 comments

For example, using a cron.ParseStandard("CRON_TZ=UTC 39 * * * *") (this is github.com/robfig/cron/v3, see code below) my schedule func returned 2024-05-06T18:39:00.000Z but the scheduled_at on the job is 2024-05-06T18:39:00.000609Z.

Minimal viable repro

Example Go code
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	cron "github.com/robfig/cron/v3"
)

// HelloJobArgs is a job args struct for a `HelloJob`.
type HelloJobArgs struct{}

// Kind satisfies `river.JobArgs`.
func (HelloJobArgs) Kind() string {
	return "hello"
}

// HelloJob is a worker for printing "Hello, world!". It satisfies
// `river.Worker[HelloJobArgs]`.
type HelloJob struct {
	river.WorkerDefaults[HelloJobArgs]
}

// Work works a `HelloJob`.
func (*HelloJob) Work(_ context.Context, job *river.Job[HelloJobArgs]) error {
	fmt.Printf("Hello, world! Job ID: %d (scheduled at: %s)\n", job.ID, job.ScheduledAt)
	return nil
}

// NewPeriodicJob returns a `river.PeriodicJob` for running a `HelloJob` on
// a `crontab` schedule `s`.
func NewPeriodicJob(s river.PeriodicSchedule) *river.PeriodicJob {
	return river.NewPeriodicJob(
		s,
		func() (river.JobArgs, *river.InsertOpts) {
			now := time.Now().UTC()
			fmt.Printf("Producing job args: %s\n", now)
			return HelloJobArgs{}, nil
		},
		&river.PeriodicJobOpts{RunOnStart: false},
	)
}

type wrappedCron struct {
	Cron cron.Schedule
}

func (w *wrappedCron) Next(current time.Time) time.Time {
	next := w.Cron.Next(current)
	fmt.Printf("Next(%s) -> %s\n", current, next)
	return next
}

func getPool(ctx context.Context) (*pgxpool.Pool, error) {
	config, err := pgxpool.ParseConfig("postgres://...")
	if err != nil {
		return nil, err
	}

	config.ConnConfig.RuntimeParams["search_path"] = "riverqueue"
	return pgxpool.NewWithConfig(ctx, config)
}

func run() error {
	workers := river.NewWorkers()
	river.AddWorker(workers, &HelloJob{})

	s, err := cron.ParseStandard("CRON_TZ=UTC 39 * * * *")
	if err != nil {
		return err
	}

	periodicJob := NewPeriodicJob(&wrappedCron{Cron: s})

	ctx := context.Background()
	pool, err := getPool(ctx)
	if err != nil {
		return err
	}

	rc, err := river.NewClient(
		riverpgxv5.New(pool),
		&river.Config{
			Queues: map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 100},
			},
			PeriodicJobs: []*river.PeriodicJob{periodicJob},
			Workers:      workers,
		},
	)
	if err != nil {
		return err
	}

	fmt.Printf("Starting: %s\n", time.Now().UTC())
	err = rc.Start(ctx)
	if err != nil {
		return err
	}

	time.Sleep(2 * time.Minute)
	return rc.Stop(ctx)
}

func main() {
	err := run()
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

Running the above script resulted in:

$ go run ./main.go
Starting: 2024-05-06 18:37:58.504128 +0000 UTC
Next(2024-05-06 18:38:10.734091 +0000 UTC) -> 2024-05-06 18:39:00 +0000 UTC
Next(2024-05-06 18:39:00 +0000 UTC) -> 2024-05-06 19:39:00 +0000 UTC
Producing job args: 2024-05-06 18:39:00.000374 +0000 UTC
Hello, world! Job ID: 1 (scheduled at: 2024-05-06 18:39:00.000609 +0000 UTC)

Observe the value in the database as well:

cyberdyne@127:hardfin> SELECT state, created_at, scheduled_at FROM riverqueue.river_job WHERE id = 1;
+-----------+-------------------------------+-------------------------------+
| state     | created_at                    | scheduled_at                  |
|-----------+-------------------------------+-------------------------------|
| completed | 2024-05-06 18:39:00.009521+00 | 2024-05-06 18:39:00.000609+00 |
+-----------+-------------------------------+-------------------------------+
SELECT 1
Time: 0.011s

Motivation

I have a periodic job that I want to run at "midnight in the tenant's relevant timezone" for each tenant in a multitenant app. I was trying to reason about how I could grab an accurate timestamp for when the job should have been scheduled for cases where it ends up being scheduled or running with some amount of latency.

I was originally thinking that it might make sense to pass in the relevant Next() output from the schedule func to the PeriodicJobConstructor. Then I realized that job.ScheduledAt in Go / river_job.scheduled_at in SQL has exactly what I want.

This issue was the result of me just double checking my assumptions about the relationship between the schedule func and the actual scheduling of the periodic job.

Post script

I considered opening a discussion for this, apologies in advance if I chose poorly.

@dhermes Well, admittedly this does strike me a little bit as an implementation detail, but I suppose it can't hurt, and it does have the conceptual benefit that in a busy system where jobs are being prioritized for work according to their scheduled_at, periodic jobs will be prioritized according to when they were supposed to run, as opposed to when they were inserted. Opened #341.

@brandur I'm curious if you have any ideas for the case I mentioned (maybe my mental model is wrong here).

Essentially I'm trying to pinpoint "it's this run" for a job that runs a small number of times per day (here, 24 times per day). In this case that means pinpointing "after midnight, within an hour or less" relative to specific timezones.

I was choosing this over running my own separate scheduler. I suppose an alternative could've been to just run a job once a day and schedule 24 jobs with more specific input that is explicit.

@dhermes hmm, yeah the scheduled_at time (even after this fix) isn’t really a guaranteed way for you to detect what you’re looking for here. The reason is if the job retries for any reason, that timestamp will get bumped for the next attempt.

You could probably get pretty close using your algorithm for “it’s approximately X hour” because the periodic enqueuer should trigger shortly before the job is due. However you would then want to use that calculated timestamp as one of your job args so that it is persisted throughout all the job’s attempts.

I’ll admit this feels a bit hacky, though no better options come to mind at the moment 🤔

Awesome clarification @bgentry, thank you! I was trying to put my thinking cap on and avoid running my own scheduler.

I think this probably means I should go back to something like this:

type HelloJobArgs struct {
	Now time.Time
}


func NewPeriodicJob(s river.PeriodicSchedule) *river.PeriodicJob {
	return river.NewPeriodicJob(
		s,
		func() (river.JobArgs, *river.InsertOpts) {
			now := time.Now().UTC()
			fmt.Printf("Producing job args: %s\n", now)
			return HelloJobArgs{Now: now}, nil
		},
		&river.PeriodicJobOpts{RunOnStart: false},
	)
}

This has me back to wondering whether it's worth passing a time.Time (the Next() result from the schedule func) into the job args constructorFunc.

Or maybe the "lesser evil" really is to make a meta-job that schedules future jobs far enough in advance.

@brandur I'm trying to recall, I feel like an earlier version of the periodic job implementation actually passed the "current run" timestamp (result of previous scheduleFunc call) as an arg into the constructor function. That could potentially be another way to address this use case of "I want my periodic job to have some awareness of which run time it was created for" if we think that is going to be at all common. There are ways of hacking similar functionality into the current system (like a stateful scheduleFunc) but it is not at all elegant.

It may make sense in general to pass some kind of PeriodicJobMetadata{} struct into constructorFunc (even if it's an empty struct for now).

This way you can get a breaking change over with and if you need to add fields over time to PeriodicJobMetadata{} they will just be additive changes.

> I'm trying to recall, I feel like an earlier version of the periodic job implementation actually passed the "current run" timestamp (result of previous scheduleFunc call) as an arg into the constructor function. That could potentially be another way to address this use case of "I want my periodic job to have some awareness of which run time it was created for" if we think that is going to be at all common.

Hm, my memory's bad so this isn't certain, but I think the ScheduleFunc API's been relatively stable, at least in recent memory. And I think that's pretty much how it works currently. See here:

periodicJob.nextRunAt = periodicJob.ScheduleFunc(periodicJob.nextRunAt)

> It may make sense in general to pass some kind of PeriodicJobMetadata{} struct into constructorFunc (even if it's an empty struct for now).
>
> This way you can get a breaking change over with and if you need to add fields over time to PeriodicJobMetadata{} they will just be additive changes.

Generally agree, and I think if we were still very early in release I'd just do it. Given the API's been quite stable for a while and that we know periodic jobs are a popular feature (we probably get more asks/notes on them than on anything else), my gut instinct would be to leave the API for the time being, but still consider changing it if a killer use case for PeriodicJobMetadata came in and we could confirm that it's something we'd want.


Yeah, totally agree on the instinct here! The only other thing that feels like an option would be to put some kind of callback in the final arg opts *PeriodicJobOpts. (I had the idea but dismissed it as too clunky.)

That said, it's not make-or-break right now. Thanks for the discussion and great library!

> The only other thing that feels like an option would be to put some kind of callback in the final arg opts *PeriodicJobOpts. (I had the idea but dismissed it as too clunky.)

Yeah, that might not be too bad either. One nice thing about using opts is that anyone who didn't need this new escape hatch would never have to see it, since it wouldn't be part of the mainline API at all, but rather an additional opt that you can find by digging into the docs.

> That said, it's not make-or-break right now. Thanks for the discussion and great library!

No worries! Thanks for the great feedback on all this.

Figured I'd drop this "river recipe" in here in case anyone else has a similar use case:

// NOTE: Ensure that
// - `ReplayableSchedule` satisfies `river.PeriodicSchedule`.
var (
	_ river.PeriodicSchedule = (*ReplayableSchedule)(nil)
)

// ReplayableSchedule is a cron schedule that captures its most recent return
// value for replay.
type ReplayableSchedule struct {
	mu sync.Mutex

	Schedule          river.PeriodicSchedule
	LastScheduledTime time.Time
}

// Next produces the next timestamp for the schedule and captures that value
// for replay.
func (rs *ReplayableSchedule) Next(t time.Time) time.Time {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	rs.LastScheduledTime = rs.Schedule.Next(t)
	return rs.LastScheduledTime
}

Putting it into action with a small hello job:

Example Go code
package main

import (
	"context"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/riverqueue/river"
	"github.com/riverqueue/river/riverdriver/riverpgxv5"
	cron "github.com/robfig/cron/v3"
)

////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

// NOTE: Ensure that
// - `ReplayableSchedule` satisfies `river.PeriodicSchedule`.
var (
	_ river.PeriodicSchedule = (*ReplayableSchedule)(nil)
)

// ReplayableSchedule is a cron schedule that captures its most recent return
// value for replay.
type ReplayableSchedule struct {
	mu sync.Mutex

	Schedule          river.PeriodicSchedule
	LastScheduledTime time.Time
}

// Next produces the next timestamp for the schedule and captures that value
// for replay.
func (rs *ReplayableSchedule) Next(t time.Time) time.Time {
	rs.mu.Lock()
	defer rs.mu.Unlock()

	rs.LastScheduledTime = rs.Schedule.Next(t)
	return rs.LastScheduledTime
}

////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

// HelloJobArgs is a job args struct for a `HelloJob`.
type HelloJobArgs struct {
	ScheduledAt time.Time `json:"scheduled_at"`
}

// Kind satisfies `river.JobArgs`.
func (HelloJobArgs) Kind() string {
	return "hello"
}

// HelloJob is a worker for printing "Hello, world!". It satisfies
// `river.Worker[HelloJobArgs]`.
type HelloJob struct {
	river.WorkerDefaults[HelloJobArgs]
}

// Work works a `HelloJob`.
func (*HelloJob) Work(_ context.Context, job *river.Job[HelloJobArgs]) error {
	fmt.Printf(
		"Hello, world! Job ID: %d.%d (job scheduled at: %s, args scheduled at: %s)\n",
		job.ID, job.Attempt, job.ScheduledAt, job.Args.ScheduledAt,
	)

	if job.Attempt < 2 {
		return fmt.Errorf("failed attempt %d", job.Attempt)
	}
	return nil
}

// NewPeriodicJob returns a `river.PeriodicJob` for running a `HelloJob` on
// a schedule `s`.
func NewPeriodicJob(s river.PeriodicSchedule) *river.PeriodicJob {
	rs := ReplayableSchedule{Schedule: s}

	return river.NewPeriodicJob(
		&rs,
		func() (river.JobArgs, *river.InsertOpts) {
			lastScheduledTime := rs.LastScheduledTime
			now := time.Now().UTC()
			fmt.Printf("Producing job args; now=%s; rs.LastScheduledTime=%s\n", now, lastScheduledTime)
			return HelloJobArgs{ScheduledAt: lastScheduledTime}, nil
		},
		&river.PeriodicJobOpts{RunOnStart: false},
	)
}

func getPool(ctx context.Context) (*pgxpool.Pool, error) {
	config, err := pgxpool.ParseConfig("postgres://...")
	if err != nil {
		return nil, err
	}

	config.ConnConfig.RuntimeParams["search_path"] = "riverqueue"
	return pgxpool.NewWithConfig(ctx, config)
}

func run() error {
	workers := river.NewWorkers()
	river.AddWorker(workers, &HelloJob{})

	s, err := cron.ParseStandard("CRON_TZ=UTC 27 * * * *")
	if err != nil {
		return err
	}

	periodicJob := NewPeriodicJob(s)

	ctx := context.Background()
	pool, err := getPool(ctx)
	if err != nil {
		return err
	}

	rc, err := river.NewClient(
		riverpgxv5.New(pool),
		&river.Config{
			Queues: map[string]river.QueueConfig{
				river.QueueDefault: {MaxWorkers: 100},
			},
			PeriodicJobs: []*river.PeriodicJob{periodicJob},
			Workers:      workers,
		},
	)
	if err != nil {
		return err
	}

	fmt.Printf("Starting: %s\n", time.Now().UTC())
	err = rc.Start(ctx)
	if err != nil {
		return err
	}

	time.Sleep(2 * time.Minute)
	return rc.Stop(ctx)
}

func main() {
	err := run()
	if err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

See an example run:

$ go run ./main.go 
Starting: 2024-06-11 14:26:16.162359 +0000 UTC
Producing job args; now=2024-06-11 14:27:00.009817 +0000 UTC; rs.LastScheduledTime=2024-06-11 14:27:00 +0000 UTC
Hello, world! Job ID: 3.1 (job scheduled at: 2024-06-11 14:27:00 +0000 UTC, args scheduled at: 2024-06-11 14:27:00 +0000 UTC)
time=2024-06-11T09:27:00.024-05:00 level=ERROR msg="jobExecutor: Job errored" error="failed attempt 1" job_id=3 job_kind=hello
Hello, world! Job ID: 3.2 (job scheduled at: 2024-06-11 14:27:01.022371 +0000 UTC, args scheduled at: 2024-06-11 14:27:00 +0000 UTC)

In particular, job.Args.ScheduledAt matches job.ScheduledAt on the first attempt and stays fixed on later attempts when the job fails and is retried, while job.ScheduledAt moves forward (14:27:01.022371 on attempt 2).


For emphasis, here is where the ReplayableSchedule{} actually gets used:

// NewPeriodicJob returns a `river.PeriodicJob` for running a `HelloJob` on
// a schedule `s`.
func NewPeriodicJob(s river.PeriodicSchedule) *river.PeriodicJob {
	rs := ReplayableSchedule{Schedule: s}

	return river.NewPeriodicJob(
		&rs,
		func() (river.JobArgs, *river.InsertOpts) {
			lastScheduledTime := rs.LastScheduledTime
			now := time.Now().UTC()
			fmt.Printf("Producing job args; now=%s; rs.LastScheduledTime=%s\n", now, lastScheduledTime)
			return HelloJobArgs{ScheduledAt: lastScheduledTime}, nil
		},
		&river.PeriodicJobOpts{RunOnStart: false},
	)
}