`kiq().with_labels(delay=X)` in documents not a valid invocation?
pahrohfit opened this issue · 5 comments
The documentation for Kicker shows `.with_labels(delay=1)`:
    async def main():
        # This task was initially assigned to broker,
        # but this time it is going to be sent using
        # the second broker with additional label `delay=1`.
        task = await my_async_task.kicker().with_broker(second_broker).with_labels(delay=1).kiq()
        print(await task.get_result())
But this doesn't appear to be a valid invocation of taskiq, just purely an example with a label? This code, when set to `delay=600`, executes from the Worker immediately, rather than after a delay.
Is the intention to use the Scheduler to achieve this functionality? If so, the docs should probably replace `delay` with something more generic, since `delay` seems to only be part of `cli.scheduler.run.delayed_send()`. Or better yet, maybe `delay` should become a standard part of the `kiq()` invocation. That would remove the dependency on a single running Scheduler for anyone who wants something lightweight and not remotely crontab-based, and it would be a cleaner resolution to #187?
I can't speak to the intent with regard to Scheduler, but the `delay` label has a use in the taskiq-aio-pika plugin library here, which is only relevant if you intend to use RabbitMQ as a message broker.
Ahhhh ... just read through it and see that it's expected to be implemented at the broker layer.
If you want delayed tasks, I'd suggest using the scheduler and scheduling the task by time. The scheduler is compatible with any broker, but it has to run as a separate service.
Here's the discussion on how to use it: https://github.com/orgs/taskiq-python/discussions/275
I will update the docs later.
Do I understand correctly that the scheduler polls sources once a minute, so if I want to schedule a new task with a delay of a few seconds using `RedisScheduleSource`, it can take up to a minute for the scheduler to send it to workers? That doesn't sound very good; I hope I'm wrong.
No, you're correct. It can take up to a minute for the default scheduler to send a task. If you want to add a delay of a few seconds, I'd suggest something like this:
    import asyncio
    from typing import Any, Coroutine

    async def _sub_delay(seconds: int, future: Coroutine[Any, Any, Any]):
        # Wait on the client side, then actually send the task.
        await asyncio.sleep(seconds)
        await future

    def delayed_await(seconds: int, future: Coroutine[Any, Any, Any]) -> asyncio.Task[Any]:
        # Schedule the delayed send on the running loop without blocking the caller.
        loop = asyncio.get_running_loop()
        return loop.create_task(_sub_delay(seconds, future))
This lets you add a small delay on the client side before sending a task, without awaiting it. But be aware that this technique is only suitable for short delays.
The usage is like this:
    delayed_await(3, my_task.kiq("arg1", 2))