Scheduler can send task twice
sector119 opened this issue · 1 comment
Hello
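In the run_scheduler_loop function, consider the next_minute calculation: if datetime.now() is 17:22:59.55555, then replacing the seconds and microseconds with 0 and adding timedelta(minutes=1) gives next_minute = 17:23:00.
When the delay is later calculated as delay = next_minute - datetime.now(), the current time can already be past next_minute or only slightly before it, so the delay comes out negative (e.g. -0.000299) or barely positive (e.g. 0.000373). With a negative delay, asyncio.sleep returns immediately, the loop runs again within the same minute, and the task is sent a second time.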
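The arithmetic above can be reproduced with fixed timestamps. This is only a sketch: the helper name `next_minute_after` and the `loop_start` value are assumptions made for illustration, while `now_at_sleep` is taken from the first log entry in this issue.

```python
from datetime import datetime, timedelta


def next_minute_after(moment: datetime) -> datetime:
    """Round `moment` down to the minute, then add one minute (hypothetical helper)."""
    return moment.replace(second=0, microsecond=0) + timedelta(minutes=1)


# Assumed: the loop iteration starts shortly before the minute boundary.
loop_start = datetime(2024, 2, 22, 14, 12, 59, 555550)
next_minute = next_minute_after(loop_start)  # 2024-02-22 14:13:00

# Taken from the log: the clock has passed the boundary when the delay is computed.
now_at_sleep = datetime(2024, 2, 22, 14, 13, 0, 299)
delay = next_minute - now_at_sleep

print(delay)                  # -1 day, 23:59:59.999701
print(delay.total_seconds())  # -0.000299
```

The odd-looking "-1 day, 23:59:59.999701" is just how timedelta prints a small negative duration; total_seconds() shows the real value.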
Don't you think we should guard the delay, like this:
delay = (next_minute - datetime.now()).total_seconds()
if int(delay) <= 0:
    delay = 60.0
await asyncio.sleep(delay)
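To see how this guard behaves on the values from the logs, here is a small sketch that pulls the check into a pure function. The name `guarded_delay` and passing `now` as a parameter are assumptions made so the example can run without the scheduler; they are not part of taskiq.

```python
from datetime import datetime


def guarded_delay(next_minute: datetime, now: datetime) -> float:
    """Return the sleep time, falling back to 60 s when less than 1 s remains."""
    delay = (next_minute - now).total_seconds()
    # int() truncates toward zero, so anything in (-1, 1) becomes 0 here.
    if int(delay) <= 0:
        delay = 60.0
    return delay


# Negative delay from the first log entry: replaced by 60 s.
print(guarded_delay(datetime(2024, 2, 22, 14, 13), datetime(2024, 2, 22, 14, 13, 0, 299)))
# Barely positive delay (0.000373 s): also replaced by 60 s.
print(guarded_delay(datetime(2024, 2, 22, 14, 13), datetime(2024, 2, 22, 14, 12, 59, 999627)))
# Normal case from the second log entry: left unchanged.
print(guarded_delay(datetime(2024, 2, 22, 14, 14), datetime(2024, 2, 22, 14, 13, 0, 1998)))
```

Because of the int() truncation, any remaining delay shorter than one second is replaced with a full 60 seconds, not only negative ones.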
Logs:
Next minute: 2024-02-22 14:13:00
Now: 2024-02-22 14:13:00.000299
Delay: -1 day, 23:59:59.999701
Delay total seconds: -0.000299
[2024-02-22 14:13:00,000][INFO ][run:delayed_send:130] Sending task billing:sync-payments.
Next minute: 2024-02-22 14:14:00
Now: 2024-02-22 14:13:00.001998
Delay: 0:00:59.998002
Delay total seconds: 59.998002
[2024-02-22 14:13:00,002][INFO ][run:delayed_send:130] Sending task billing:sync-payments.
Actually you're right, but I think it can be resolved by calculating next_minute right after sending all tasks.
diff --git a/taskiq/cli/scheduler/run.py b/taskiq/cli/scheduler/run.py
index 6a17a11..73f06f8 100644
--- a/taskiq/cli/scheduler/run.py
+++ b/taskiq/cli/scheduler/run.py
@@ -144,9 +144,6 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
     running_schedules = set()
     while True:
         # We use this method to correctly sleep for one minute.
-        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
-            minutes=1,
-        )
         scheduled_tasks = await get_all_schedules(scheduler)
         for source, task_list in scheduled_tasks.items():
             for task in task_list:
@@ -165,7 +162,9 @@ async def run_scheduler_loop(scheduler: TaskiqScheduler) -> None:
                     )
                     running_schedules.add(send_task)
                     send_task.add_done_callback(running_schedules.discard)
-
+        next_minute = datetime.now().replace(second=0, microsecond=0) + timedelta(
+            minutes=1,
+        )
         delay = next_minute - datetime.now()
         await asyncio.sleep(delay.total_seconds())
With this setup it would be impossible to end up with a negative number of seconds. What do you think?
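One possible variation on this idea, not part of the patch above: the patched code still calls datetime.now() twice, and in principle those two calls could straddle a minute boundary. Reading the clock once and deriving both values from it rules that out. The function name `seconds_until_next_minute` is hypothetical and this is only a sketch of the approach.

```python
from datetime import datetime, timedelta


def seconds_until_next_minute(now: datetime) -> float:
    """Seconds from `now` to the start of the next minute, always in (0, 60]."""
    # Both values come from the same clock reading, so the result cannot be negative.
    next_minute = now.replace(second=0, microsecond=0) + timedelta(minutes=1)
    return (next_minute - now).total_seconds()


print(seconds_until_next_minute(datetime(2024, 2, 22, 14, 13, 0, 299)))  # 59.999701
print(seconds_until_next_minute(datetime(2024, 2, 22, 14, 13, 0)))       # 60.0
```

Inside the loop this would be used as `await asyncio.sleep(seconds_until_next_minute(datetime.now()))`, placed after all tasks have been sent, as in the diff.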