python-trio/pytest-trio

pytest-trio exits with TrioInternalError on Python 3.7 when pytest-asyncio is installed

seifertm opened this issue · 6 comments

I'm one of the maintainers of pytest-asyncio, and I'm currently trying to address a reported incompatibility between pytest-asyncio and pytest-trio v0.7.0.

The current release of pytest-asyncio (v0.18.2) contains a bug that causes pytest to fail during the collection phase when the code base contains trio fixtures. I addressed the issue in a branch and came up with a corresponding test case:

from textwrap import dedent


def test_strict_mode_ignores_trio_fixtures(testdir):
    testdir.makepyfile(
        dedent(
            """\
        import pytest
        import pytest_asyncio
        import pytest_trio

        pytest_plugins = ["pytest_asyncio", "pytest_trio"]

        @pytest_trio.trio_fixture
        async def any_fixture():
            return True

        @pytest.mark.trio
        async def test_anything(any_fixture):
            pass
        """
        )
    )
    result = testdir.runpytest("--asyncio-mode=strict")
    result.assert_outcomes(passed=1)

The test case passes on CPython 3.8 – 3.10, but fails on Python 3.7; the reported error is a TrioInternalError. It's worth noting that the test also issues a RuntimeWarning suggesting to set host_uses_signal_set_wakeup_fd=True. However, I didn't find a way to configure this variable in pytest-trio.
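For context, host_uses_signal_set_wakeup_fd doesn't appear to be a pytest-trio option at all: as far as I can tell it's a keyword argument of Trio's guest-mode entry point trio.lowlevel.start_guest_run, while pytest-trio drives tests through a plain trio.run(), which has no such parameter. A minimal guest-mode sketch (illustrative only, using asyncio as a hypothetical host loop, not how pytest-trio runs tests) just to show where the flag lives:

```python
import asyncio
import trio


async def trio_main():
    await trio.sleep(0)
    return "hello from trio"


def main():
    loop = asyncio.new_event_loop()
    done = loop.create_future()

    trio.lowlevel.start_guest_run(
        trio_main,
        run_sync_soon_threadsafe=loop.call_soon_threadsafe,
        done_callback=done.set_result,
        # Tell Trio that the host loop owns signal.set_wakeup_fd, so Trio should
        # not try to register its own wakeup fd (the flag the warning refers to):
        host_uses_signal_set_wakeup_fd=True,
    )

    outcome = loop.run_until_complete(done)  # an outcome.Outcome from Trio
    print(outcome.unwrap())
    loop.close()


if __name__ == "__main__":
    main()
```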

This is the test output I'm seeing:

========== FAILURES ==========
________________________________________________________________________________________________ test_strict_mode_ignores_trio_fixtures ________________________________________________________________________________________________
/home/michael/pytest-asyncio/tests/trio/test_fixtures.py:25: in test_strict_mode_ignores_trio_fixtures
    result.assert_outcomes(passed=1)
E   AssertionError: assert {'errors': 0,...pped': 0, ...} == {'errors': 0,...pped': 0, ...}
E     Omitting 4 identical items, use -vv to show
E     Differing items:
E     {'failed': 1} != {'failed': 0}
E     {'passed': 0} != {'passed': 1}
E     Use -v to get the full diff
------------ Captured stdout call ------------------
============================= test session starts ==============================
platform linux -- Python 3.7.12, pytest-6.2.5, py-1.11.0, pluggy-1.0.0
rootdir: /tmp/pytest-of-michael/pytest-47/test_strict_mode_ignores_trio_fixtures0
plugins: asyncio-0.18.3.dev0+g929608e.d20220307, hypothesis-6.39.2, flaky-3.7.0, trio-0.7.0
asyncio: mode=strict
collected 1 item

test_strict_mode_ignores_trio_fixtures.py F                              [100%]

=================================== FAILURES ===================================
________________________________ test_anything _________________________________

runner = Runner(clock=SystemClock(offset=100594.58619596818), instruments={'_all': {}}, io_manager=EpollIOManager(_epoll=<selec..._autojump_threshold=inf, is_guest=False, guest_tick_scheduled=False, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7fd38a2a7290>, any_fixture=<pytest_trio.plugin.TrioFixture object at 0x7fd389b72090>)
args = (), host_uses_signal_set_wakeup_fd = False

    def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
        __tracebackhide__ = True
    
        try:
            if not host_uses_signal_set_wakeup_fd:
>               runner.entry_queue.wakeup.wakeup_on_signals()

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_run.py:2048: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <trio._core._wakeup_socketpair.WakeupSocketpair object at 0x7fd389b72550>

    def wakeup_on_signals(self):
        assert self.old_wakeup_fd is None
        if not is_main_thread():
            return
        fd = self.write_sock.fileno()
        self.old_wakeup_fd = signal.set_wakeup_fd(fd, warn_on_full_buffer=False)
        if self.old_wakeup_fd != -1:
            warnings.warn(
                RuntimeWarning(
>                   "It looks like Trio's signal handling code might have "
                    "collided with another library you're using. If you're "
                    "running Trio in guest mode, then this might mean you "
                    "should set host_uses_signal_set_wakeup_fd=True. "
                    "Otherwise, file a bug on Trio and we'll help you figure "
                    "out what's going on."
                )
            )
E           RuntimeWarning: It looks like Trio's signal handling code might have collided with another library you're using. If you're running Trio in guest mode, then this might mean you should set host_uses_signal_set_wakeup_fd=True. Otherwise, file a bug on Trio and we'll help you figure out what's going on.

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_wakeup_socketpair.py:58: RuntimeWarning

The above exception was the direct cause of the following exception:

runner = Runner(clock=SystemClock(offset=100594.58619596818), instruments={'_all': {}}, io_manager=EpollIOManager(_epoll=<selec..._autojump_threshold=inf, is_guest=False, guest_tick_scheduled=False, ki_pending=False, waiting_for_idle=SortedDict({}))
async_fn = functools.partial(<function _trio_test_runner_factory.<locals>._bootstrap_fixtures_and_run_test at 0x7fd38a2a7290>, any_fixture=<pytest_trio.plugin.TrioFixture object at 0x7fd389b72090>)
args = (), host_uses_signal_set_wakeup_fd = False

    def unrolled_run(runner, async_fn, args, host_uses_signal_set_wakeup_fd=False):
        locals()[LOCALS_KEY_KI_PROTECTION_ENABLED] = True
        __tracebackhide__ = True
    
        try:
            if not host_uses_signal_set_wakeup_fd:
                runner.entry_queue.wakeup.wakeup_on_signals()
    
            if "before_run" in runner.instruments:
                runner.instruments.call("before_run")
            runner.clock.start_clock()
            runner.init_task = runner.spawn_impl(
                runner.init, (async_fn, args), None, "<init>", system_task=True
            )
    
            # You know how people talk about "event loops"? This 'while' loop right
            # here is our event loop:
            while runner.tasks:
                if runner.runq:
                    timeout = 0
                else:
                    deadline = runner.deadlines.next_deadline()
                    timeout = runner.clock.deadline_to_sleep_time(deadline)
                timeout = min(max(0, timeout), _MAX_TIMEOUT)
    
                idle_primed = None
                if runner.waiting_for_idle:
                    cushion, _ = runner.waiting_for_idle.keys()[0]
                    if cushion < timeout:
                        timeout = cushion
                        idle_primed = IdlePrimedTypes.WAITING_FOR_IDLE
                # We use 'elif' here because if there are tasks in
                # wait_all_tasks_blocked, then those tasks will wake up without
                # jumping the clock, so we don't need to autojump.
                elif runner.clock_autojump_threshold < timeout:
                    timeout = runner.clock_autojump_threshold
                    idle_primed = IdlePrimedTypes.AUTOJUMP_CLOCK
    
                if "before_io_wait" in runner.instruments:
                    runner.instruments.call("before_io_wait", timeout)
    
                # Driver will call io_manager.get_events(timeout) and pass it back
                # in through the yield
                events = yield timeout
                runner.io_manager.process_events(events)
    
                if "after_io_wait" in runner.instruments:
                    runner.instruments.call("after_io_wait", timeout)
    
                # Process cancellations due to deadline expiry
                now = runner.clock.current_time()
                if runner.deadlines.expire(now):
                    idle_primed = None
    
                # idle_primed != None means: if the IO wait hit the timeout, and
                # still nothing is happening, then we should start waking up
                # wait_all_tasks_blocked tasks or autojump the clock. But there
                # are some subtleties in defining "nothing is happening".
                #
                # 'not runner.runq' means that no tasks are currently runnable.
                # 'not events' means that the last IO wait call hit its full
                # timeout. These are very similar, and if idle_primed != None and
                # we're running in regular mode then they always go together. But,
                # in *guest* mode, they can happen independently, even when
                # idle_primed=True:
                #
                # - runner.runq=empty and events=True: the host loop adjusted a
                #   deadline and that forced an IO wakeup before the timeout expired,
                #   even though no actual tasks were scheduled.
                #
                # - runner.runq=nonempty and events=False: the IO wait hit its
                #   timeout, but then some code in the host thread rescheduled a task
                #   before we got here.
                #
                # So we need to check both.
                if idle_primed is not None and not runner.runq and not events:
                    if idle_primed is IdlePrimedTypes.WAITING_FOR_IDLE:
                        while runner.waiting_for_idle:
                            key, task = runner.waiting_for_idle.peekitem(0)
                            if key[0] == cushion:
                                del runner.waiting_for_idle[key]
                                runner.reschedule(task)
                            else:
                                break
                    else:
                        assert idle_primed is IdlePrimedTypes.AUTOJUMP_CLOCK
                        runner.clock._autojump()
    
                # Process all runnable tasks, but only the ones that are already
                # runnable now. Anything that becomes runnable during this cycle
                # needs to wait until the next pass. This avoids various
                # starvation issues by ensuring that there's never an unbounded
                # delay between successive checks for I/O.
                #
                # Also, we randomize the order of each batch to avoid assumptions
                # about scheduling order sneaking in. In the long run, I suspect
                # we'll either (a) use strict FIFO ordering and document that for
                # predictability/determinism, or (b) implement a more
                # sophisticated scheduler (e.g. some variant of fair queueing),
                # for better behavior under load. For now, this is the worst of
                # both worlds - but it keeps our options open. (If we do decide to
                # go all in on deterministic scheduling, then there are other
                # things that will probably need to change too, like the deadlines
                # tie-breaker and the non-deterministic ordering of
                # task._notify_queues.)
                batch = list(runner.runq)
                runner.runq.clear()
                if _ALLOW_DETERMINISTIC_SCHEDULING:
                    # We're running under Hypothesis, and pytest-trio has patched
                    # this in to make the scheduler deterministic and avoid flaky
                    # tests. It's not worth the (small) performance cost in normal
                    # operation, since we'll shuffle the list and _r is only
                    # seeded for tests.
                    batch.sort(key=lambda t: t._counter)
                    _r.shuffle(batch)
                else:
                    # 50% chance of reversing the batch, this way each task
                    # can appear before/after any other task.
                    if _r.random() < 0.5:
                        batch.reverse()
                while batch:
                    task = batch.pop()
                    GLOBAL_RUN_CONTEXT.task = task
    
                    if "before_task_step" in runner.instruments:
                        runner.instruments.call("before_task_step", task)
    
                    next_send_fn = task._next_send_fn
                    next_send = task._next_send
                    task._next_send_fn = task._next_send = None
                    final_outcome = None
                    try:
                        # We used to unwrap the Outcome object here and send/throw
                        # its contents in directly, but it turns out that .throw()
                        # is buggy, at least before CPython 3.9:
                        #   https://bugs.python.org/issue29587
                        #   https://bugs.python.org/issue29590
                        # So now we send in the Outcome object and unwrap it on the
                        # other side.
                        msg = task.context.run(next_send_fn, next_send)
                    except StopIteration as stop_iteration:
                        final_outcome = Value(stop_iteration.value)
                    except BaseException as task_exc:
                        # Store for later, removing uninteresting top frames: 1
                        # frame we always remove, because it's this function
                        # catching it, and then in addition we remove however many
                        # more Context.run adds.
                        tb = task_exc.__traceback__.tb_next
                        for _ in range(CONTEXT_RUN_TB_FRAMES):
                            tb = tb.tb_next
                        final_outcome = Error(task_exc.with_traceback(tb))
                        # Remove local refs so that e.g. cancelled coroutine locals
                        # are not kept alive by this frame until another exception
                        # comes along.
                        del tb
    
                    if final_outcome is not None:
                        # We can't call this directly inside the except: blocks
                        # above, because then the exceptions end up attaching
                        # themselves to other exceptions as __context__ in
                        # unwanted ways.
                        runner.task_exited(task, final_outcome)
                        # final_outcome may contain a traceback ref. It's not as
                        # crucial compared to the above, but this will allow more
                        # prompt release of resources in coroutine locals.
                        final_outcome = None
                    else:
                        task._schedule_points += 1
                        if msg is CancelShieldedCheckpoint:
                            runner.reschedule(task)
                        elif type(msg) is WaitTaskRescheduled:
                            task._cancel_points += 1
                            task._abort_func = msg.abort_func
                            # KI is "outside" all cancel scopes, so check for it
                            # before checking for regular cancellation:
                            if runner.ki_pending and task is runner.main_task:
                                task._attempt_delivery_of_pending_ki()
                            task._attempt_delivery_of_any_pending_cancel()
                        elif type(msg) is PermanentlyDetachCoroutineObject:
                            # Pretend the task just exited with the given outcome
                            runner.task_exited(task, msg.final_outcome)
                        else:
                            exc = TypeError(
                                "trio.run received unrecognized yield message {!r}. "
                                "Are you trying to use a library written for some "
                                "other framework like asyncio? That won't work "
                                "without some kind of compatibility shim.".format(msg)
                            )
                            # The foreign library probably doesn't adhere to our
                            # protocol of unwrapping whatever outcome gets sent in.
                            # Instead, we'll arrange to throw `exc` in directly,
                            # which works for at least asyncio and curio.
                            runner.reschedule(task, exc)
                            task._next_send_fn = task.coro.throw
                        # prevent long-lived reference
                        # TODO: develop test for this deletion
                        del msg
    
                    if "after_task_step" in runner.instruments:
                        runner.instruments.call("after_task_step", task)
                    del GLOBAL_RUN_CONTEXT.task
                    # prevent long-lived references
                    # TODO: develop test for these deletions
                    del task, next_send, next_send_fn
    
        except GeneratorExit:
            # The run-loop generator has been garbage collected without finishing
            warnings.warn(
                RuntimeWarning(
                    "Trio guest run got abandoned without properly finishing... "
                    "weird stuff might happen"
                )
            )
        except TrioInternalError:
            raise
        except BaseException as exc:
>           raise TrioInternalError("internal error in Trio - please file a bug!") from exc
E           trio.TrioInternalError: internal error in Trio - please file a bug!

/home/michael/pytest-asyncio/.tox/py37/lib/python3.7/site-packages/trio/_core/_run.py:2258: TrioInternalError

You will need a Git version of pytest-asyncio to reproduce the issue (see this branch); run tox -e py37 in that branch.

I'm currently trying to figure out whether I'm using pytest-trio incorrectly, or whether pytest-trio or pytest-asyncio needs adjustments to work together nicely. I'm also puzzled why the test only fails on Python 3.7. Do you have any ideas?

The error message means that someone else used signal.set_wakeup_fd to register a signal-delivery fd before starting trio. This is weird, because a signal-delivery fd should only be set while an async loop is running. So... assuming asyncio is the one that registered this fd, my guess is either:

  • you're trying to call trio while asyncio is running in the background. Seems unlikely?
  • at some point in the same Python process, asyncio was running (even though it isn't anymore), and old versions of asyncio don't clean up after themselves properly, i.e. they never unregister their wakeup fd when they're done (a quick way to check for this is sketched below).
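
A small diagnostic sketch for the second case (my own illustration, not something either plugin does): signal.set_wakeup_fd returns the previously registered fd, so setting it to -1 and inspecting the return value reveals whether something left a wakeup fd behind.

```python
import signal

# Must run in the main thread; set_wakeup_fd raises ValueError elsewhere.
previous = signal.set_wakeup_fd(-1)
if previous == -1:
    print("no wakeup fd registered")
else:
    print(f"stale wakeup fd still registered: {previous}")
    signal.set_wakeup_fd(previous)  # restore it; this snippet only inspects
```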

If it is an asyncio bug, I'm not sure what to do. Python 3.7 asyncio isn't going to get fixed. Maybe there's some workaround you can do, like forcibly calling signal.set_wakeup_fd(-1) to clear out the stale state? Or just tell people they should upgrade to 3.8+?
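
A sketch of that forcible-clearing workaround as an autouse fixture in the test suite's conftest.py (my assumption of how one might wire it up, not an official API of either plugin):

```python
# conftest.py
import signal

import pytest


@pytest.fixture(autouse=True)
def _clear_stale_wakeup_fd():
    # Drop any wakeup fd left behind by a previously-running asyncio loop, so that
    # Trio can register its own without tripping the RuntimeWarning. This runs in
    # the main thread during fixture setup, before pytest-trio calls trio.run().
    signal.set_wakeup_fd(-1)
    yield
```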

Thanks for the clarification!

I noticed that I had always run that test as part of our full test suite. When I ran it stand-alone, it completed successfully. I assume the issue is caused by our test setup, and we'll have to figure that out on the pytest-asyncio side.

It's unclear to me why the error only appears with Python 3.7, though.

Thanks!

It's unclear to me why the error only appears with Python 3.7, though.

My (unconfirmed) guess is that it's because in 3.7, asyncio doesn't clean up the set_wakeup_fd state when the loop exits, and they fixed that in 3.8.

Thanks @njsmith! I think the fix was python/cpython#11135

Thanks for your support!

From what I can tell, the problem was related to the use of asyncio.subprocess.create_subprocess_exec in our test suite. The underlying implementation changed quite a bit from CPython 3.7 to 3.8, which is probably why recent Python versions behave differently.
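
For anyone landing here later, a hedged illustration of the kind of call involved (not the actual pytest-asyncio test): on Unix, awaiting a subprocess makes asyncio 3.7's default child watcher install a SIGCHLD handler via loop.add_signal_handler, which in turn calls signal.set_wakeup_fd; that registration seems to be the state that outlived the loop here, and I believe 3.8 switched to a default child watcher that doesn't rely on SIGCHLD.

```python
import asyncio


async def run_true():
    # Illustrative command, not the real test case. On 3.7, creating the
    # subprocess attaches a SIGCHLD handler through the child watcher, which
    # registers a signal wakeup fd on the running loop.
    proc = await asyncio.create_subprocess_exec("true")
    await proc.wait()
    return proc.returncode


print(asyncio.run(run_true()))
```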

Btw: We just released pytest-asyncio-0.18.3 which fixes compatibility with pytest-trio :)