implementation of asyncio.wait waiting for iterator is not complete
005 opened this issue · 3 comments
asyncio.wait has 3 different options to run with
Constant Description
FIRST_COMPLETED The function will return when any future finishes or is cancelled.
FIRST_EXCEPTION The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED The function will return when all futures finish or are cancelled.
but there is no way to run it in 4-th mode FIRST_COMPLETED + FIRST_EXCEPTION
which is "must-have" in the case when it suppose to wait for iterator
it will send totally legit Exception about end of iteration
but asyncio.wait has no way to catch/handle it properly.
Actually, FIRST_COMPLETED
also returns if any future/coroutine raises an exception. In general, a future is considered completed when it gets its result or when an exception is raised. The following example illustrates the use case you describe:
import asyncio
async def task1():
await asyncio.sleep(3)
raise RuntimeError
async def task2():
await asyncio.sleep(10)
async def main():
tasks = [task1(), task2()]
done, pending = await asyncio.wait(tasks, return_when='FIRST_COMPLETED')
exception = done.pop().exception()
assert isinstance(exception, RuntimeError)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
@005 I'd be interested in the actual code that caused this suggestion.
it looks its not asyncio, but something esle.
here is the simplest example i could come up with.
notice there are 2 urls that server will handle
/1
and
/2
open each one in different browser tab, and then there will be a difference
when you close /2 - server can see it and will print "websocket connection closed"
but if you close tab /1 it will still be stuck in "enter wait.." and will never get to the line "websocket connection closed"
import asyncio
import aiohttp
from aiohttp import web
page = """<!DOCTYPE html>
<html lang="en">
<head>
<title></title>
</head>
<body>
<script>
var url = window.location.href.replace('http','ws');
console.log('connect to', url);
var ws = new WebSocket(url);
ws.onopen = function () {
console.log('send to server');
ws.send("Hello Web Sockets!");
}
ws.onmessage = function (msg) {
console.log('got msg',msg.data);
if (msg.data == 'disconnect') {
ws.close();
} else {
ws.send(msg.data);
}
}
setInterval(function(){
console.log('i am here');
ws.send('i am here');
},1000);
//ws.close();
</script>
<button >
</body>
</html>"""
async def index(request):
ws = web.WebSocketResponse()
yes_we_can, protocol = ws.can_prepare(request)
print('yes_we_can, protocol ', yes_we_can, protocol)
if yes_we_can:
await ws.prepare(request)
ws.send_str('hello:1')
session_id = await ws.receive()
print('got session id', session_id)
while True:
print('enter wait..')
done, pending = await asyncio.wait([ws.receive()], return_when=asyncio.FIRST_COMPLETED)
print('wait done', done, pending)
msg = done.pop().result()
print('incoming msg', msg.data)
if hasattr(msg, 'type') and msg.type == aiohttp.WSMsgType.CLOSE:
break
continue
print('websocket connection closed')
return ws
else:
return web.Response(text=page, content_type='text/html')
async def index2(request):
ws = web.WebSocketResponse()
yes_we_can, protocol = ws.can_prepare(request)
print('yes_we_can, protocol ', yes_we_can, protocol)
if yes_we_can:
await ws.prepare(request)
ws.send_str('hello:1')
session_id = await ws.receive()
print('got session id', session_id)
async for msg in ws:
print('incoming msg', msg)
print('websocket connection closed')
return ws
else:
return web.Response(text=page, content_type='text/html')
app = web.Application()
app.router.add_route('GET', '/1', index)
app.router.add_route('GET', '/2', index2)
loop = asyncio.get_event_loop()
loop.set_debug(True)
handler = app.make_handler()
f = loop.create_server(handler, '0.0.0.0', 8444)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
loop.run_forever()
except KeyboardInterrupt:
pass
finally:
loop.run_until_complete(handler.finish_connections(1.0))
srv.close()
loop.run_until_complete(srv.wait_closed())
loop.run_until_complete(app.cleanup())
loop.close()
print('CLEAN EXIT')