strange hanging behavior with `nest_asyncio` when used in combination with `concurrent.futures.ProcessPoolExecutor`
dotsdl opened this issue · 3 comments
I am working on a user-facing Python client to a RESTful API (openforcefield/alchemiscale#178), and for the kinds of objects the client must pull down from the server the operation quickly becomes CPU-bound due to deserialization and client-side processing. To make efficient use of resources, we are using a `ProcessPoolExecutor` from `concurrent.futures` to distribute related groups of requests across multiple processes, and within each process we perform the requests themselves concurrently using `asyncio`.
This all seems to work really well, except that there appears to be an odd interaction with `nest_asyncio` that causes the `ProcessPoolExecutor` to hang under certain, repeatable conditions. I've included an MWE below to illustrate, using two different async HTTP client libraries (`httpx` and `aiohttp`):
# mwe.py
import asyncio
from typing import List
import nest_asyncio
import httpx
import aiohttp
nest_asyncio.apply()
def get_urls_httpx(urls: List[str]):
    async def async_request():
        session = httpx.AsyncClient()
        try:
            coros = [_get_url_httpx(url, session) for url in urls]
            result = await asyncio.gather(*coros)
        finally:
            await session.aclose()
        return result

    return asyncio.run(async_request())


async def _get_url_httpx(url, session):
    resp = await session.get(url, timeout=None)
    return resp.status_code


def get_urls_aiohttp(urls: List[str]):
    async def async_request():
        async with aiohttp.ClientSession() as session:
            coros = [_get_url_aiohttp(url, session) for url in urls]
            result = await asyncio.gather(*coros)
        return result

    return asyncio.run(async_request())


async def _get_url_aiohttp(url, session):
    async with session.get(url) as resp:
        return resp.status


def get_all_urls(all_urls: List[List[str]], func):
    from concurrent.futures import ProcessPoolExecutor, as_completed

    with ProcessPoolExecutor() as executor:
        futures = [executor.submit(func, urls) for urls in all_urls]
        print("waiting on futures")

        results = []
        for future in as_completed(futures):
            print("future", future)
            result = future.result()
            results.append(result)

    return results
The `get_all_urls` function is effectively what we're doing to handle groups of requests. The following illustrates the problem:
>>> import mwe
# both of these work as expected
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_httpx)
waiting on futures
future <Future at 0x7fdde2baf990 state=finished returned list>
[[301, 200]]
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_aiohttp)
waiting on futures
future <Future at 0x7fdde2baf4d0 state=finished returned list>
[[200, 200]]
# if we call these directly, they also work
>>> mwe.get_urls_httpx(['https://google.com', 'https://archlinux.org'])
[301, 200]
>>> mwe.get_urls_aiohttp(['https://google.com', 'https://archlinux.org'])
[200, 200]
# but if we repeat either of the first two function calls, they will hang
>>> mwe.get_all_urls([['https://google.com', 'https://archlinux.org']], mwe.get_urls_aiohttp)
waiting on futures
The hanging above does not happen in the absence of `nest_asyncio.apply()`. However, since many of our users use Jupyter notebooks to do their work with the HTTP client I'm working on, this appears to be the only way currently to get calls to `asyncio.run` to work from within a Jupyter IPython kernel.
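For context, this is a minimal sketch (not from the original report) of why `nest_asyncio.apply()` is needed in a notebook; it assumes an IPython kernel, whose main thread already has an event loop running, and the `answer` coroutine is purely illustrative:

import asyncio
import nest_asyncio

async def answer():
    return 42

# In a notebook cell, asyncio.run() normally raises
# "RuntimeError: asyncio.run() cannot be called from a running event loop",
# because the kernel's event loop is already running in the main thread.
nest_asyncio.apply()           # patch asyncio so run() can be re-entered
print(asyncio.run(answer()))   # now works inside the notebook; prints 42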
Any ideas as to what may be happening here? Is there some kind of strange state being left behind due to `nest_asyncio.apply` and the use of `asyncio.run` in the main process that's impacting use of a `ProcessPoolExecutor`?
Not sure where the strange behavior stems from. But generally one can expect the strangest things from a process pool under the Unix default of forking: the fork copies the parent's global state into the child processes. When using the spawn start method for child processes, the problem instead becomes serialization of the functions submitted as tasks. The difference is illustrated in the sketch below.
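To illustrate that fork-vs-spawn difference, here is a minimal, standalone sketch (not from the original thread; the `STATE` dict and `show_state` helper are hypothetical and exist only for demonstration, and the fork start method assumes a Unix system):

# fork_vs_spawn.py
import multiprocessing as mp

STATE = {"value": 0}

def show_state():
    # Runs in a worker process; reports whatever global state that process sees.
    return STATE["value"]

if __name__ == "__main__":
    STATE["value"] = 42  # mutate module-level state in the parent only
    for method in ("fork", "spawn"):
        ctx = mp.get_context(method)
        with ctx.Pool(1) as pool:
            print(method, pool.apply(show_state))
    # Expected output:
    #   fork 42   (the forked child inherits the parent's mutated globals)
    #   spawn 0   (the spawned child re-imports the module and starts fresh)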
I would suggest using something else, for example the vastly superior (ahem) distex Pool. This simplifies the code, and it becomes possible to run tasks on any other machine that happens to run an SSH server:
import asyncio
from typing import List
import distex
import nest_asyncio
nest_asyncio.apply()
async def get_urls_httpx(urls):
    import httpx
    import asyncio

    async def _get_url_httpx(url, session):
        resp = await session.get(url, timeout=None)
        return resp.status_code

    session = httpx.AsyncClient()
    try:
        coros = [_get_url_httpx(url, session) for url in urls]
        result = await asyncio.gather(*coros)
    finally:
        await session.aclose()
    return result


async def get_urls_aiohttp(urls: List[str]):
    import asyncio
    import aiohttp

    async def _get_url_aiohttp(url, session):
        async with session.get(url) as resp:
            return resp.status

    async with aiohttp.ClientSession() as session:
        coros = [_get_url_aiohttp(url, session) for url in urls]
        result = await asyncio.gather(*coros)
    return result


def get_all_urls(all_urls: List[List[str]], func):
    with distex.Pool() as pool:
        results = list(pool.map(func, all_urls, ordered=False))
    return results
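For completeness, a hypothetical call mirroring the earlier interactive session (this assumes the default local workers that `distex.Pool()` is constructed with in the example above):

results = get_all_urls([['https://google.com', 'https://archlinux.org']], get_urls_aiohttp)
print(results)  # e.g. [[200, 200]]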
Thanks for this @erdewit! I'll consider alternative approaches, in particular non-forking approaches for the process pool.
For the record: switching `get_all_urls` above to force the use of 'spawn' as the multiprocessing context does not exhibit the problem this issue raises:
def get_all_urls(all_urls: List[List[str]], func):
    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor, as_completed

    ctx = mp.get_context('spawn')
    with ProcessPoolExecutor(mp_context=ctx) as executor:
        futures = [executor.submit(func, urls) for urls in all_urls]
        print("waiting on futures")

        results = []
        for future in as_completed(futures):
            print("future", future)
            result = future.result()
            results.append(result)

    return results