How to check whether all tasks are completed
emelpolaris opened this issue · 1 comment
emelpolaris commented
I'm using taskiq_redis to distribute tasks across multiple GPUs.
I want to know whether all the tasks have completed.
Please let me know how I can get this info.
Thanks.
s3rius commented
The easiest way would be to start using pipelines.
https://github.com/taskiq-python/taskiq-pipelines
```python
import asyncio

from taskiq_pipelines import Pipeline, PipelineMiddleware
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = (
    ListQueueBroker("redis://localhost:6379/0")
    # Here we add the PipelineMiddleware to the broker,
    # so we can use pipelines with it.
    .with_middlewares(PipelineMiddleware())
    # Here's the result backend for the broker.
    # It's required for pipelines to work, because
    # intermediate results have to be stored somewhere.
    .with_result_backend(
        RedisAsyncResultBackend(
            "redis://localhost:6379/1",
            keep_results=False,
        )
    )
)


@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2


@broker.task
async def generate_tasks(num: int) -> list[int]:
    return list(range(num))


# Here's the pipeline to run your set of tasks.
# It first generates the input values and then
# maps my_task over each element in parallel.
#
# Also, we set check_interval for my_task to 1 second
# to minimize the number of requests to the result backend.
# You can set check_interval separately for each task
# in the pipeline.
pipe = Pipeline(broker, generate_tasks).map(my_task, check_interval=1)


async def main():
    await broker.startup()
    # The pipeline itself is a task, so the interface is the same.
    # You can wait for the result as if it were a regular task.
    task = await pipe.kiq(10)
    res = await task.wait_result(check_interval=1)
    print(res.return_value)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
Or you can use the gather utility function, which collects all tasks the way asyncio.gather does:
```python
import asyncio

from taskiq import gather as taskiq_gather
from taskiq_redis import ListQueueBroker, RedisAsyncResultBackend

broker = ListQueueBroker("redis://localhost:6379/0").with_result_backend(
    RedisAsyncResultBackend(
        "redis://localhost:6379/1",
        keep_results=False,
    )
)


@broker.task
async def my_task(a: int) -> int:
    await asyncio.sleep(1)
    return a * 2


async def main():
    await broker.startup()
    # Send all tasks first and keep their handles.
    tasks = []
    for i in range(10):
        tasks.append(await my_task.kiq(i))
    # Wait until every task has a result, checking once per second.
    results = await taskiq_gather(*tasks, periodicity=1)
    for result in results:
        print(result.return_value)
    await broker.shutdown()


if __name__ == "__main__":
    asyncio.run(main())
```
To minimize the number of requests to the result backend, don't forget to specify a delay between checks (check_interval for pipelines and wait_result, periodicity for gather). Hope this helps.
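To give a rough feel for why the delay between checks matters, here is a small standard-library simulation. It is only a sketch and not how taskiq is implemented: `count_checks`, the task ids, and the finish times are all made up for illustration. It assumes a waiter that polls every pending task once per round and then sleeps for the given interval.

```python
def count_checks(finish_times_ms: dict[str, int], interval_ms: int) -> int:
    """Simulate polling and return how many readiness checks were made.

    Hypothetical helper: finish_times_ms maps a task id to the simulated
    time (in milliseconds) at which its result becomes available.
    """
    clock_ms = 0
    checks = 0
    pending = dict(finish_times_ms)
    while pending:
        # One polling round: ask the "backend" about every pending task.
        for task_id, done_at in list(pending.items()):
            checks += 1
            if clock_ms >= done_at:
                del pending[task_id]
        # Simulated sleep between rounds; no real waiting happens here.
        if pending:
            clock_ms += interval_ms
    return checks


if __name__ == "__main__":
    finish_times = {"a": 500, "b": 1000, "c": 2000}
    for interval in (100, 1000):
        print(interval, count_checks(finish_times, interval))
```

With these made-up finish times, polling every 100 ms makes 38 checks, while polling every 1000 ms makes 7. The trade-off is that a longer interval can delay noticing completion by up to one interval.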