taskiq-python/taskiq

Allow scheduling one-time tasks for the future

fubuloubu opened this issue · 5 comments

I have a scenario where I know a time in the future (a variable time) that I want to trigger a task to execute only once, and I want to be able to dynamically schedule this task in the task queue from another task without doing asyncio.sleep (which keeps the task that detects this condition running until it's complete)

Do you have any suggestions of how I can do this, or implement functionality into TaskIQ for this?

s3rius commented

Sure. I would suggest to use taskiq scheduler for that.

Generic implementation only uses labels, but you can add implementation that fits you more. I created an example of a scheduler that can dynamically store tasks in postgresql.

https://gist.github.com/s3rius/f54301d28fe34159f24b7bb9c0d51939

Sure. I would suggest to use taskiq scheduler for that.

Generic implementation only uses labels, but you can add implementation that fits you more. I created an example of a scheduler that can dynamically store tasks in postgresql.

https://gist.github.com/s3rius/f54301d28fe34159f24b7bb9c0d51939

Ah, okay yeah that's a good design. Basically store the schedules in some sort of long-term storage, then query on a cron for tasks to kiq into the queue

s3rius commented

Exactly.

Why not just use the with_labels(delay=N) on a regular call of kicker()? Isn't that the point of it -- run the task after N amount of time has passed?

Why not just use the with_labels(delay=N) on a regular call of kicker()? Isn't that the point of it -- run the task after N amount of time has passed?

To follow up on this, we ended up with a regular polling of a database record containing a future time to process the task for a number of reasons:

  1. Resilient to fault reset on the task manager service
  2. The time field in the database record may change (increase in time), so avoids resetting a delayed task execution
  3. We can control burst load when a lot of records come due at once by doing a LIMIT query until the queue is empty (in case that becomes a problem with the task manager service we have an envvar we can fiddle with)