taskiq-python/taskiq

Ability to `kiq` a task by path

pahrohfit opened this issue · 2 comments

Is it possible to pass a task to the queue by its path, rather than via import?

Instead of:

file: my/package/module/tasks.py

@broker.task
async def add_one(value: int) -> int:
    return value + 1

file: my/package/foo/module/api.py

from my.package.module.tasks import add_one
task = await add_one.kiq(1)

It would look like this:

file: my/package/module/tasks.py

@broker.task
async def add_one(value: int) -> int:
    return value + 1

file: my/package/foo/module/api.py

await broker.kiq('my.package.module.tasks.add_one', *args, **kwargs)

Yes, it's possible.

from taskiq_redis import ListQueueBroker
from taskiq.kicker import AsyncKicker

broker = ListQueueBroker("redis://localhost:6379")


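# An explicit task_name gives the task a stable name to refer to later.
@broker.task(task_name="task_name")
def task():
    print("Hello World!")


async def main():
    await broker.startup()
    # Arguments are: task name, broker, labels. No import of the task is needed here.
    await AsyncKicker("task_name", broker, {}).kiq()
    await broker.shutdown()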


if __name__ == "__main__":
    import asyncio

    asyncio.run(main())

Please note, though, that for the task to run, the worker must have imported it on startup. To import tasks automatically, use filesystem discovery, as described here: https://taskiq-python.github.io/guide/cli.html#auto-importing
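To illustrate why the import matters, here is a minimal, standard-library-only sketch of name-based dispatch. It is not taskiq's actual code: REGISTRY, task and run_by_name are hypothetical names made up for this example. The point is only that a name can be resolved once the module defining the task has been imported, because the decorator registers the function at import time.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict

# Hypothetical stand-in for a broker's task registry; not taskiq's real internals.
REGISTRY: Dict[str, Callable[..., Awaitable[Any]]] = {}


def task(task_name: str):
    """Register the decorated coroutine function under task_name."""

    def decorator(func: Callable[..., Awaitable[Any]]):
        # This line runs when the defining module is imported, not before.
        REGISTRY[task_name] = func
        return func

    return decorator


async def run_by_name(task_name: str, *args: Any, **kwargs: Any) -> Any:
    """Resolve a task by name and run it, roughly as a worker would."""
    try:
        func = REGISTRY[task_name]
    except KeyError:
        raise LookupError(
            f"task {task_name!r} is not registered; was its module imported?"
        ) from None
    return await func(*args, **kwargs)


@task("add_one")
async def add_one(value: int) -> int:
    return value + 1
```

Calling asyncio.run(run_by_name("add_one", 1)) resolves the function purely from the string, while a name whose module was never imported raises LookupError.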

I dug through the code a little, and came up with this (feels a little cleaner and easier to read):

    await taskiq_broker().find_task('my.package.module:my_task').kiq()

Here taskiq_broker() returns my initialized broker, and my tasks are decorated with async_shared_broker.task in a Sanic application.

The problem I was working around was partly a lazy import loop, but also calling tasks that do not need to be included in the Sanic code base (fully external job runners). So the imports for the worker weren't the problem.

Thank you very much for your quick response!