Ability to `kiq` a task by path
pahrohfit opened this issue · 2 comments
Is it possible to pass a task to the queue by its path, rather than via import?
Instead of:
file: my/package/module/tasks.py
@broker.task
async def add_one(value: int) -> int:
return value + 1
file: my/package/foo/module/api.py
from my.package.module.tasks import add_one
task = await add_one.kiq(1)
Would be like:
file: my/package/module/tasks.py
@broker.task
async def add_one(value: int) -> int:
return value +1
file: my/package/foo/module/api.py
await broker.kiq('my.package.module.tasks.add_one', *args, **kwargs)
Yes, it's possible.
from taskiq_redis import ListQueueBroker
from taskiq.kicker import AsyncKicker
broker = ListQueueBroker("redis://localhost:6379")
@broker.task(task_name="task_name")
def task():
print("Hello World!")
async def main():
await broker.startup()
await AsyncKicker("task_name", broker, {}).kiq()
await broker.shutdown()
if __name__ == "__main__":
import asyncio
asyncio.run(main())
But please note, that to run it it should been imported by broker on startup. To import it automatically, use filesystem-discover
, as described here: https://taskiq-python.github.io/guide/cli.html#auto-importing
I dug through the code a little, and came up with this (feels a little cleaner and easier to read):
await taskiq_broker().find_task('my.package.module:my_task').kiq()
Where taskiq_broker
is my initialized broker
, and my tasks are decorated with async_shared_broker.task
in a Sanic application.
Problem I was working around was some lazy import looping, but also with calling tasks that are not necessary to be included in the Sanic
code base (fully external job runners). So the imports for the Worker
weren't the problem.
Thank you very much for your quick response!