taskiq-python/taskiq

The problem with LabelScheduleSource

ChronoDi opened this issue · 1 comment

Good afternoon!

I have the following problem. I have a function that registers a task with the scheduler so that it runs at a given time:

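    async def register_mailing(self, mailing_id: str, group_id: int, date: datetime):
        # Extra keyword arguments to register_task become task labels;
        # LabelScheduleSource reads the "schedule" label to decide when to send the task.
        self._broker.register_task(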
            func=sender.send_mailing,
            task_name=mailing_id,
            schedule=[
                {
                    "time": date.astimezone(timezone.utc),
                    "args": [int(mailing_id), group_id],
                },
            ],
        )

        logging.info(f'****** A task "{mailing_id}" to group {group_id} has been created for the date "{date}"')
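
The schedule label above is just a list of dicts. As a minimal sketch, the entry can be built by a separate helper so the UTC conversion and argument packing can be checked on their own. build_schedule_entry is a hypothetical name (not part of taskiq), and the values in the demo (93, 42, UTC+3) are example inputs only:

```python
from datetime import datetime, timedelta, timezone


def build_schedule_entry(mailing_id: str, group_id: int, date: datetime) -> dict:
    """Hypothetical helper: build one entry for the "schedule" label.

    If `date` is naive, astimezone() interprets it as the machine's local time.
    """
    return {
        "time": date.astimezone(timezone.utc),  # one-off run time, in UTC
        "args": [int(mailing_id), group_id],  # positional args for the task
    }


if __name__ == "__main__":
    # A mailing planned for 19:37:59 at UTC+3 is stored as 16:37:59 UTC.
    local = datetime(2023, 10, 4, 19, 37, 59, tzinfo=timezone(timedelta(hours=3)))
    print(build_schedule_entry("93", 42, local))
```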

When the scheduled time arrives, the task is kicked (sent) twice, so the function runs two times, which causes problems. I also tried the kiq() method, but nothing changed.

The scheduler and the worker are started with the following code:

    import asyncio

    from taskiq.api import run_receiver_task, run_scheduler_task

    # Runs inside an async function; taskiq_controller and logger come from the bot's code.
    worker_task = asyncio.create_task(run_receiver_task(taskiq_controller.get_broker()))
    scheduler_task = asyncio.create_task(run_scheduler_task(taskiq_controller.get_scheduler()))

    try:
        await worker_task
    except asyncio.CancelledError:
        logger.info("Worker successfully exited.")

    try:
        await scheduler_task
    except asyncio.CancelledError:
        logger.info("Scheduler successfully exited.")
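
Each call to asyncio.create_task(run_scheduler_task(...)) starts another, independent scheduler loop. A minimal sketch of guarding against accidentally starting a second one by keeping a module-level handle and reusing it. start_scheduler_once and fake_scheduler_loop are hypothetical names, and the stand-in coroutine replaces run_scheduler_task(scheduler) so the sketch runs without taskiq or NATS:

```python
import asyncio
from typing import Any, Callable, Coroutine, Optional

# Hypothetical module-level handle to the one scheduler task.
_scheduler_task: Optional[asyncio.Task] = None


def start_scheduler_once(
    factory: Callable[[], Coroutine[Any, Any, None]],
) -> asyncio.Task:
    """Create the scheduler task on the first call; later calls reuse it."""
    global _scheduler_task
    if _scheduler_task is None or _scheduler_task.done():
        _scheduler_task = asyncio.create_task(factory())
    return _scheduler_task


starts = 0


async def fake_scheduler_loop() -> None:
    """Stand-in for run_scheduler_task(scheduler): counts starts, then idles."""
    global starts
    starts += 1
    await asyncio.sleep(3600)


async def demo() -> int:
    """Call the guard twice and report how many loops actually started."""
    before = starts
    first = start_scheduler_once(fake_scheduler_loop)
    second = start_scheduler_once(fake_scheduler_loop)  # returns `first`
    await asyncio.sleep(0)  # let the created task run its first step
    assert first is second
    first.cancel()
    try:
        await first
    except asyncio.CancelledError:
        pass
    return starts - before


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In real code the call would look like start_scheduler_once(lambda: run_scheduler_task(scheduler)). Passing a factory instead of a coroutine object means a repeated call never creates a coroutine that is left un-awaited.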

To see how many schedulers were running and how many tasks were registered, I added some logging:

  1. When the script starts (the empty dictionary is the output of the get_all_tasks() method):

         {}
         <Task pending name='Task-9' coro=<run_scheduler_task() running at /home/chrono/.virtualenvs/AutoPostingBot/lib/python3.10/site-packages/taskiq/api/scheduler.py:5>>

  2. After the task has been created:

         {'93': AsyncTaskiqDecoratedTask(93)}
         <Task pending name='Task-9' coro=<run_scheduler_task() running at /home/chrono/.virtualenvs/AutoPostingBot/lib/python3.10/site-packages/taskiq/api/scheduler.py:23> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[run_scheduler_loop.<locals>.<lambda>() at /home/chrono/.virtualenvs/AutoPostingBot/lib/python3.10/site-packages/taskiq/cli/scheduler/run.py:148]>

  3. When the scheduled time arrived, the logs show that the task was sent and executed twice, each time with a different task ID:

         run.py:122 #INFO    [2023-10-04 16:37:59,006] - taskiq.cli.scheduler.run - Sending task 93.
         receiver.py:132 #INFO    [2023-10-04 16:37:59,007] - taskiq.receiver.receiver - Executing task 93 with ID: a8863b6e278a4c9381992a3a8b24fd2e
         run.py:122 #INFO    [2023-10-04 16:38:01,001] - taskiq.cli.scheduler.run - Sending task 93.
         receiver.py:132 #INFO    [2023-10-04 16:38:01,002] - taskiq.receiver.receiver - Executing task 93 with ID: e42e3dd1c21e479a964320ee77f2609a
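
With logs like these, counting the scheduler's "Sending task" lines per task name makes duplicate sends easy to spot. A small sketch, assuming the log format shown above; count_sends and duplicated_sends are hypothetical helpers, not part of taskiq:

```python
import re
from collections import Counter
from typing import Dict, Iterable

# Matches scheduler lines such as "... - taskiq.cli.scheduler.run - Sending task 93."
SEND_RE = re.compile(r"taskiq\.cli\.scheduler\.run - Sending task (\S+?)\.$")


def count_sends(log_lines: Iterable[str]) -> Counter:
    """Count how many times the scheduler logged sending each task name."""
    counts: Counter = Counter()
    for line in log_lines:
        match = SEND_RE.search(line.rstrip())
        if match:
            counts[match.group(1)] += 1
    return counts


def duplicated_sends(log_lines: Iterable[str]) -> Dict[str, int]:
    """Return only the task names that were sent more than once."""
    return {name: n for name, n in count_sends(log_lines).items() if n > 1}


LOG = """\
run.py:122 #INFO    [2023-10-04 16:37:59,006] - taskiq.cli.scheduler.run - Sending task 93.
receiver.py:132 #INFO    [2023-10-04 16:37:59,007] - taskiq.receiver.receiver - Executing task 93 with ID: a8863b6e278a4c9381992a3a8b24fd2e
run.py:122 #INFO    [2023-10-04 16:38:01,001] - taskiq.cli.scheduler.run - Sending task 93.
receiver.py:132 #INFO    [2023-10-04 16:38:01,002] - taskiq.receiver.receiver - Executing task 93 with ID: e42e3dd1c21e479a964320ee77f2609a
"""

if __name__ == "__main__":
    print(duplicated_sends(LOG.splitlines()))  # → {'93': 2}
```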

The broker is NatsBroker (NATS), and the scheduler uses LabelScheduleSource:

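    from taskiq.schedule_sources.label_based import LabelScheduleSource
    from taskiq.scheduler.scheduler import TaskiqScheduler
    from taskiq_nats import NatsBroker

    # Inside the controller class; config holds the NATS host and port.
    self._broker = NatsBroker([f'nats://{config.nats.host}:{config.nats.port}'])
    self._scheduler = TaskiqScheduler(broker=self._broker, sources=[LabelScheduleSource(self._broker)])
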
s3rius commented

Hi, @ChronoDi. I wrote a simple example program with your setup and couldn't reproduce the issue. It looks like you have two schedulers running, and each of them sends the task.

    import asyncio
    import logging
    from datetime import datetime, timedelta

    from taskiq_nats import NatsBroker

    from taskiq.api import run_receiver_task, run_scheduler_task
    from taskiq.schedule_sources.label_based import LabelScheduleSource
    from taskiq.scheduler.scheduler import TaskiqScheduler

    broker = NatsBroker(["nats://localhost:4222"], queue="my_queue")
    scheduler = TaskiqScheduler(broker, [LabelScheduleSource(broker)])


    def func():
        print("From task.")


    async def main():
        logging.basicConfig(
            level=logging.DEBUG,
            format="[%(asctime)s][%(levelname)-7s] %(message)s",
        )
        await broker.startup()
        target_time = datetime.utcnow() + timedelta(seconds=5)
        broker.register_task(func, "a", schedule=[{"time": target_time}])

        rt = asyncio.create_task(run_receiver_task(broker))
        st = asyncio.create_task(run_scheduler_task(scheduler))

        try:
            await asyncio.sleep(100)
        except (KeyboardInterrupt, asyncio.CancelledError):
            print("\ngoodbye")
        finally:
            rt.cancel()
            st.cancel()

            await broker.shutdown()


    if __name__ == "__main__":
        asyncio.run(main())

I used this code to test it, and I could reproduce exactly the same behavior only when I started two scheduler tasks. If anyone runs into the same problem or finds a solution, please post it in this issue.
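
The two-scheduler explanation can be illustrated with a toy model. This is not taskiq's implementation: ToyScheduler and simulate are hypothetical, and each instance simply sends every task whose time has passed while remembering only what it sent itself. One instance sends task 93 once; two independent instances reading the same schedule send it twice, matching the two "Sending task 93." lines in the logs above:

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import List


@dataclass
class ToyScheduler:
    """Hypothetical stand-in for a scheduler; NOT taskiq's implementation."""

    schedule: dict  # task name -> UTC time it should run
    sent: set = field(default_factory=set)  # names this instance already sent

    def tick(self, now: datetime, queue: List[str]) -> None:
        """Send every due task that this instance has not sent yet."""
        for name, when in self.schedule.items():
            if when <= now and name not in self.sent:
                queue.append(name)  # corresponds to "Sending task <name>."
                self.sent.add(name)


def simulate(n_schedulers: int) -> List[str]:
    """Run every toy scheduler for two ticks and return what was sent."""
    due = datetime(2023, 10, 4, 16, 37, 59, tzinfo=timezone.utc)
    schedule = {"93": due}
    schedulers = [ToyScheduler(schedule) for _ in range(n_schedulers)]
    queue: List[str] = []
    for now in (due, due + timedelta(seconds=2)):
        for scheduler in schedulers:
            scheduler.tick(now, queue)
    return queue


if __name__ == "__main__":
    print(simulate(1))
    print(simulate(2))
```

Repeated ticks of a single instance do not resend the task; only adding a second instance does, because the instances share the schedule but not their record of what has been sent.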