erdewit/nest_asyncio

Re-entrant call waits until extra events are processed, could return sooner

allComputableThings opened this issue · 4 comments

loop.run_until_complete(task1)
    ... loop.run_until_complete(task11)
loop.run_until_complete(task2)
    ... loop.run_until_complete(task12)
loop.run_until_complete(task3)
    ... loop.run_until_complete(task13)
loop.run_until_complete(task4)
    ... loop.run_until_complete(task14)

# None of the nested tasks complete until the slowest (last) nested task completes

In the example below, a nested call asks to run a single task on the loop,
while the loop has other tasks queued or in execution.
Some of the nested tasks take two seconds, some take one.
Ideally, the 1 second tasks would complete after 1 second.

It seems the duration of:

loop.run_until_complete(nested_task)

depends heavily on what is in the queue, not on how much time the task itself spends blocking. It would be much better if it returned as soon as nested_task completes.

The program otherwise looks fully asynchronous.
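For contrast, plain asyncio with no nested run_until_complete behaves the way I'd hope: an awaited task finishes on its own schedule, regardless of slower peers. A minimal sketch with shortened sleeps (0.1 s and 0.3 s standing in for 1 s and 2 s):

```python
import asyncio
import time

async def db_call(sleep):
    # stand-in for the async DB call below
    await asyncio.sleep(sleep)
    return time.monotonic()

async def main():
    t0 = time.monotonic()
    # one "fast" and one "slow" task, started together
    fast_end, slow_end = await asyncio.gather(db_call(0.1), db_call(0.3))
    return fast_end - t0, slow_end - t0

fast_took, slow_took = asyncio.run(main())
```

Both timestamps are taken inside the tasks themselves, so the fast one records its finish time (~0.1 s) while the slow peer is still sleeping.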

import asyncio
import time
from random import randint
import nest_asyncio
nest_asyncio.apply()

def sync2async(f, *args, **kw):
    async def _f():
        return f(*args, **kw)
    return _f()

def sync(f, loop, args=(), kw={}):
    """
    Execute a non-async function in the event loop
    Generally, the function should not be IO bound unless
    it also submits the IO task to the loop.
    """
    return loop.run_until_complete(sync2async(f, *args, **kw))
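As a sanity check, outside of any running loop the two helpers behave like a plain function call. A self-contained snippet (helpers repeated so it runs on its own; kw=None is used here in place of the mutable default argument):

```python
import asyncio

def sync2async(f, *args, **kw):
    async def _f():
        return f(*args, **kw)
    return _f()

def sync(f, loop, args=(), kw=None):
    """Execute a non-async function on the event loop."""
    return loop.run_until_complete(sync2async(f, *args, **(kw or {})))

loop = asyncio.new_event_loop()
result = sync(lambda a, b: a + b, loop, args=(20, 22))
loop.close()
# result == 42
```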


async def async_db_execution(query_param, sleep=1):
    await asyncio.sleep(sleep)
    dbresult = randint(0, 100)  # simulated DB result (unused; the param is echoed back)
    return query_param

def original_sync_implementation(query_param, sleep):
    # .... non-blocking sync stuff

    # async call to DB -- this thread does not block!
    # But, whomp, whomp, blocks until ALL tasks in the loop are complete,
    # not just the one submitted.
    tstart = time.time()
    result = loop.run_until_complete(async_db_execution(query_param=query_param, sleep=sleep))
    print("DB task complete should take {}, but took {}".format(sleep, time.time()-tstart))
    # Could also replace this with threaded execution!
    # loop.run_in_executor()

    # .... more non-blocking sync stuff
    return result

async def async_rest_endpoint(query_param=1, sleep=2):
    # Simulate a slow IO-bound task.
    # Has a non-asyncio (sync) implementation,
    # but DB access is with asyncio.
    print(f"...START long_running_async_task [{query_param} {sleep}]")
    result = sync(original_sync_implementation,
                  kw=dict(query_param=query_param, sleep=sleep),
                  loop=loop)
    print(f"...END   long_running_async_task [{query_param} {sleep}]")
    return result


loop = asyncio.get_event_loop()
task = asyncio.gather(*[
    loop.create_task(async_rest_endpoint(query_param=i, sleep=1+(i%2)))
    # loop.create_task(long_running_async_task())
    for i in range(10)
])
loop.run_until_complete(task)

print("DONE")

Output:
...START long_running_async_task [0 1]
...START long_running_async_task [1 2]
...START long_running_async_task [2 1]
...START long_running_async_task [3 2]
...START long_running_async_task [4 1]
...START long_running_async_task [5 2]
...START long_running_async_task [6 1]
...START long_running_async_task [7 2]
...START long_running_async_task [8 1]
...START long_running_async_task [9 2]
DB task complete should take 2, but took 2.002359390258789
DB task complete should take 1, but took 2.002471685409546
DB task complete should take 2, but took 2.002546548843384
DB task complete should take 1, but took 2.0026204586029053
DB task complete should take 2, but took 2.0026872158050537
DB task complete should take 1, but took 2.002741575241089
DB task complete should take 2, but took 2.002797842025757
DB task complete should take 1, but took 2.0028557777404785
DB task complete should take 2, but took 2.0029075145721436
DB task complete should take 1, but took 2.0029778480529785
...END   long_running_async_task [9 2]
...END   long_running_async_task [8 1]
...END   long_running_async_task [7 2]
...END   long_running_async_task [6 1]
...END   long_running_async_task [5 2]
...END   long_running_async_task [4 1]
...END   long_running_async_task [3 2]
...END   long_running_async_task [2 1]
...END   long_running_async_task [1 2]
...END   long_running_async_task [0 1]
DONE   # ~ 2 seconds

What I imagine is happening:

def run_all(loop):
      while True:
           try:
                t = loop.queue.next_signalled_unblocked()
                t.execute_until_yield_or_return()
                if t.complete:
                     loop.queue.remove(t)
                     return t.result
                else: continue
           except Empty:
                return

def run_until_complete(loop, task):
      loop.queue.push(task)
      run_all(loop)
      return task.result

How it could be better:

def run_until_complete(loop, task):
      loop.queue.push(task)
      while True:
           try:
                t = loop.queue.next_signalled_unblocked()
                t.execute_until_yield_or_return()
                if t.complete:
                     loop.queue.remove(t)
                     return t.result
                else: continue  # run something else

           except Empty:
                return

NB. in the case:

def run_until_complete(loop, task):
      loop.queue.push(task)
      while True:
           try:
                t = loop.queue.next_signalled_unblocked()
                t.execute_until_yield_or_return()
                if t.complete:
                     loop.queue.remove(t)
                     return t.result
                else: continue  # run something else

           except Empty:
                return

... it may well be that a two-second task, t, begins execution. However, because it awaits and returns execution to the loop, this model would guarantee all the 1 second tasks complete before the 2 second ones.

When a slow task and a fast task are started at the same time, and you wait on the result of the slow task first, then the fast task will have already finished, with its result directly available. This is normal and to be expected.

If you wait on the fast task first, then its result is available while the slow task is still working.
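This ordering effect can be seen with plain asyncio too. A small sketch (0.1 s and 0.3 s sleeps) showing that after awaiting the slow task, awaiting the already-finished fast task returns at once:

```python
import asyncio
import time

async def sleeper(d):
    await asyncio.sleep(d)
    return d

async def main():
    fast = asyncio.create_task(sleeper(0.1))
    slow = asyncio.create_task(sleeper(0.3))
    t0 = time.monotonic()
    await slow                      # wait on the slow task first
    t_after_slow = time.monotonic() - t0
    await fast                      # fast already finished: returns immediately
    t_after_both = time.monotonic() - t0
    return t_after_slow, t_after_both

t_after_slow, t_after_both = asyncio.run(main())
```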

So it's a matter of the order in which tasks are scheduled. If the ordering of the tasks in the example is changed, for example by using this as a master task:

task = asyncio.gather(*[
    async_rest_endpoint(query_param=i, sleep=1 - i/10)
    for i in range(10)])

then the output becomes

DB task complete should take 0.09999999999999998, but took 0.10055088996887207
DB task complete should take 0.19999999999999996, but took 0.2010180950164795
DB task complete should take 0.30000000000000004, but took 0.30156946182250977
DB task complete should take 0.4, but took 0.4011995792388916
DB task complete should take 0.5, but took 0.5005972385406494
DB task complete should take 0.6, but took 0.601015567779541
DB task complete should take 0.7, but took 0.7013592720031738
DB task complete should take 0.8, but took 0.8006393909454346
DB task complete should take 0.9, but took 0.9008681774139404
DB task complete should take 1.0, but took 1.001107931137085

It first waits on the deepest nested task (which now happens to be the fastest), and then bubbles up to the slowest task.

How it could be better:

def run_until_complete(loop, task)
      loop.queue.push(task)
      while True: 
           try: 
                t = loop.queue.next_signalled_unblocked()
                t.execute_until_yield_or_return()
                if t.complete: 
                    loop.queue.remove(t)
                    return t.result

What you're forgetting here is that run_until_complete must return the result of the task that it is given, not the result of some other task that just so happens to have finished earlier.

You're right. Should have been:

def run_until_complete(loop, task):
      loop.queue.push(task)
      while True:
           try:
                t = loop.queue.next_signalled_unblocked()
                t.execute_until_yield_or_return()
                if t.complete:
                     if t is task:                            <<< added
                          loop.queue.remove(t)  # Done with this task in all contexts
                          return t.result
                     else:
                          # Another call to run_until_complete must return it;
                          # next_signalled_unblocked must return t a second time in that context
                          pass

           except Empty:
                return         <<< Seems like a bug to get here
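The `t is task` check can be exercised with a toy model: plain generators stand in for tasks (yield standing in for await) and a deque for the ready queue. This is only a sketch of the pseudocode, not the real nest_asyncio internals; re-entrancy works because a task may call run_until_complete while it is being stepped:

```python
from collections import deque

queue = deque()      # ready queue of generator "tasks"
finished = {}        # results of tasks that completed in someone else's loop
log = []

def run_until_complete(task):
    # Push `task`, then keep stepping whatever is runnable until *this*
    # task (not just any task) has finished: the `t is task` check.
    queue.append(task)
    while True:
        t = queue.popleft()
        try:
            t.send(None)             # execute_until_yield_or_return
            queue.append(t)          # it yielded: requeue it
        except StopIteration as e:
            if t is task:
                return e.value       # done with this task in this context
            finished[t] = e.value    # a peer finished; just record its result

def simple(name, steps):
    for _ in range(steps):
        yield
    log.append(name)
    return name

def caller():
    yield
    # mid-task, synchronously wait on a brand-new subtask, nest_asyncio-style;
    # the peer task keeps running (and even finishes) inside this call
    result = run_until_complete(simple("sub", 1))
    log.append("caller got " + result)
    return "caller"

queue.append(simple("peer", 2))
outer_result = run_until_complete(caller())
# log == ["peer", "sub", "caller got sub"]; outer_result == "caller"
```

Note that "peer" completes inside the nested call, but the nested call keeps looping until its own task "sub" is done.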

So, same question.
Why can't this return after 1 second, instead of 2?

It really seems like it should be possible, since the whole process is mostly spent waiting on IO. That the 2 second tasks were
scheduled before some 1 second ones doesn't seem important, since the 2 second tasks have yielded.

task = asyncio.gather(*[
    async_rest_endpoint(query_param=i, sleep=1 - i/10)
    for i in range(10)])

... of course cannot return faster than the slowest task. But the nested tasks shouldn't be required to run as slowly as other scheduled nested (peer) tasks - and in fact could be independent of the runtime of other peers in an I/O waiting situation.
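For what it's worth, a top-level (non-nested) run_until_complete already has the property I'm asking for: it returns as soon as its own future is done, even if other tasks are still pending on the loop. A small sketch (0.1 s and 0.3 s sleeps):

```python
import asyncio
import time

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

slow = loop.create_task(asyncio.sleep(0.3, result="slow"))
t0 = time.monotonic()
fast_result = loop.run_until_complete(asyncio.sleep(0.1, result="fast"))
fast_took = time.monotonic() - t0   # ~0.1 s; `slow` is still pending here
slow_pending = not slow.done()

loop.run_until_complete(slow)       # drain the remaining task before closing
loop.close()
```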

Instead of pseudo code, could you use the real code? The relevant part is just a couple dozen lines and consists of run_until_complete and _run_once.