RuntimeError during unit tests: Future attached to a different loop
hinzundcode opened this issue · 4 comments
I tried to write pytest test cases for a FastAPI route that returns an event source/stream. The first test always succeeds, but the second test, despite identical content, consistently fails with: RuntimeError: Task ... got Future ... attached to a different loop. I'm not sure whether the error is related to sse_starlette, fastapi, httpx or pytest-asyncio, so I'll try here first.
Here is a minimal example that can be used to reproduce the issue:
from httpx import AsyncClient
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()


@app.post("/foo")
async def http_foo() -> EventSourceResponse:
    async def event_generator():
        yield {"data": "1"}
        yield {"data": "2"}
        yield {"data": "3"}

    return EventSourceResponse(event_generator())


def parse_event_stream(text):
    events = []
    for line in text.strip().split("\r\n\r\n"):
        events.append(line[len("data:"):].strip())
    return events


async def test_first():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
        events = parse_event_stream(response.text)
        assert events == ["1", "2", "3"]


async def test_second():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
        events = parse_event_stream(response.text)
        assert events == ["1", "2", "3"]
Versions used:
sse-starlette 1.6.1
fastapi 0.101.0
httpx 0.24.1
pytest 7.4.0
pytest-asyncio 0.21.1
My pytest config looks like this:
[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]
And here is the full error log:
$ poetry run pytest -vv -s -k sse
===================================================================== test session starts ======================================================================
platform darwin -- Python 3.8.15, pytest-7.4.0, pluggy-1.2.0 -- /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/bin/python
cachedir: .pytest_cache
rootdir: /Users/chris/test-project
configfile: pyproject.toml
testpaths: tests
plugins: timeout-2.1.0, asyncio-0.21.1, mock-3.11.1, anyio-3.7.1
asyncio: mode=auto
collected 54 items / 52 deselected / 2 selected
tests/test_sse.py::test_first PASSED
tests/test_sse.py::test_second FAILED
=========================================================================== FAILURES ===========================================================================
_________________________________________________________________________ test_second __________________________________________________________________________
    async def test_second():
        client = prepare_test()
        async with client:
>           response = await client.post("/foo")
tests/test_sse.py:33:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1848: in post
return await self.request(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1530: in request
return await self.send(request, auth=auth, follow_redirects=follow_redirects)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1617: in send
response = await self._send_handling_auth(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1645: in _send_handling_auth
response = await self._send_handling_redirects(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1682: in _send_handling_redirects
response = await self._send_single_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1719: in _send_single_request
response = await transport.handle_async_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_transports/asgi.py:162: in handle_async_request
await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/applications.py:289: in __call__
await super().__call__(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/applications.py:122: in __call__
await self.middleware_stack(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:184: in __call__
raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:162: in __call__
await self.app(scope, receive, _send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:79: in __call__
raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:68: in __call__
await self.app(scope, receive, sender)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:20: in __call__
raise e
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:17: in __call__
await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:718: in __call__
await route.handle(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:276: in handle
await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:69: in app
await response(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:251: in __call__
await wrap(partial(self.listen_for_disconnect, receive))
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:597: in __aexit__
raise exceptions[0]
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240: in wrap
await func()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:215: in listen_for_exit_signal
await AppStatus.should_exit_event.wait()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:1778: in wait
if await self._event.wait():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <asyncio.locks.Event object at 0x1346e4730 [unset]>
    async def wait(self):
        """Block until the internal flag is true.
        If the internal flag is true on entry, return True
        immediately. Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True
        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
>           await fut
E RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() running at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:661]> got Future <Future pending> attached to a different loop
../.pyenv/versions/3.8.15/lib/python3.8/asyncio/locks.py:309: RuntimeError
=================================================================== short test summary info ====================================================================
FAILED tests/test_sse.py::test_second - RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() run...
========================================================== 1 failed, 1 passed, 52 deselected in 1.39s ==========================================================
I don't think this is related to sse-starlette. Getting event-loop handling right in combination with pytest is sometimes a bit tricky.
If you face the issue outside pytest, in a regular app, please reopen.
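To illustrate how loop handling can go wrong in a setup like this, here is a minimal sketch that uses only asyncio. It is not the code of any library in the traceback. `SharedStatus`, `wait_briefly` and `run_in_fresh_loop` are hypothetical names, and the assumption is that a class-level event is created lazily under one event loop and then reused under a later one, which is what a fresh loop per test would produce.

```python
import asyncio


class SharedStatus:
    # Hypothetical stand-in for a class-level event shared by all requests.
    exit_event = None


async def wait_briefly():
    # Create the event lazily, the way a process-wide singleton would.
    if SharedStatus.exit_event is None:
        SharedStatus.exit_event = asyncio.Event()
    try:
        # Nobody sets the event, so a healthy wait simply times out.
        await asyncio.wait_for(SharedStatus.exit_event.wait(), timeout=0.01)
    except asyncio.TimeoutError:
        return "timed out"
    return "set"


def run_in_fresh_loop():
    # asyncio.run() creates and closes a new loop on every call,
    # similar to a test runner that gives each test its own loop.
    try:
        return asyncio.run(wait_briefly())
    except RuntimeError:
        return "RuntimeError"


if __name__ == "__main__":
    print(run_in_fresh_loop())      # first loop: event created and awaited here
    print(run_in_fresh_loop())      # second loop: the stale event is reused
    SharedStatus.exit_event = None  # drop the stale event
    print(run_in_fresh_loop())      # a new event is created under the new loop
```

The second call should fail with a RuntimeError because the event still belongs to the first, already closed loop, and the third call should work again after the reset. In the traceback above the awaited object is `AppStatus.should_exit_event`. If that event is created lazily in the same way, resetting it to `None` before each test (for example in an autouse fixture) may avoid the error. This is an assumption to verify against the installed sse-starlette version.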
I had the same issue, only with SSE. For any tests involving SSE, I ended up launching the web app in a subprocess and making calls with a real HTTP client (not the test client).
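A rough sketch of that subprocess approach, using only the standard library so it stays self-contained. The server in `SERVER_CODE` is a hypothetical stand-in that emits three SSE events; in a real suite the subprocess command would start the actual web app instead. All helper names (`free_port`, `wait_until_listening`, `fetch_events`, `run_against_subprocess`) are made up for this example.

```python
import http.client
import socket
import subprocess
import sys
import time

# Hypothetical stand-in server; replace with the command that starts the real app.
SERVER_CODE = r'''
import sys
from http.server import BaseHTTPRequestHandler, HTTPServer

class Handler(BaseHTTPRequestHandler):
    def do_POST(self):
        body = b"".join(b"data: %d\r\n\r\n" % i for i in (1, 2, 3))
        self.send_response(200)
        self.send_header("Content-Type", "text/event-stream")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):
        pass  # keep test output quiet

HTTPServer(("127.0.0.1", int(sys.argv[1])), Handler).serve_forever()
'''


def free_port():
    # Ask the OS for an unused port (a small race window remains).
    with socket.socket() as s:
        s.bind(("127.0.0.1", 0))
        return s.getsockname()[1]


def wait_until_listening(port, timeout=10.0):
    # Poll until the subprocess accepts TCP connections.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection(("127.0.0.1", port), timeout=0.2):
                return
        except OSError:
            time.sleep(0.05)
    raise RuntimeError("server did not start")


def fetch_events(port):
    # A real HTTP request over a socket, so no event loop is shared with the app.
    conn = http.client.HTTPConnection("127.0.0.1", port, timeout=5)
    try:
        conn.request("POST", "/foo")
        text = conn.getresponse().read().decode()
    finally:
        conn.close()
    return [chunk[len("data:"):].strip() for chunk in text.strip().split("\r\n\r\n")]


def run_against_subprocess():
    port = free_port()
    proc = subprocess.Popen([sys.executable, "-c", SERVER_CODE, str(port)])
    try:
        wait_until_listening(port)
        # Two requests in a row, mirroring the two tests in the report.
        return fetch_events(port), fetch_events(port)
    finally:
        proc.terminate()
        proc.wait(timeout=5)
```

Because the app runs in its own process, each request goes over a real socket and the test process never touches the app's event loop. The trade-off is slower tests and some extra startup and teardown code.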
@hinzundcode did you ever get to the bottom of this? I seem to be facing the same issue, thanks!
Started a discussion in FastAPI in case anyone's interested!
fastapi/fastapi#10518