Ability to wait for background tasks to complete before moving to next test?
mzealey opened this issue · 0 comments
mzealey commented
Until now, we have used the following fixture to let backgrounded asyncio tasks complete before the event loop is shut down:
import asyncio
from typing import Generator

import pytest


@pytest.fixture(scope="session")
def event_loop(request: pytest.FixtureRequest) -> Generator[asyncio.AbstractEventLoop, None, None]:
    """Create a single event loop shared by the whole test session."""

    def custom_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> None:
        # Log via the default handler, then stop the loop and re-raise.
        loop.default_exception_handler(context)
        exception = context.get("exception")
        loop.stop()
        if exception is not None:  # not every context carries an exception
            raise exception

    loop = asyncio.get_event_loop_policy().new_event_loop()
    loop.set_exception_handler(custom_exception_handler)
    yield loop
    # Wait until all background tasks have completed
    loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop)))
    loop.close()
It would be nice to migrate this to the asyncio_default_fixture_loop_scope configuration option, but the default event loop runner does not appear to wait for background tasks to complete before the loop is closed.
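
For illustration, here is a minimal sketch of the behaviour being asked for, written against plain asyncio rather than any pytest-asyncio internals. The helper name `drain_background_tasks` is hypothetical. The fixture in the trailing comment is an untested assumption about how such a helper might be wired into an async fixture's teardown. It is not an existing pytest-asyncio feature.

```python
import asyncio


async def drain_background_tasks() -> None:
    """Await every task on the running loop except the caller's own task."""
    current = asyncio.current_task()
    while True:
        pending = [t for t in asyncio.all_tasks() if t is not current]
        if not pending:
            return
        # Without return_exceptions=True, the first failure in a background
        # task propagates to the caller, similar to the original fixture.
        # Loop again afterwards in case those tasks spawned further tasks.
        await asyncio.gather(*pending)


# Hypothetical use in a fixture teardown (assumes pytest-asyncio is installed
# and that the fixture runs on the same loop as the tests it wraps):
#
# @pytest_asyncio.fixture(autouse=True)
# async def wait_for_background_tasks():
#     yield
#     await drain_background_tasks()
```

Because the helper runs inside the loop, it never calls `run_until_complete` on a loop owned by the runner. Whether fixture teardown actually sees the tasks started by a test depends on the loop scope in use.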