taskiq-python/taskiq

`kiq().with_labels(delay=X)` in the docs: not a valid invocation?

pahrohfit opened this issue · 5 comments

The documentation for Kicker shows .with_labels(delay=1):

async def main():
    # This task was initially assigned to broker,
    # but this time it is going to be sent using
    # the second broker with additional label `delay=1`.
    task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq()
    print(await task.get_result())

But this doesn't appear to be a valid invocation of taskiq, just purely an example with a label? This code, when set to delay=600, executes on the worker immediately rather than after a delay.

Is the intention to use the Scheduler to achieve this functionality? If so, the docs should probably replace delay with something more generic, since delay seems to be used only in cli.scheduler.run.delayed_send(). Or better yet, maybe delay should become a standard part of the kiq() invocation. That would remove the dependency on a single running Scheduler for anyone who wants something lightweight and not remotely crontab-based, and it would be a cleaner resolution to #187.

I can't speak to the intent with regard to the Scheduler, but the delay label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend to use RabbitMQ as a message broker.

Ahhhh ... just read through it and see that it's expected to be implemented at the broker layer.

If you want delayed tasks, I'd suggest using the scheduler and scheduling the task by time. The scheduler is compatible with any broker, but it should run as a separate service.

Here's a discussion showing how you can use it: https://github.com/orgs/taskiq-python/discussions/275 I will update the docs later.

Do I understand correctly that the scheduler polls its sources once a minute, so if I want to schedule a new task with a delay of a few seconds using RedisScheduleSource, it can take up to a minute for the scheduler to send it to the workers? That doesn't sound very good; I hope I'm wrong.

No, you're correct. It can take up to a minute for the default scheduler to send a task. If you want to add a delay of a few seconds, I'd suggest doing something like this:

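import asyncio
from typing import Any, Coroutine


async def _sub_delay(seconds: float, coro: Coroutine[Any, Any, Any]) -> Any:
    # Wait on the client side, then actually run the coroutine (e.g. the kiq() call).
    await asyncio.sleep(seconds)
    return await coro


def delayed_await(seconds: float, coro: Coroutine[Any, Any, Any]) -> "asyncio.Task[Any]":
    # Must be called from inside a running event loop.
    # The event loop keeps only a weak reference to tasks, so the caller
    # should hold on to the returned Task until it has finished.
    loop = asyncio.get_running_loop()
    return loop.create_task(_sub_delay(seconds, coro))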

This lets you add a small delay on the client side before sending a task, without blocking on it. But be aware that this technique is only suitable for short delays.

The usage is like this:

delayed_await(3, my_task.kiq("arg1", 2))