Tribler/py-ipv8

Get rid of `get_event_loop()` calls

qstokkink opened this issue · 19 comments

EDIT: This thread is quite long; PR authors can skip to the "tl;dr For PRs" comment further down.

Right now, the IPv8 code base uses the following Python 3.6 way of launching asynchronous functions:

asyncio.ensure_future(some_async_function())
asyncio.get_event_loop().run_forever()

However, since we have dropped support for Python 3.6, we can now use "modern" Python 3.7 syntax:

asyncio.run(some_async_function())

The PR to resolve this issue should probably be focused around removing get_event_loop() from our code base entirely. According to the documentation:

Because this function has rather complex behavior (especially when custom event loop policies are in use), using the get_running_loop() function is preferred to get_event_loop() in coroutines and callbacks.

As noted above, consider using the higher-level asyncio.run() function, instead of using these lower level functions to manually create and close an event loop.

In other words: wherever asyncio.run() does not make fetching the event loop unnecessary, get_running_loop() should be used. Besides that, since Python 3.10 get_event_loop() emits a DeprecationWarning when it is called without a running event loop.
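One practical difference to keep in mind when porting: get_running_loop() raises a RuntimeError when no loop is running, so it only works inside coroutines and callbacks. A minimal sketch of that behavior (the helper name loop_is_running is made up for illustration and is not part of IPv8):

```python
import asyncio


def loop_is_running() -> bool:
    """Return True when called from inside a running event loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:  # raised when no event loop is running in this thread
        return False
    return True


async def check() -> bool:
    return loop_is_running()


outside = loop_is_running()    # called at module level: no loop is running
inside = asyncio.run(check())  # called from a coroutine driven by asyncio.run()
```

Any get_event_loop() call that currently runs before the loop is started therefore cannot be ported one-to-one and would have to move into the main coroutine.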

Observation: any call to get_running_loop().stop() (a very common pattern in our code to stop experiments) causes a RuntimeError: Event loop stopped before Future completed. One fix for this is to simply wrap asyncio.run() in a try: ... except RuntimeError: ... but this is pretty ugly.

Here's the minimal example:

from asyncio import get_running_loop, run, ensure_future


async def foo():
    pass


async def bar():
    get_running_loop().stop()  # OPTION 1: Change this into another method for stopping ..


async def main():
    fut = ensure_future(foo())
    await bar()
    await fut

run(main())  # OPTION 2: .. or, change this into another method for running.

The exercise is this: you're only allowed to change the manner of invocation (currently asyncio.run) or the manner of stopping the loop (currently get_running_loop().stop()). You're not allowed to change the other lines. Your goal is to avoid a RuntimeError.

We're trying to avoid this solution:

try:
    run(main())  # OPTION 2: .. or, change this into another method for running.
except RuntimeError:
    pass

I found one workaround that I still consider "dirty" but "less dirty" than the except RuntimeError: solution:

import sys

async def bar():
    sys.exit(0)  # OPTION 1: Change this into another method for stopping ..

The downside of this approach is that any code after asyncio.run() is not executed.


EDIT: In that line of thinking, the following crashes just the event loop and allows code execution after asyncio.run():

get_running_loop().call_exception_handler({})   # OPTION 1: Change this into another method for stopping ..

However, this produces a nasty error line:

Unhandled exception in event loop

EDIT2: The following is an extension of the previous and stops the event loop as usual. HOWEVER, when used in example code, this probably mystifies the reader.

get_running_loop().call_exception_handler({"message": "\r"})

We CAN hide this call behind an import and simply define this in util.py:

## util.py

from asyncio import get_running_loop


def stop_running_loop():
    get_running_loop().call_exception_handler({"message": "\r"})

The minimal example would then be transformed as follows:

from asyncio import get_running_loop, run, ensure_future

from ipv8.util import stop_running_loop


async def foo():
    pass


async def bar():
    stop_running_loop()  # OPTION 1: Change this into another method for stopping ..


async def main():
    fut = ensure_future(foo())
    await bar()
    await fut

run(main())  # OPTION 2: .. or, change this into another method for running.

In my opinion, instead of just calling get_running_loop().stop() from the bar coroutine, it is better to cancel the main coroutine, as well as other coroutines. This way, all finalization logic in coroutines can work properly.

The problem is that asyncio.run() does not provide a way to cancel the main coroutine. To fix it, I suggest using a small helper class:

from asyncio import CancelledError, Future, run, ensure_future
from typing import Optional

class Runner:
    def __init__(self, coro):
        self.coro = coro
        self.future: Optional[Future] = None

    def run(self):
        try:
            run(self.actual_coro())
        except CancelledError:
            pass

    async def actual_coro(self):
        self.future = ensure_future(self.coro)
        await self.future

    def cancel(self):
        if self.future is not None:
            self.future.cancel()

With this class, we can write the code above in the following way:

async def foo():
    pass


async def bar():
    runner.cancel()


async def main():
    fut = ensure_future(foo())
    await bar()
    await fut


runner = Runner(main())
runner.run()

And it should finish correctly and without errors.

It is also possible to hide the instance of the Runner class as an implementation detail and provide two utility functions, run_in_new_loop and cancel_loop, calling cancel_loop where appropriate.
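A rough sketch of what those two utility functions could look like. Only the names run_in_new_loop and cancel_loop come from the comment above; the module-level _main_task variable and everything else is an assumption made for illustration. Like the Runner class, it catches CancelledError around asyncio.run().

```python
from asyncio import CancelledError, Task, ensure_future, run, sleep
from typing import Optional

_main_task: Optional[Task] = None  # hypothetical module-level state


def run_in_new_loop(coro) -> None:
    """Run coro in a fresh loop; return quietly if it was cancelled via cancel_loop()."""

    async def wrapper():
        global _main_task
        _main_task = ensure_future(coro)
        await _main_task

    try:
        run(wrapper())
    except CancelledError:
        pass


def cancel_loop() -> None:
    """Request cancellation of the main coroutine started by run_in_new_loop()."""
    if _main_task is not None:
        _main_task.cancel()


finalized = []


async def main():
    try:
        cancel_loop()
        await sleep(10)  # the cancellation is delivered at this await
    finally:
        finalized.append("main")  # finalization logic still runs


run_in_new_loop(main())
```

The finally block in main() runs before run_in_new_loop() returns, which is the point of cancelling instead of stopping the loop.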

@kozlovsky thanks for the suggestion. 🙏 I played around with your implementation for a bit. Right now, I still consider the stop_running_loop() approach (above) superior for the following two reasons:

  1. The try: .. except RuntimeError: approach suppresses every RuntimeError that escapes asyncio.run, which is not great. The Runner.run() approach above instead suppresses every CancelledError. An example for each of these, respectively, would be the following async def foo() implementations:

async def foo():
    raise RuntimeError("Unexpected! Here is some critical debug info: ...")

async def foo():
    raise CancelledError("Unexpected! Here is some critical debug info: ...")

I believe an approach that does not suppress any exception is preferable.

  2. Both the run call and the stop call require users to call our custom construction (the stop_running_loop() solution only requires a custom stop).
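To make the first point concrete, here is a small sketch showing that a blanket except RuntimeError: cannot tell the intentional loop stop apart from a genuine bug. The names stops_loop, has_bug and run_and_suppress are made up for this illustration.

```python
from asyncio import get_running_loop, run, sleep


async def stops_loop():
    get_running_loop().stop()
    await sleep(0)  # yield once so the loop actually stops before we finish


async def has_bug():
    raise RuntimeError("Unexpected! Here is some critical debug info: ...")


def run_and_suppress(coro) -> str:
    try:
        run(coro)
    except RuntimeError:
        return "suppressed"
    return "completed"


# Both the deliberate stop and the real bug end up in the same except branch.
outcomes = [run_and_suppress(stops_loop()), run_and_suppress(has_bug())]
```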

As an alternative to cancelling the main task, you could also just let the main function finish normally. Of course, you need some kind of replacement for run_forever(), so I created one using asyncio.Event here.

I'm not using the code from the exercise, since I feel it's doing things out of order.
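The linked code is not reproduced in this thread. A minimal sketch of the general idea, with made-up names (experiment, stop_event) and under the assumption that the Event is created inside the main coroutine, could look like this:

```python
import asyncio


async def experiment(stop_event: asyncio.Event, log: list) -> None:
    log.append("experiment done")
    stop_event.set()  # takes the place of get_event_loop().stop()


async def main(log: list) -> None:
    stop_event = asyncio.Event()  # created inside the running loop
    task = asyncio.ensure_future(experiment(stop_event, log))
    await stop_event.wait()  # stands in for run_forever()
    await task  # surfaces any exception raised by the experiment
    log.append("main returned")


log = []
asyncio.run(main(log))
```

Here main() returns normally, so asyncio.run() finishes without any exception having to be caught.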

<asyncio rant>

For me, it looks like the big design flaw of asyncio is that it uses CancelledError for two different purposes:

  • to show that the coroutine foo itself was canceled.
  • to show that another coroutine bar that coroutine foo awaits was canceled.

For that reason, it is impossible to write the correct code this way:

async def foo():
    try:
        await bar()
    except CancelledError:
        print("The `bar` coroutine was canceled, let's continue to run the foo coroutine")
    ...

Because it is possible that it is not the bar but the foo coroutine itself that was canceled, and suppressing CancelledError accidentally suppresses this cancellation of the foo coroutine.

That significantly complicates handling the CancelledError, in my opinion. To distinguish the two reasons for a CancelledError, it is necessary to write the code like this:

async def foo():
    bar_task = asyncio.create_task(bar())
    await asyncio.wait([bar_task])  # if CancelledError happens here, it's foo and not bar who was canceled
    if bar_task.cancelled():
        print("The `bar` coroutine was canceled, let's continue to run the foo coroutine")
    else:
        bar_result = bar_task.result()
    ...

</asyncio rant>
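For anyone who wants to try the asyncio.wait() pattern from the rant, here is a self-contained sketch. The way bar gets cancelled (a call_soon callback) is only there to make the demo deterministic and is an assumption of this example.

```python
import asyncio


async def bar():
    await asyncio.sleep(10)


async def foo(log):
    bar_task = asyncio.create_task(bar())
    # Cancel only bar, right after it has started running.
    asyncio.get_running_loop().call_soon(bar_task.cancel)
    await asyncio.wait([bar_task])  # does not raise when bar_task is cancelled
    if bar_task.cancelled():
        log.append("bar was cancelled")
    return "foo finished"


log = []
result = asyncio.run(foo(log))
```

Because asyncio.wait() never re-raises the awaited task's CancelledError, a CancelledError at that line can only mean that foo itself was cancelled.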

I created one using asyncio.Event here

I like this approach. It requires special handling of asyncio.Event in the main coroutine, so not just any coroutine can be used as the main coroutine, but the result is clear and predictable.

If you want to stop the loop without any finalization of coroutines, something like this can be extracted from the asyncio code:

from asyncio import events


def run(main, *, debug=None):
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "run() cannot be called from a running event loop")

    loop = events.new_event_loop()
    events.set_event_loop(loop)
    if debug is not None:
        loop.set_debug(debug)
    try:
        return loop.run_until_complete(main)
    finally:
        events.set_event_loop(None)
    # skip the cancellation of subtasks and closing the loop

But it looks a bit dangerous to skip all the finalization and the cancellation of subtasks.

@kozlovsky @egbertbouman thanks for the suggestions. @kozlovsky for your suggestion I still get a RuntimeError. @egbertbouman I implemented your suggestion as follows (with globals but this is just for a POC):

from asyncio import ensure_future, run, Event, CancelledError

EVENT: Event = None


async def foo():
    #raise RuntimeError("DebugInfo")  # Test 1 retains RuntimeError
    raise CancelledError("DebugInfo")  # Test 2 retains CancelledError
    pass


async def bar():
    global EVENT
    EVENT = EVENT or Event()
    EVENT.set()  # OPTION 1: Change this into another method for stopping ..


async def main():
    fut = ensure_future(foo())
    await bar()
    await fut


async def main_wrapper():
    global EVENT
    EVENT = EVENT or Event()
    ensure_future(main())
    await EVENT.wait()


run(main_wrapper())  # OPTION 2: .. or, change this into another method for running.
print("I have run. Huzzah.")  # Test 3 continues after run

This exits cleanly and retains exceptions. The only functional downside I see here is that the CancelledError is not reported (the line marked with Test 2). Functionally, that makes this equal to the Runner suggestion before.

CancelledError in asyncio is like StopIteration; the presence of it does not say the application has an error, so I think it is ok if it is not reported.
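A short sketch of why the exception is not reported: when a coroutine wrapped in a task raises CancelledError, asyncio marks the task as cancelled instead of storing the exception as a failure. The structure below is made up for illustration.

```python
import asyncio


async def foo():
    raise asyncio.CancelledError("DebugInfo")


async def main():
    task = asyncio.ensure_future(foo())
    await asyncio.wait([task])  # wait without re-raising the task's outcome
    return task.cancelled()


# The task that raised CancelledError ends up in the "cancelled" state.
was_cancelled = asyncio.run(main())
```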

@kozlovsky I agree that this is part of normal execution flow. However, these exceptions should never leak out of the outer coroutine. If we suppress them, we might be missing a serious bug. That said, I do agree that this is only a small sacrifice to make (I don't see a CancelledError escape very often, so it's usually fine).

Considering all of the solutions posted so far, given that the stop_running_loop() preserves the functionality that we are replacing, I don't think we have to make even this small sacrifice of CancelledError suppression. It seems better to make no sacrifice than a small sacrifice.

If you port your example to the old way of doing things, you'll find exceptions are still going missing. There is a reason why I didn't use the exercise code: it just looks a bit dodgy as you don't really know when loop.stop() has completed.

However, these exceptions should never leak out of the outer coroutine. If we suppress them, we might be missing a serious bug.

I mean, with the asyncio.Event approach, we do not explicitly suppress CancelledError (we do not explicitly suppress any exception with this approach); The CancelledError exception is handled by the asyncio loop, just as the StopIteration exception is handled by the for-loop, and I think that it is ok. This is the reason I like this approach the most.

The stop_running_loop() approach feels a bit risky to me because of the unhandled finalization of running coroutines, but for quick experiments, it may be ok.

@egbertbouman Touché. The old code also eats CancelledError instances. Only RuntimeError is preserved. Tested with this code:

from asyncio import get_event_loop, ensure_future, CancelledError, sleep


async def foo():
    #raise RuntimeError("DebugInfo")  # Preserved :-)
    raise CancelledError("DebugInfo")  # Eaten :-(


async def bar():
    await sleep(0.1)
    get_event_loop().stop()


async def main():
    fut = ensure_future(foo())
    await bar()
    await fut


ensure_future(main())
get_event_loop().run_forever()

[related to @kozlovsky discussion below] I fully agree that in normal execution stopping the event loop is a bad idea. However, we use this pattern a lot in our documentation examples to quickly terminate. I'd rather use a non-invasive workaround for the documentation because I don't think anyone wants to volunteer to rewrite all of the documentation examples to cleanly exit. That said, if someone does want to take the time to make everything neat and tidy, I would absolutely support this approach. Without this heroic volunteer, I do think we still need a way to really kill the event loop.

@kozlovsky [also mentioned above] Agreed for normal operation that this should be the way that the existing code is ported inside of the main IPv8 service (ipv8_plugin.py). Also agreed on only using the stop_running_loop() for experiments (this is also where most of the "creative" use cases come from, which I'm trying not to change to avoid a big documentation rewrite).

tl;dr For PRs

Just to summarize, the plan right now is the following:

Regarding running scripts

  1. All patterns of the following form:

asyncio.ensure_future(main())
asyncio.get_event_loop().run_forever()

Should be replaced by:

asyncio.run(main())

  2. All patterns of the following form:

asyncio.ensure_future(foo())
asyncio.ensure_future(bar())
asyncio.get_event_loop().run_forever()

Should be replaced by:

async def main():
    await asyncio.gather(foo(), bar())

asyncio.run(main())
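A runnable version of the second pattern, with placeholder bodies for foo and bar (both are assumptions of this example). Note that, unlike run_forever(), asyncio.run() returns as soon as main() finishes.

```python
import asyncio


async def foo():
    await asyncio.sleep(0.01)
    return "foo"


async def bar():
    return "bar"


async def main():
    # gather() runs both coroutines concurrently and keeps the argument order.
    return await asyncio.gather(foo(), bar())


results = asyncio.run(main())  # returns once both coroutines have finished
```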

Regarding stopping scripts

We create a new function in util.py to do a quick and dirty stop of the event loop:

def stop_running_loop():
    get_running_loop().call_exception_handler({"message": "\r"})

This function may only be used in the doc/ folder*. If the use of this function is visible and necessary in the documentation, it should be communicated in its text that this is not supposed to be used in production code. Note: any shift in line count in the documentation files will also necessitate the referencing documentation .rst file to be adjusted!

For all other scripts that appear in scripts/ and stresstest/ this function should not be used and the main loop should be gracefully terminated.

*Disclaimer: ideally this function is not used at all. However, a rewrite of the docs to guarantee graceful shutdown is probably too much work for a single PR.

Regarding all other event loop access (very few cases)

All patterns of the following form, where methodX() is NOT stop() (see previous point):

asyncio.get_event_loop().methodX()
# OR
loop = asyncio.get_event_loop()
loop.methodX()

Should be replaced by:

asyncio.get_running_loop().methodX()
# OR
loop = asyncio.get_running_loop()
loop.methodX()
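As an illustration of this last rule, here is a sketch where methodX() is run_in_executor(). The blocking_work function is a made-up placeholder.

```python
import asyncio


def blocking_work() -> int:
    return sum(range(10))


async def main() -> int:
    # Inside a coroutine a loop is always running, so get_running_loop() is safe.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, blocking_work)


result = asyncio.run(main())
```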

If you want, I'd be happy to change all scripts (including the docs) to use the suggestion I mentioned earlier.

I don't think it would be that much work, as you can just create your own run_forever (to be placed in util.py):

import signal
from asyncio import Event


def run_forever():
    stopped = Event()

    signal.signal(signal.SIGINT, lambda sig, _: stopped.set())
    signal.signal(signal.SIGTERM, lambda sig, _: stopped.set())

    return stopped.wait()

This can then be called at the end of every main function:

async def main():
    # Do stuff here

    await run_forever()

run(main())

Admittedly, this still wouldn't shut down IPv8 correctly, but we're not doing that now either.
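A possible variant, not proposed in this thread, registers the handlers through loop.add_signal_handler() instead of signal.signal(). The asyncio documentation notes that callbacks registered this way are allowed to interact with the event loop, but the method is only available on Unix. The function name run_until_signalled and its signals parameter are assumptions of this sketch.

```python
import asyncio
import signal


async def run_until_signalled(signals=(signal.SIGINT, signal.SIGTERM)) -> None:
    """Block until one of the given signals arrives (Unix only)."""
    loop = asyncio.get_running_loop()
    stopped = asyncio.Event()
    for sig in signals:
        # The callback runs inside the event loop, so it may safely set the Event.
        loop.add_signal_handler(sig, stopped.set)
    try:
        await stopped.wait()
    finally:
        for sig in signals:
            loop.remove_signal_handler(sig)


async def main() -> None:
    # Do stuff here
    await run_until_signalled()
```

Calling asyncio.run(main()) would then block until Ctrl+C or SIGTERM arrives, after which main() returns normally.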

@egbertbouman sounds good. 👍 If you want to take on the responsibility of solving this issue "the right way", without any dirty shortcuts (or - regarding your comment about the shutdown - at least less of them), that would be great! I'll leave this issue up to you.