taskiq-python/taskiq

Possible scheduled tasks duplication

stinovlas opened this issue · 3 comments

As far as I know, the current design of the scheduler works like this:

  1. send all tasks that are scheduled for the current minute
  2. wait until the next minute starts (while asynchronously keeping an eye on scheduled tasks)
  3. repeat the send, with delays, at one-minute intervals

This makes task duplication possible. If I restart the scheduler during a minute in which a cron-scheduled task is due, that task is sent twice (once before the restart and once after). On the other hand, it allows the scheduler to be restarted without missing a task, assuming the restart takes less than one minute (which is quite a reasonable assumption for a scheduler).
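
To make the restart scenario concrete, here is a minimal simulation. It does not use taskiq at all: `tasks_sent_on_start`, the `send_report` name, and the "minute of the hour" schedule format are assumptions made up for illustration, much simpler than a real cron expression.

```python
from datetime import datetime, timezone
from typing import Dict, List


def tasks_sent_on_start(start: datetime, cron_minutes: Dict[str, int]) -> List[str]:
    """Names of tasks a freshly started scheduler would send right away.

    cron_minutes maps a hypothetical task name to the minute of the hour
    at which it fires. The scheduler has no memory of earlier sends.
    """
    current_minute = start.replace(second=0, microsecond=0).minute
    return [name for name, minute in cron_minutes.items() if minute == current_minute]


schedule = {"send_report": 30}
first_start = datetime(2024, 1, 1, 12, 30, 5, tzinfo=timezone.utc)
restart = datetime(2024, 1, 1, 12, 30, 40, tzinfo=timezone.utc)

# Both starts fall into minute 12:30, so the task goes out twice.
sent = tasks_sent_on_start(first_start, schedule) + tasks_sent_on_start(restart, schedule)
print(sent)  # ['send_report', 'send_report']
```

A restart in any other minute sends nothing extra, which is why the duplication only shows up when the restart lands in a minute that has a due task.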

Ideally, tasks should be idempotent, but that's not always possible. It would be great if taskiq remembered whether a particular task has already been sent to the worker queue in the current minute and, if so, didn't send it again after a restart. This is obviously not possible with LabelScheduleSource, since it has no persistent storage, but it might be possible with taskiq_redis.RedisScheduleSource.

What do you think about this? Should I propose an enhancement in taskiq_redis? Does something need to be done in taskiq itself?

I totally agree that this might be a problem for some systems. Here's what I think: we have pre_send and post_send events on each schedule source, so you can create a simple storage-backed wrapper that can be attached to any schedule source.

import datetime
from typing import Any, Coroutine, List
from redis.asyncio import Redis
from taskiq import ScheduleSource
from taskiq.scheduler.scheduled_task import ScheduledTask
from taskiq.utils import maybe_awaitable


class ScheduleProtector(ScheduleSource):
    def __init__(self, redis_url: str, source: ScheduleSource) -> None:
        self.redis = Redis.from_url(redis_url)
        self.source = source

    def get_schedules(self) -> Coroutine[Any, Any, List[ScheduledTask]]:
        return self.source.get_schedules()

    def add_schedule(self, schedule: ScheduledTask) -> Coroutine[Any, Any, None]:
        return self.source.add_schedule(schedule)

    def delete_schedule(self, schedule_id: str) -> Coroutine[Any, Any, None]:
        return self.source.delete_schedule(schedule_id)

    async def pre_send(self, task: ScheduledTask) -> None:
        res = await self.redis.get(task.schedule_id)
        if res is not None:
            try:
                ts = int(res)
                if ts > int(datetime.datetime.utcnow().timestamp()):
                    raise RuntimeError("The task has already been sent.")
            except ValueError:
                pass
        return await maybe_awaitable(self.source.pre_send(task))

    async def post_send(self, task: ScheduledTask) -> None:
        ts = datetime.datetime.utcnow().timestamp()
        await self.redis.set(task.schedule_id, int(ts), ex=59)
        return await maybe_awaitable(self.source.post_send(task))

It might be used like this:

labeled_source = ScheduleProtector("redis://localhost/0", LabelScheduleSource(broker))

I've created a small example of such a thing. Please try it out. If it works for you, we might merge it into taskiq-redis.

Also, about the scheduler logic: it was updated a little bit, and I will present these changes a bit later. Now we just ask all schedule sources once a minute and wait until the moment each task should be sent, so that it goes out right on time. The code in this part of the project is now much cleaner and more comprehensible.
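
The "wait until the task is due" idea can be sketched with plain datetime arithmetic. This is only an illustration of the description above, not taskiq's actual scheduler code; `next_minute_start` and `delay_until` are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone


def next_minute_start(now: datetime) -> datetime:
    """Start of the minute following `now`."""
    return now.replace(second=0, microsecond=0) + timedelta(minutes=1)


def delay_until(target: datetime, now: datetime) -> float:
    """Seconds to sleep before `target`; zero if the target is already past."""
    return max((target - now).total_seconds(), 0.0)


now = datetime(2024, 1, 1, 12, 30, 40, 500000, tzinfo=timezone.utc)
target = next_minute_start(now)

print(target.isoformat())        # 2024-01-01T12:31:00+00:00
print(delay_until(target, now))  # 19.5
```

A scheduler loop could pass the result to `asyncio.sleep` before sending, so that each task goes out at its scheduled time rather than whenever the loop happens to wake up.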

Thank you for the prompt response :-).

This example indeed mostly works (with some minor changes and corrections). What's left is catching the exception in TaskiqScheduler.on_ready. Perhaps we could create an AbortError to signal the intention of aborting the send in pre_send. This is not strictly necessary, but without it the abort surfaces as an uncaught exception.
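
A rough sketch of what such an abort signal could look like follows. `AbortError` and `send_guarded` are hypothetical names used only for illustration; nothing here is existing taskiq API, and the callables stand in for a schedule source's `pre_send` and for the scheduler's actual send.

```python
import asyncio
from typing import Awaitable, Callable, List


class AbortError(Exception):
    """Hypothetical exception: pre_send raises it to skip a send on purpose."""


async def send_guarded(
    pre_send: Callable[[str], Awaitable[None]],
    send: Callable[[str], Awaitable[None]],
    schedule_id: str,
) -> bool:
    """Run pre_send, then send; return False if pre_send aborted the send."""
    try:
        await pre_send(schedule_id)
    except AbortError:
        # An intentional abort, not a failure: skip the send quietly.
        return False
    await send(schedule_id)
    return True


async def demo() -> List[object]:
    sent: List[str] = []

    async def pre_send(schedule_id: str) -> None:
        if schedule_id in sent:
            raise AbortError("The task has already been sent.")

    async def send(schedule_id: str) -> None:
        sent.append(schedule_id)

    first = await send_guarded(pre_send, send, "report")
    second = await send_guarded(pre_send, send, "report")
    return [first, second, sent]


result = asyncio.run(demo())
print(result)  # [True, False, ['report']]
```

Using a dedicated exception type keeps intentional aborts separate from real errors, which would still propagate out of `send_guarded`.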

I don't think that ts > int(datetime.datetime.utcnow().timestamp()) can ever be true on a scheduler restart. The stored value is always lower than the current time, since we never store a value higher than the current time. This might work:

    async def pre_send(self, task: ScheduledTask) -> None:
        res = await self.redis.get(task.schedule_id)
        if res is not None:
            try:
                ts = int(res)
                if ts > datetime.datetime.now(tz=datetime.timezone.utc).replace(second=0, microsecond=0).timestamp():
                    raise RuntimeError("The task has already been sent.")
            except ValueError:
                pass
        return await maybe_awaitable(self.source.pre_send(task))
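
The difference between the two comparisons can be checked without Redis. The sketch below isolates them as pure functions; `original_check` and `corrected_check` are names invented here, and the timestamps are arbitrary example values.

```python
from datetime import datetime, timezone


def original_check(stored_ts: int, now: datetime) -> bool:
    """Draft comparison: is the stored timestamp later than the current time?"""
    return stored_ts > int(now.timestamp())


def corrected_check(stored_ts: int, now: datetime) -> bool:
    """Revised comparison: is the stored timestamp inside the current minute?"""
    minute_start = now.replace(second=0, microsecond=0).timestamp()
    return stored_ts > minute_start


sent_at = datetime(2024, 1, 1, 12, 30, 5, tzinfo=timezone.utc)
restart_at = datetime(2024, 1, 1, 12, 30, 40, tzinfo=timezone.utc)
next_minute = datetime(2024, 1, 1, 12, 31, 0, tzinfo=timezone.utc)
stored = int(sent_at.timestamp())  # what post_send would have written

print(original_check(stored, restart_at))    # False: the duplicate is not detected
print(corrected_check(stored, restart_at))   # True: the duplicate is detected
print(corrected_check(stored, next_minute))  # False: the next run is allowed
```

One edge case worth checking: a send during the first second of a minute stores a value equal to the minute start, so the strict `>` would not flag it. Using `>=` may be safer there.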

Also, some kind of key prefix would be in order if this is added to taskiq_redis, so we don't populate Redis with schedule ids in the global namespace. I'd advise using datetime.datetime.now(tz=datetime.timezone.utc) instead of datetime.utcnow(): the latter returns a naive datetime, and calling timestamp() on it treats it as local time, which doesn't do what most people expect on a server whose timezone differs from UTC.
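
Both suggestions are small enough to sketch. The prefix value and the helper names `protector_key` and `utc_timestamp` are assumptions chosen for illustration; taskiq_redis may well pick something different.

```python
from datetime import datetime, timedelta, timezone

# Hypothetical prefix; any namespace that cannot clash with other keys works.
KEY_PREFIX = "taskiq:schedule_protector"


def protector_key(schedule_id: str, prefix: str = KEY_PREFIX) -> str:
    """Namespaced Redis key for a schedule id."""
    return f"{prefix}:{schedule_id}"


def utc_timestamp(now: datetime) -> int:
    """Whole-second Unix timestamp of a timezone-aware datetime.

    Naive datetimes are rejected, because timestamp() would interpret
    them in the server's local timezone.
    """
    if now.tzinfo is None:
        raise ValueError("expected a timezone-aware datetime")
    return int(now.timestamp())


print(protector_key("daily_report"))  # taskiq:schedule_protector:daily_report

# The same instant expressed in two timezones yields the same timestamp.
utc_noon = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
same_instant = utc_noon.astimezone(timezone(timedelta(hours=5)))
print(utc_timestamp(utc_noon) == utc_timestamp(same_instant))  # True
```

In the protector, `utc_timestamp(datetime.now(tz=timezone.utc))` would replace the `utcnow()` calls, and `protector_key(task.schedule_id)` would replace the bare `task.schedule_id` passed to Redis.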

If the implementation in taskiq_redis is built on top of taskiq_redis.RedisScheduleSource, we could reuse the connection_pool from the source instead of creating a new one in the protector.

I'm available for implementation in taskiq_redis and/or code review, if you want :-).

I totally agree with your points. It was just a draft implementation. Please feel free to add this to the taskiq-redis project. I will review your PR.