python/asyncio

implementation of asyncio.wait waiting for iterator is not complete

005 opened this issue · 3 comments

005 commented

asyncio.wait has 3 different options to run with

Constant Description
FIRST_COMPLETED The function will return when any future finishes or is cancelled.
FIRST_EXCEPTION The function will return when any future finishes by raising an exception. If no future raises an exception then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED The function will return when all futures finish or are cancelled.

But there is no way to run it in a fourth mode, FIRST_COMPLETED + FIRST_EXCEPTION,
which is a "must-have" when it is supposed to wait on an iterator:
the iterator will raise a perfectly legitimate exception to signal the end of iteration,
but asyncio.wait has no way to catch or handle it properly.

Actually, FIRST_COMPLETED also returns if any future/coroutine raises an exception. In general, a future is considered completed when it gets its result or when an exception is raised. The following example illustrates the use case you describe:

import asyncio

async def task1():
    """Sleep for three seconds, then fail by raising RuntimeError."""
    delay_seconds = 3
    await asyncio.sleep(delay_seconds)
    raise RuntimeError

async def task2():
    """Sleep for ten seconds and then finish normally."""
    delay_seconds = 10
    await asyncio.sleep(delay_seconds)

async def main():
    """Show that FIRST_COMPLETED also returns when a task raises.

    Starts ``task1`` and ``task2`` concurrently, waits until the first of
    them is done and asserts that it finished by raising ``RuntimeError``.

    Raises:
        AssertionError: if the first finished task did not raise
            ``RuntimeError``.
    """
    # asyncio.wait() expects futures/tasks: passing bare coroutines was
    # deprecated in Python 3.8 and is rejected since 3.11, so wrap them.
    tasks = [asyncio.ensure_future(task1()), asyncio.ensure_future(task2())]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # Cancel whatever is still running and let the cancellation be processed,
    # so no task is destroyed while pending when the event loop is closed.
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)

    exception = done.pop().exception()
    assert isinstance(exception, RuntimeError)

# Run the demo on the default event loop. The loop is closed in ``finally`` so
# it is released even when main() raises (e.g. its assertion fails).
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    loop.close()

@005 I'd be interested in the actual code that caused this suggestion.

005 commented

It looks like it's not asyncio, but something else.

here is the simplest example i could come up with.

Notice that there are two URLs that the server will handle:

/1
and
/2

Open each one in a different browser tab, and you will see a difference:
when you close /2, the server notices and prints "websocket connection closed",
but if you close tab /1, the handler stays stuck at "enter wait.." and never reaches the line that prints "websocket connection closed".

import asyncio
import aiohttp
from aiohttp import web

# HTML document returned by the handlers for requests that cannot be upgraded
# to a websocket. Its inline script opens a WebSocket to the page's own URL
# (with 'http' replaced by 'ws'), sends a greeting on open, echoes every
# received message back (or closes the socket when the message is
# 'disconnect'), and sends 'i am here' once per second.
page = """<!DOCTYPE html>
        <html lang="en">
            <head>
                <title></title>
            </head>
            <body>
                <script>
                    var url = window.location.href.replace('http','ws');
                    console.log('connect to', url);
                    var ws = new WebSocket(url);
                    ws.onopen = function () {
                        console.log('send to server');
                        ws.send("Hello Web Sockets!");
                    }
                    ws.onmessage = function (msg) {
                        console.log('got msg',msg.data);
                        if (msg.data == 'disconnect') {
                            ws.close();
                        } else {
                            ws.send(msg.data);
                        }
                    }
                    setInterval(function(){
                        console.log('i am here');
                        ws.send('i am here');
                    },1000);
                    //ws.close();
                </script>
                <button >
            </body>
        </html>"""



async def index(request):
    """Handle ``/1``: serve the demo page or run an asyncio.wait() receive loop.

    If the request can be upgraded to a websocket, the connection is prepared,
    a greeting is sent, the first incoming message is read as the session id,
    and then messages are received one at a time through ``asyncio.wait``
    until the connection ends. Otherwise the HTML demo page is returned.

    Args:
        request: the incoming aiohttp request.

    Returns:
        The ``WebSocketResponse`` once the connection has ended, or a
        ``web.Response`` carrying the HTML page for plain HTTP requests.
    """
    ws = web.WebSocketResponse()
    yes_we_can, protocol = ws.can_prepare(request)
    print('yes_we_can, protocol ', yes_we_can, protocol)
    if not yes_we_can:
        return web.Response(text=page, content_type='text/html')

    await ws.prepare(request)
    # NOTE(review): send_str() is called without ``await`` as in the original;
    # newer aiohttp releases make it a coroutine that must be awaited --
    # confirm against the installed aiohttp version.
    ws.send_str('hello:1')
    session_id = await ws.receive()
    print('got session id', session_id)

    # Every message type that means "no more data will arrive". Checking only
    # CLOSE leaves the loop running when the peer disappears and receive()
    # reports CLOSED or ERROR instead of a close frame.
    terminal_types = (
        aiohttp.WSMsgType.CLOSE,
        aiohttp.WSMsgType.CLOSED,
        aiohttp.WSMsgType.ERROR,
    )
    while True:
        print('enter wait..')
        # asyncio.wait() requires futures/tasks (bare coroutines are rejected
        # since Python 3.11), so wrap the receive() coroutine explicitly.
        receiver = asyncio.ensure_future(ws.receive())
        done, pending = await asyncio.wait(
            [receiver], return_when=asyncio.FIRST_COMPLETED)
        print('wait done', done, pending)
        msg = done.pop().result()
        print('incoming msg', msg.data)
        if msg.type in terminal_types:
            break
    print('websocket connection closed')
    return ws


async def index2(request):
    """Handle ``/2``: serve the demo page or consume the websocket by iteration.

    When the request can be upgraded, the websocket is prepared, a greeting is
    sent, the first incoming message is read as the session id, and every
    further message is printed via ``async for`` until iteration ends.
    Otherwise the HTML demo page is returned.
    """
    socket = web.WebSocketResponse()
    upgradable, subprotocol = socket.can_prepare(request)
    print('yes_we_can, protocol ', upgradable, subprotocol)

    # Plain HTTP request: hand back the HTML page that opens the websocket.
    if not upgradable:
        return web.Response(text=page, content_type='text/html')

    await socket.prepare(request)
    socket.send_str('hello:1')
    first_message = await socket.receive()
    print('got session id', first_message)

    async for incoming in socket:
        print('incoming msg', incoming)

    print('websocket connection closed')
    return socket


# Build the application and map each demo URL to its handler:
# /1 -> index (asyncio.wait loop), /2 -> index2 (async-for loop).
app = web.Application()
app.router.add_route('GET', '/1', index)
app.router.add_route('GET', '/2', index2)
loop = asyncio.get_event_loop()
# Turn on asyncio debug mode for this loop.
loop.set_debug(True)
handler = app.make_handler()
# Listen on all interfaces, port 8444; block until the server is created.
f = loop.create_server(handler, '0.0.0.0', 8444)
srv = loop.run_until_complete(f)
print('serving on', srv.sockets[0].getsockname())
try:
    loop.run_forever()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server; fall through to cleanup.
    pass
finally:
    # Shutdown sequence: finish open connections, close the listening server
    # and wait for it, then clean up the application.
    # NOTE(review): the 1.0 passed to finish_connections is presumably a
    # timeout in seconds -- confirm against the aiohttp version in use.
    loop.run_until_complete(handler.finish_connections(1.0))
    srv.close()
    loop.run_until_complete(srv.wait_closed())
    loop.run_until_complete(app.cleanup())
loop.close()
print('CLEAN EXIT')