madflojo/tasks

Passing in `context.Context` to the `TaskFunc`

Closed this issue · 7 comments

Currently, there's no way to access the context.Context instance from inside the TaskFunc. There should be an argument added to the TaskFunc type, like so:

		id, err := s.scheduler.Add(&tasks.Task{
			Interval: task.interval,
			TaskFunc: func(ctx context.Context) error {
                             q.db.QueryRowContext(ctx, "...")
			}
		})

This would allow passing in the context to other functions (and also propagate cancellation).

A semi-related article regarding this topic – here

Also related to topic of contexts, It would be useful to have a function exposed which handles graceful shutdown (and blocking) - currently I've had to resort to "workarounds" to prevent the main process from existing like this one:

func (s *Scheduler) Block() {
	ts := s.scheduler.Tasks()
	defer s.scheduler.Stop()
	if len(ts) > 0 {
		select {} // blocks main() from exiting
	}
}
...
sch := scheduler.New(...)
sch.Block()

I'm curious about how you are using tasks. We use contexts within tasks to coordinate shutdown, but typically when you want to unscheduled a task, you'd delete it.

From the blocking perspective, typically tasks are used within services that have some other service i.e. an HTTP listener, or gRPC listener, that is blocking. Are you only running tasks?

I'm curious about how you are using tasks. We use contexts within tasks to coordinate shutdown, but typically when you want to unscheduled a task, you'd delete it.

My concern is that without having access to the context.Context inside the TaskFunc, one can't properly implement graceful shutdown -- calling tasks.Stop() will clean up the jobs, but because they don't have access to the context, so the cancellation signal won't get propagated.

From the blocking perspective, typically tasks are used within services that have some other service i.e. an HTTP listener, or gRPC listener, that is blocking. Are you only running tasks?

Yes, I run a separate "scheduler" process that only has 1 instance to avoid the need of storage / locking. This way I can have N amount of HTTP server instances which will consume "jobs" from a queue which were produced by the scheduler process.

My concern is that without having access to the context.Context inside the TaskFunc, one can't properly implement graceful shutdown -- calling tasks.Stop() will clean up the jobs, but because they don't have access to the context, so the cancellation signal won't get propagated.

That's fair, I think we can probably add a WithContext version, so we avoid changing the interface.

Yes, I run a separate "scheduler" process that only has 1 instance to avoid the need of storage / locking. This way I can have N amount of HTTP server instances which will consume "jobs" from a queue which were produced by the scheduler process.

Ok, I'm not opposed to adding something that lets users wait until the scheduler is stopped, just was curious about the usage.

So I was going to implement some of these changes this morning, but I realized that what you are trying to do is already possible.

	ctx, cancel := context.WithCancel(context.Background())
	task := tasks.Task{
		TaskFunc: func() error {
			return ctx.Err()
		},
	}

While in hindsight, it would have been useful to add a context option, and maybe even it would be useful to add the Task type to the function arguments. I'm avoiding interface changes at this point unless that interface change can be one size fits all.

I'll keep looking into the Blocking side of things and think about the best way to implement that.

fr3fou commented

So I was going to implement some of these changes this morning, but I realized that what you are trying to do is already possible.

	ctx, cancel := context.WithCancel(context.Background())
	task := tasks.Task{
		TaskFunc: func() error {
			return ctx.Err()
		},
	}

While in hindsight, it would have been useful to add a context option, and maybe even it would be useful to add the Task type to the function arguments. I'm avoiding interface changes at this point unless that interface change can be one size fits all.

This doesn't quite do the the same thing. This creates one context instance, the point is that each TaskFunc invokation should get the internal context instance -

tasks/tasks.go

Lines 257 to 260 in 8b7bb74

// Stop the task
defer t.cancel()

This will allow for proper cancel propogation from the scheduler.Del(name) method.

Suppose a long running task gets called every hour, if one wants to support graceful termination/shutdown then they should call scheduler.Stop() which will call scheduler.Del(name) for every task. This will in turn call cancel() for the internal scheduler context, however that cancellation won't get propagated to the TaskFunc code, meaning if there's, let's say a long running SQL query, that won't be properly cancelled.

		id, err := s.scheduler.Add(&tasks.Task{
			Interval: task.interval,
			TaskFunc: func(ctx context.Context) error {
                             q.db.QueryRowContext(ctx, "...") // this will get properly cancelled if you had access to the internal context.Context instance
			}
		})

If you don't want to change the lib's API, that's fine, but I really think that having an optional TaskFuncContext field would be very useful.

Hrm I see that does make more sense. I’m also thinking about creating a struct that can be passed to functions that would contain the task ID and other potential items users would want to pass to functions.

I thought originally you were looking to pass your own context.