taskiq.receiver.receiver.Receiver.prefetcher attached to a different loop
enlike opened this issue · 3 comments
enlike commented
Hi!
Facing issue when run taskiq
task_broker.py code below:
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker('amqp://guest:guest@localhost:5672')
deps:
taskiq 0.10.0
taskiq-aio-pika 0.4.0
taskiq-dependencies 1.4.0
taskiq-redis 0.5.0
Run configuration:
taskiq worker core.taskiq.task_broker:broker
Full error:
[2023-10-19 12:31:01,682][taskiq.receiver.receiver][INFO ][worker-1] Listening started.
Process worker-1:
Traceback (most recent call last):
File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
self.run()
File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/multiprocessing/process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/cli/worker/run.py", line 145, in start_listen
loop.run_until_complete(receiver.listen())
File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
return future.result()
File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py", line 308, in listen
gr.start_soon(self.runner, queue)
File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
raise exceptions[0]
File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py", line 326, in prefetcher
await self.sem_prefetch.acquire()
File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/locks.py", line 413, in acquire
await fut
RuntimeError: Task <Task pending name='taskiq.receiver.receiver.Receiver.prefetcher' coro=<Receiver.prefetcher() running at /Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py:326> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:661, _wait.<locals>._on_completion() at /Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/tasks.py:513]> got Future <Future pending> attached to a different loop
[2023-10-19 12:31:02,165][taskiq.process-manager][INFO ][MainProcess] worker-0 is dead. Scheduling reload.
[2023-10-19 12:31:02,166][taskiq.process-manager][INFO ][MainProcess] worker-1 is dead. Scheduling reload.
s3rius commented
Can you try updating to 0.10.1?
I tested with these versions:
taskiq 0.10.1
taskiq-aio-pika 0.4.0
Here's the code:
import asyncio
from taskiq_aio_pika import AioPikaBroker
broker = AioPikaBroker("amqp://guest:guest@localhost:5672")
@broker.task(task_name="left")
async def my_task(val: int):
print("left", val)
async def main():
await broker.startup()
await my_task.kiq(11)
await broker.shutdown()
if __name__ == "__main__":
asyncio.run(main())
I tried two commands:
taskiq worker a:broker --max-prefetch 20
and
taskiq worker a:broker
To send a task I simply executed provided script with python a.py
.
s3rius commented
Should be fixed in https://github.com/taskiq-python/taskiq/releases/tag/0.10.2.
enlike commented
fixed in 0.10.2 thx