sysid/sse-starlette

RuntimeError during unit tests: Future attached to a different loop

Closed this issue · 4 comments

I tried to write pytest test cases for a FastAPI route that returns an event source/stream. The first test always succeeds, but the second test, despite having identical content, consistently fails with: RuntimeError: Task ... got Future ... attached to a different loop. I'm not sure whether the error is related to sse_starlette, fastapi, httpx or pytest-asyncio, so I'll try here first.

Here is a minimal example that can be used to reproduce the issue:

from httpx import AsyncClient
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
import asyncio

app = FastAPI()

@app.post("/foo")
async def http_foo() -> EventSourceResponse:
    async def event_generator():
        yield {"data": "1"}
        yield {"data": "2"}
        yield {"data": "3"}
    return EventSourceResponse(event_generator())

def parse_event_stream(text):
    events = []
    for line in text.strip().split("\r\n\r\n"):
        events.append(line[len("data:"):].strip())
    return events

async def test_first():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
    events = parse_event_stream(response.text)
    assert events == ["1", "2", "3"]

async def test_second():
    client = AsyncClient(app=app, base_url="http://test")
    async with client:
        response = await client.post("/foo")
    events = parse_event_stream(response.text)
    assert events == ["1", "2", "3"]

Versions used:

sse-starlette 1.6.1
fastapi 0.101.0
httpx 0.24.1
pytest 7.4.0
pytest-asyncio 0.21.1

My pytest config looks like this:

[tool.pytest.ini_options]
asyncio_mode = "auto"
testpaths = ["tests"]

And here is the full error log:

$ poetry run pytest -vv -s -k sse
===================================================================== test session starts ======================================================================
platform darwin -- Python 3.8.15, pytest-7.4.0, pluggy-1.2.0 -- /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/bin/python
cachedir: .pytest_cache
rootdir: /Users/chris/test-project
configfile: pyproject.toml
testpaths: tests
plugins: timeout-2.1.0, asyncio-0.21.1, mock-3.11.1, anyio-3.7.1
asyncio: mode=auto
collected 54 items / 52 deselected / 2 selected                                                                                                                

tests/test_sse.py::test_first PASSED
tests/test_sse.py::test_second FAILED

=========================================================================== FAILURES ===========================================================================
_________________________________________________________________________ test_second __________________________________________________________________________

    async def test_second():
        client = prepare_test()
        async with client:
>           response = await client.post("/foo")

tests/test_sse.py:33: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1848: in post
    return await self.request(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1530: in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1617: in send
    response = await self._send_handling_auth(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1645: in _send_handling_auth
    response = await self._send_handling_redirects(
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1682: in _send_handling_redirects
    response = await self._send_single_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_client.py:1719: in _send_single_request
    response = await transport.handle_async_request(request)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/httpx/_transports/asgi.py:162: in handle_async_request
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/applications.py:289: in __call__
    await super().__call__(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/applications.py:122: in __call__
    await self.middleware_stack(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:184: in __call__
    raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/errors.py:162: in __call__
    await self.app(scope, receive, _send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:79: in __call__
    raise exc
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/middleware/exceptions.py:68: in __call__
    await self.app(scope, receive, sender)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:20: in __call__
    raise e
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/fastapi/middleware/asyncexitstack.py:17: in __call__
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:718: in __call__
    await route.handle(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:276: in handle
    await self.app(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/starlette/routing.py:69: in app
    await response(scope, receive, send)
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:251: in __call__
    await wrap(partial(self.listen_for_disconnect, receive))
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:597: in __aexit__
    raise exceptions[0]
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240: in wrap
    await func()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:215: in listen_for_exit_signal
    await AppStatus.should_exit_event.wait()
../Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:1778: in wait
    if await self._event.wait():
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <asyncio.locks.Event object at 0x1346e4730 [unset]>

    async def wait(self):
        """Block until the internal flag is true.
    
        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True
    
        fut = self._loop.create_future()
        self._waiters.append(fut)
        try:
>           await fut
E           RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() running at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/sse_starlette/sse.py:240> cb=[TaskGroup._spawn.<locals>.task_done() at /Users/chris/Library/Caches/pypoetry/virtualenvs/test-project-h2N-_wM8-py3.8/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:661]> got Future <Future pending> attached to a different loop

../.pyenv/versions/3.8.15/lib/python3.8/asyncio/locks.py:309: RuntimeError
=================================================================== short test summary info ====================================================================
FAILED tests/test_sse.py::test_second - RuntimeError: Task <Task pending name='sse_starlette.sse.EventSourceResponse.__call__.<locals>.wrap' coro=<EventSourceResponse.__call__.<locals>.wrap() run...
========================================================== 1 failed, 1 passed, 52 deselected in 1.39s ==========================================================
sysid commented

I don't think this is sse-starlette related. Getting event-loop handling right in combination with pytest is sometimes a bit tricky.

If you face the issue outside pytest, in a regular app, please reopen.
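
For anyone trying to picture how a loop mismatch like the one in the traceback can arise, here is a minimal stdlib-only sketch. It does not use sse-starlette, httpx or pytest. `SharedStatus`, `wait_briefly` and `run_twice` are hypothetical names invented for this illustration, and the two `asyncio.run()` calls stand in for two tests that each get a fresh event loop. The sketch assumes the cause is an event object that is created during the first run and then reused in the second; it is not a confirmed diagnosis of this issue.

```python
import asyncio


class SharedStatus:
    # Hypothetical stand-in for a class-level event that outlives a single
    # event loop (for example, one that survives from one test to the next).
    exit_event = None


async def wait_briefly():
    # Lazily create the event on first use, then wait on it for a short time.
    if SharedStatus.exit_event is None:
        SharedStatus.exit_event = asyncio.Event()
    try:
        await asyncio.wait_for(SharedStatus.exit_event.wait(), timeout=0.01)
    except asyncio.TimeoutError:
        pass  # Nobody sets the event in this sketch; timing out is expected.


def run_twice(reset_between):
    """Run wait_briefly() on two separate event loops.

    Returns the RuntimeError message raised by the second run, or None if
    the second run completed without a RuntimeError.
    """
    SharedStatus.exit_event = None
    asyncio.run(wait_briefly())  # first loop: the event gets tied to it
    if reset_between:
        SharedStatus.exit_event = None  # drop the event tied to the old loop
    try:
        asyncio.run(wait_briefly())  # second, brand-new loop
    except RuntimeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    print("without reset:", run_twice(reset_between=False))
    print("with reset:", run_twice(reset_between=True))
```

Without the reset, the second run raises a RuntimeError about a different loop (the exact wording depends on the Python version); with the reset, both runs complete. In the traceback above, the event being awaited is `AppStatus.should_exit_event`, so the analogous step in a test suite would be a fixture that resets that attribute between tests. That only helps if the library recreates the event when it is unset, which is an assumption here and should be checked against the installed sse-starlette version.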

dleen commented

I had the same issue, but only with SSE. For any tests involving SSE, I had to launch the web app in a subprocess and make calls using a real HTTP client (not the test client).

@hinzundcode did you ever get to the bottom of this? I seem to be facing the same issue, thanks!

Started a discussion in FastAPI in case anyone's interested!
fastapi/fastapi#10518