taskiq-python/taskiq

taskiq.receiver.receiver.Receiver.prefetcher attached to a different loop

enlike opened this issue · 3 comments

Hi!

I'm facing an issue when running taskiq.

task_broker.py code below:

from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker('amqp://guest:guest@localhost:5672')

deps:

taskiq                      0.10.0 
taskiq-aio-pika             0.4.0
taskiq-dependencies         1.4.0 
taskiq-redis                0.5.0 

Run configuration:

taskiq worker core.taskiq.task_broker:broker

Full error:

[2023-10-19 12:31:01,682][taskiq.receiver.receiver][INFO   ][worker-1] Listening started.
Process worker-1:
Traceback (most recent call last):
  File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/cli/worker/run.py", line 145, in start_listen
    loop.run_until_complete(receiver.listen())
  File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/base_events.py", line 642, in run_until_complete
    return future.result()
  File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py", line 308, in listen
    gr.start_soon(self.runner, queue)
  File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 597, in __aexit__
    raise exceptions[0]
  File "/Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py", line 326, in prefetcher
    await self.sem_prefetch.acquire()
  File "/Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/locks.py", line 413, in acquire
    await fut
RuntimeError: Task <Task pending name='taskiq.receiver.receiver.Receiver.prefetcher' coro=<Receiver.prefetcher() running at /Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/taskiq/receiver/receiver.py:326> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/pavelgarkin/Library/Caches/pypoetry/virtualenvs/invoices-Q1Rkz41h-py3.9/lib/python3.9/site-packages/anyio/_backends/_asyncio.py:661, _wait.<locals>._on_completion() at /Users/pavelgarkin/.pyenv/versions/3.9.9/lib/python3.9/asyncio/tasks.py:513]> got Future <Future pending> attached to a different loop
[2023-10-19 12:31:02,165][taskiq.process-manager][INFO   ][MainProcess] worker-0 is dead. Scheduling reload.
[2023-10-19 12:31:02,166][taskiq.process-manager][INFO   ][MainProcess] worker-1 is dead. Scheduling reload.
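
For anyone landing here with the same message: asyncio raises the final RuntimeError in the traceback whenever a task running on one event loop awaits a future that belongs to another loop. On Python 3.9, primitives such as asyncio.Semaphore bind to the current event loop at construction time, so a semaphore created before the worker's loop is installed is one plausible trigger. Whether that is what happened in taskiq 0.10.0 is an assumption; the thread does not confirm it. The sketch below is not taskiq code. It only reproduces the generic error with two hand-made loops, and the names reproduce, loop_a, loop_b and waiter are made up for illustration.

```python
import asyncio


def reproduce() -> str:
    """Await a future owned by loop_a from a task running on loop_b."""
    loop_a = asyncio.new_event_loop()
    loop_b = asyncio.new_event_loop()

    # This future is bound to loop_a and is never resolved.
    fut = loop_a.create_future()

    async def waiter() -> None:
        # The task executing this coroutine lives on loop_b,
        # so asyncio rejects the await with a RuntimeError.
        await fut

    try:
        loop_b.run_until_complete(waiter())
    except RuntimeError as exc:
        return str(exc)
    finally:
        loop_a.close()
        loop_b.close()
    return ""


if __name__ == "__main__":
    print(reproduce())
```

The usual way to avoid this class of error is to create loop-bound objects inside the coroutine that uses them, once the target loop is already running.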

Can you try updating to 0.10.1?

I tested with these versions:

taskiq              0.10.1
taskiq-aio-pika     0.4.0

Here's the code:

import asyncio
from taskiq_aio_pika import AioPikaBroker

broker = AioPikaBroker("amqp://guest:guest@localhost:5672")


@broker.task(task_name="left")
async def my_task(val: int):
    print("left", val)


async def main():
    await broker.startup()

    await my_task.kiq(11)

    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

I tried two commands:

taskiq worker a:broker --max-prefetch 20

and

taskiq worker a:broker


To send a task, I simply ran the provided script with python a.py.

Fixed in 0.10.2, thanks.