pytest-dev/pytest-asyncio

Ability to wait for background tasks to complete before moving to next test?

mzealey opened this issue · 0 comments

Previously, we have been running the following code to allow various backgrounded asyncio tasks to complete before shutting down the event loop:

@pytest.fixture(scope="session") 
def event_loop(request: Request) -> Generator[asyncio.AbstractEventLoop, None, None]: 
    """Create an instance of the default event loop for each test case.""" 
 
    def custom_exception_handler(loop: asyncio.AbstractEventLoop, context: dict) -> NoReturn: 
        loop.default_exception_handler(context) 
 
        exception = context.get("exception") 
        loop.stop() 
        raise exception 
 
    loop = asyncio.get_event_loop_policy().new_event_loop() 
    loop.set_exception_handler(custom_exception_handler) 
    yield loop 
    # Wait until all background tasks have completed 
    loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop))) 
    loop.close() 

It would be nice to move this into the asyncio_default_fixture_loop_scope format, but it seems that the default event loop runner does not wait for background tasks to complete before the loop gets terminated.