erdewit/nest_asyncio

ThreadPoolExecutor issue

douglas-raillard-arm opened this issue · 8 comments

On my way to debug another issue, I ended up with this snippet that has a problem on Python 3.8 with nest-asyncio==1.5.5:

import nest_asyncio
nest_asyncio.apply()

from concurrent.futures import ThreadPoolExecutor

import asyncio

def main():

    def f():
        with ThreadPoolExecutor() as pool:
            pool.map(g, range(10))

    def g(x):
        asyncio.run(h(x))

    async def h(x):
        print(x)

    f()

main()

Half of the time, it will give that output:

0
1
2
3
4
5
8
7
6
9
Exception ignored in: <function BaseEventLoop.__del__ at 0x7f71a2461280>
Traceback (most recent call last):
  File "/usr/lib/python3.8/asyncio/base_events.py", line 656, in __del__
    self.close()
  File "/usr/lib/python3.8/asyncio/unix_events.py", line 58, in close
    super().close()
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 92, in close
    self._close_self_pipe()
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 99, in _close_self_pipe
    self._remove_reader(self._ssock.fileno())
  File "/usr/lib/python3.8/asyncio/selector_events.py", line 276, in _remove_reader
    key = self._selector.get_key(fd)
  File "/usr/lib/python3.8/selectors.py", line 190, in get_key
    return mapping[fileobj]
  File "/usr/lib/python3.8/selectors.py", line 71, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/usr/lib/python3.8/selectors.py", line 225, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/usr/lib/python3.8/selectors.py", line 42, in _fileobj_to_fd
    raise ValueError("Invalid file descriptor: {}".format(fd))
ValueError: Invalid file descriptor: -1

The other half of the time, everything goes fine.

Note that I could not reproduce that on Python 3.9, and I have not tested it on 3.10.

I suppose the asyncio event loop is not thread safe? That would make sense, given that there is supposed to be only one per process in the base asyncio package.

EDIT: simplified the reproducer a little bit

Actually, it is known for sure that asyncio's event loop is not thread safe.

However, it would make sense for the patched asyncio.run() to actually be thread safe, by using asyncio.run_coroutine_threadsafe() where possible.

If that is not the case, nest_asyncio becomes much more difficult to use as a way to add an async API to an existing library without breaking compatibility, such as devlib:

  • The existing code exposed a thread-safe API.
  • The code is rewritten in async style to enable easy concurrency with a good API.
  • Blocking stubs are still provided for all converted functions. This is done automatically using a decorator, in this way:
@asyncf
async def foo(*args, **kwargs):
    ...


# Call the coroutine function directly
foo.asyn()

# Blocking call, for backward compat and simple uses.
# Equivalent to asyncio.run(foo.asyn())
foo()

That means that arbitrary code can now call asyncio.run() in order to provide a backward-compatible blocking stub. These calls might very well happen from user code from multiple threads.
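A minimal sketch of such a decorator could look like the following (this is hypothetical illustration, not devlib's actual implementation; the names `asyncf`, `blocking`, and `double` are made up here):

```python
import asyncio
import functools

def asyncf(corofunc):
    """Hypothetical sketch: expose a coroutine function through both an
    async interface (.asyn) and a blocking one (direct call)."""
    @functools.wraps(corofunc)
    def blocking(*args, **kwargs):
        # Blocking stub, equivalent to asyncio.run(corofunc(...))
        return asyncio.run(corofunc(*args, **kwargs))
    # Keep the original coroutine function reachable for async callers
    blocking.asyn = corofunc
    return blocking

@asyncf
async def double(x):
    return x * 2

print(double(21))  # blocking call, prints 42
```

Async callers would use `await double.asyn(21)` instead, avoiding the nested `asyncio.run()` entirely.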

asyncio.run is not thread-safe and the patched version isn't either. Your example code is therefore also not thread-safe.

The actual error is from closing the "self pipe" in the garbage collection of the event loop. I remember that this was a randomly occurring bug in Python, dependent on the order in which things are cleaned up.

As an alternative, it's possible to give each thread of the thread pool its own event loop (via the initializer of the pool) and then run the tasks in the thread's own loop with loop.run_until_complete. Perhaps that covers your use case.
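That suggestion can be sketched as follows (the helper names `_tls`, `_init_thread_loop`, `g` and `h` are illustrative, echoing the reproducer above):

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# One event loop per worker thread, kept in thread-local storage.
_tls = threading.local()

def _init_thread_loop():
    # Runs once in each worker thread when the pool spins it up.
    _tls.loop = asyncio.new_event_loop()

async def h(x):
    return x * x

def g(x):
    # Use this thread's own loop instead of asyncio.run(), so no loop
    # is ever shared between threads.
    return _tls.loop.run_until_complete(h(x))

with ThreadPoolExecutor(initializer=_init_thread_loop) as pool:
    results = list(pool.map(g, range(10)))

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Since `ThreadPoolExecutor.map` preserves input order, the results come back sorted even though the tasks run concurrently.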

I'll see what I can do with a per-thread event loop, thanks. The story around transitioning to async without breaking backward compat is really awful in Python. It feels like it was not even an afterthought but simply not thought about at all, even years after the introduction of async...

@erdewit Actually I'm a bit confused. The doc states:

This function always creates a new event loop and closes it at the end. It should be used as a main entry point for asyncio programs, and should ideally only be called once.
https://docs.python.org/3/library/asyncio-task.html#asyncio.run

While it does not specify that it's thread safe, it clearly states that a new event loop is created, so I'm not sure rolling my own equivalent of asyncio.run() would fix anything.
Users typically create their own threads; how could I create a new loop in that case?
Alternatively, I could probably just set up a loop for the main thread in a global var and then call asyncio.run_coroutine_threadsafe() from the other threads, but that would not scale as well.

I remember that this was a randomly occurring bug in Python, dependent on the order in which things are cleaned up.
Is it something that only shows up in multithreaded code? Python 3.9 does not seem to exhibit the problem though, so I can live with that.

While it does not specify that it's thread safe, it clearly states that a new event loop is created

You're right, asyncio.run should be thread-safe if the tasks are thread-safe too. For the nested version it is not possible to create a new loop for every run, since there can be multiple runs concurrently in the same thread and they all have to use the same event loop.

Alternatively I could probably just setup a loop for the main thread in a global var and then call asyncio.run_coroutine_threadsafe()

It would defeat the purpose of using a thread pool, which supposedly is to run CPU intensive code that can release the GIL. If it is run in the main thread it will block the main event loop.

As long as asyncio.run() is thread-safe I think I'm good, since the lib itself is already thread-safe.

It would defeat the purpose of using a thread pool, which supposedly is to run CPU intensive code that can release the GIL. If it is run in the main thread it will block the main event loop.

Yes in absolute terms, but not so much in practice in my case. The library in question abstracts over adb/ssh/localhost shell interactions. The reason it allowed multithreading was to enable use cases similar to what async nowadays provides.

The serial part of the code is pretty cheap compared to the command executions themselves (which can take >50ms). The management of the command execution is done in a separate thread pool, so the event loop itself would not have much to do beyond dispatching the commands. The code using the output of said commands is very cheap, so even if many multithreaded users end up scheduling coroutines on the same loop, it should not be too horrible.

That said, I just hit an issue while prototyping that: asyncio.run_coroutine_threadsafe() returns a concurrent.futures.Future. If loop.run_forever() happens in another thread, future.result() hangs, and so does the event loop for some reason. Simply waiting with time.sleep() shows that without the .result() call, the event loop proceeds to execute everything as expected. I have not yet found anyone else complaining about that (and it's not related to nest_asyncio):

import atexit
import threading
import asyncio

# import nest_asyncio
# nest_asyncio.apply()

_LOOP = asyncio.new_event_loop()
def _loop_threadf():
    loop = _LOOP
    try:
        loop.run_forever()
    finally:
        loop.close()

def _loop_stop():
    _LOOP.call_soon_threadsafe(_LOOP.stop)
    # Since the thread is a daemon thread, we need to join() it here otherwise
    # the main thread will keep going and then abruptly kill the daemon thread
    # at some point.
    try:
        _LOOP_THREAD.join()
    # If the thread did not start already, we will get a RuntimeError
    except RuntimeError:
        pass


# We need to use a daemon thread, otherwise the atexit handler will not run for
# some reason.
_LOOP_THREAD = threading.Thread(
    target=_loop_threadf,
    name='devlib event loop',
    daemon=True
)
atexit.register(_loop_stop)
_LOOP_THREAD.start()



def run(coro):
    future = asyncio.run_coroutine_threadsafe(coro, _LOOP)
    # Uncomment the following line and 'corof2' is never printed, which
    # indicates the event loop froze
    #
    # return future.result()

async def corof2():
    print('corof2')

async def corof1():
    print('corof1')
    run(corof2())
    # await corof2()

run(corof1())

import time
time.sleep(2)
def run(coro):
    future = asyncio.run_coroutine_threadsafe(coro, _LOOP)
    return future.result()

This will only work if _LOOP is running in a different thread. It will deadlock if in the same thread, as it will get stuck at the last line and the task can't make any progress since the event loop is blocked.

It will deadlock if in the same thread, as it will get stuck at the last line and the task can't make any progress since the event loop is blocked.

The loop is supposed to make progress with the run_forever() call, done in a separate thread. Also, it does "work" as long as you never call future.result(), which is quite strange (you can see the prints proving that the coroutine is actually being executed).

EDIT: I've been staring at this stuff for too long; it's actually quite obvious that it deadlocks. The nested run() obviously does not yield back to the event loop since there is no await, so the whole thing deadlocks.
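To illustrate that last point: with the same background-loop setup, awaiting the nested coroutine instead of routing it through run() a second time lets the loop keep making progress, so future.result() can be used safely from other threads. A minimal sketch, reusing the _LOOP naming from the snippet above:

```python
import asyncio
import threading

# Background event loop running in its own daemon thread.
_LOOP = asyncio.new_event_loop()
_LOOP_THREAD = threading.Thread(target=_LOOP.run_forever, daemon=True)
_LOOP_THREAD.start()

def run(coro):
    # Safe only when called from a thread other than the loop thread:
    # blocking on future.result() inside the loop thread would deadlock,
    # since the loop can't run while we block.
    future = asyncio.run_coroutine_threadsafe(coro, _LOOP)
    return future.result()

async def corof2():
    return 'corof2'

async def corof1():
    # Awaiting yields control back to the event loop, so the nested
    # coroutine can actually run (unlike a nested run() call).
    inner = await corof2()
    return f'corof1 -> {inner}'

result = run(corof1())
print(result)  # prints: corof1 -> corof2
_LOOP.call_soon_threadsafe(_LOOP.stop)
```

The key difference from the deadlocking version is that corof1 awaits corof2 directly rather than calling the blocking run() from inside the loop thread.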