Re-entrant call waits until extra events are processed, could return sooner
allComputableThings opened this issue · 4 comments
loop.run_until_complete(task1)
...     loop.run_until_complete(task11)   # nested call made while task1 runs
loop.run_until_complete(task2)
...     loop.run_until_complete(task12)
loop.run_until_complete(task3)
...     loop.run_until_complete(task13)
loop.run_until_complete(task4)
...     loop.run_until_complete(task14)
# None of the nested calls return until the slowest (last) nested task completes
In the example below, we have a nested call asking to run a single task on the loop, while the loop has other tasks queued or in execution. Some of the nested tasks take two seconds, some take one. Ideally, the 1-second tasks would complete after 1 second.
It seems the duration of:
loop.run_until_complete(nested_task)
depends heavily on what else is in the queue, not on how much time nested_task itself spends blocking. It would be much better if the call returned as soon as nested_task completes.
The program otherwise looks fully asynchronous.
import asyncio
import time
from random import randint

import nest_asyncio
nest_asyncio.apply()


def sync2async(f, *args, **kw):
    async def _f():
        return f(*args, **kw)
    return _f()


def sync(f, loop, args=(), kw={}):
    """
    Execute a non-async function in the event loop.
    Generally, the function should not be IO bound unless
    it also submits the IO task to the loop.
    """
    return loop.run_until_complete(sync2async(f, *args, **kw))
async def async_db_execution(query_param, sleep=1):
    await asyncio.sleep(sleep)
    dbresult = randint(0, 100)  # unused; stands in for a real DB result
    return query_param


def original_sync_implementation(query_param, sleep):
    # .... non-blocking sync stuff
    # Async call to DB -- this thread does not block!
    # But, whomp, whomp, the call blocks until ALL tasks in the loop
    # are complete, not just the one submitted.
    tstart = time.time()
    result = loop.run_until_complete(
        async_db_execution(query_param=query_param, sleep=sleep))
    print("DB task complete should take {}, but took {}".format(
        sleep, time.time() - tstart))
    # Could also replace this with threaded execution:
    # loop.run_in_executor()
    # .... more non-blocking sync stuff
    return result
async def async_rest_endpoint(query_param=1, sleep=2):
    # Simulate a slow IO-bound task: a plain (non-asyncio) sync
    # implementation whose DB access uses asyncio.
    print(f"...START long_running_async_task [{query_param} {sleep}]")
    result = sync(original_sync_implementation,
                  kw=dict(query_param=query_param, sleep=sleep),
                  loop=loop)
    print(f"...END long_running_async_task [{query_param} {sleep}]")
    return result
loop = asyncio.get_event_loop()
task = asyncio.gather(*[
    loop.create_task(async_rest_endpoint(query_param=i, sleep=1 + (i % 2)))
    # loop.create_task(long_running_async_task())
    for i in range(10)
])
loop.run_until_complete(task)
print("DONE")
...START long_running_async_task [0 1]
...START long_running_async_task [1 2]
...START long_running_async_task [2 1]
...START long_running_async_task [3 2]
...START long_running_async_task [4 1]
...START long_running_async_task [5 2]
...START long_running_async_task [6 1]
...START long_running_async_task [7 2]
...START long_running_async_task [8 1]
...START long_running_async_task [9 2]
DB task complete should take 2, but took 2.002359390258789
DB task complete should take 1, but took 2.002471685409546
DB task complete should take 2, but took 2.002546548843384
DB task complete should take 1, but took 2.0026204586029053
DB task complete should take 2, but took 2.0026872158050537
DB task complete should take 1, but took 2.002741575241089
DB task complete should take 2, but took 2.002797842025757
DB task complete should take 1, but took 2.0028557777404785
DB task complete should take 2, but took 2.0029075145721436
DB task complete should take 1, but took 2.0029778480529785
...END long_running_async_task [9 2]
...END long_running_async_task [8 1]
...END long_running_async_task [7 2]
...END long_running_async_task [6 1]
...END long_running_async_task [5 2]
...END long_running_async_task [4 1]
...END long_running_async_task [3 2]
...END long_running_async_task [2 1]
...END long_running_async_task [1 2]
...END long_running_async_task [0 1]
DONE # ~ 2 seconds
What I imagine is happening:
def run_all(loop):
    while True:
        try:
            t = loop.queue.next_signalled_unblocked()
            t.execute_until_yield_or_return()
            if t.complete:
                loop.queue.remove(t)
            # keep going until the queue is empty
        except Empty:
            return

def run_until_complete(loop, task):
    loop.queue.push(task)
    run_all(loop)  # drains the WHOLE queue before returning
    return task.result
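To make this concrete, here is a runnable toy version of that mental model (my own sketch, not asyncio's actual code), with tasks as generators that yield until a wake-up time:

import time
from collections import deque

queue = deque()

def sleeper(name, duration, t0):
    # A toy "task": yields control until its wake-up time has passed.
    while time.time() < t0 + duration:
        yield
    print(f"{name} finished at t={time.time() - t0:.1f}s")

def toy_run_until_complete(task):
    queue.append(task)
    # run_all: step every queued task round-robin until the queue is empty
    while queue:
        t = queue.popleft()
        try:
            next(t)
            queue.append(t)   # not done yet, requeue
        except StopIteration:
            pass              # done, drop it
    # only now does the nested call return, even if `task` finished long ago

t0 = time.time()
for d in (2, 1, 2, 1):
    queue.append(sleeper(f"{d}s peer", d, t0))
toy_run_until_complete(sleeper("nested 1s task", 1, t0))
print(f"nested call returned at t={time.time() - t0:.1f}s")  # ~2s, not ~1s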
How it could be better:
def run_until_complete(loop, task):
    loop.queue.push(task)
    while True:
        try:
            t = loop.queue.next_signalled_unblocked()
            t.execute_until_yield_or_return()
            if t.complete:
                loop.queue.remove(t)
                return t.result
            else:
                continue  # run something else
        except Empty:
            return
NB. In the run_until_complete proposed above, it may well be that a two-second task t begins execution first. However, because it awaits and returns execution to the loop, this model would still let all the 1-second tasks complete before the 2-second ones.
When a slow task and a fast task are started at the same time, and you wait on the result of the slow task first, the fast task will have already finished, with its result directly available. This is normal and to be expected.
If you wait on the fast task first, then its result is available while the slow task is still working.
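A minimal illustration of this with plain asyncio (my own example, not code from this issue):

import asyncio
import time

async def main():
    t0 = time.time()
    fast = asyncio.create_task(asyncio.sleep(1, result="fast"))
    slow = asyncio.create_task(asyncio.sleep(2, result="slow"))
    print(await slow, f"at t={time.time() - t0:.1f}s")  # "slow" at ~2s
    # fast finished at ~1s; its result has been waiting since then
    print(await fast, f"at t={time.time() - t0:.1f}s")  # "fast" at ~2s

asyncio.run(main())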
So it's a matter of the order in which tasks are scheduled. If the ordering of the tasks in the example is changed, for example by using this as the master task:
task = asyncio.gather(*[
    async_rest_endpoint(query_param=i, sleep=1 - i/10)
    for i in range(10)])
then the output becomes
DB task complete should take 0.09999999999999998, but took 0.10055088996887207
DB task complete should take 0.19999999999999996, but took 0.2010180950164795
DB task complete should take 0.30000000000000004, but took 0.30156946182250977
DB task complete should take 0.4, but took 0.4011995792388916
DB task complete should take 0.5, but took 0.5005972385406494
DB task complete should take 0.6, but took 0.601015567779541
DB task complete should take 0.7, but took 0.7013592720031738
DB task complete should take 0.8, but took 0.8006393909454346
DB task complete should take 0.9, but took 0.9008681774139404
DB task complete should take 1.0, but took 1.001107931137085
It first waits on the deepest nested task (which now happens to be the fastest), and then bubbles up to the slowest task.
How it could be better:

def run_until_complete(loop, task):
    loop.queue.push(task)
    while True:
        try:
            t = loop.queue.next_signalled_unblocked()
            t.execute_until_yield_or_return()
            if t.complete:
                loop.queue.remove(t)
                return t.result
What you're forgetting here is that run_until_complete must return the result of the task that it is given, not the result of some other task that just so happens to have finished earlier.
You're right. Should have been:

def run_until_complete(loop, task):
    loop.queue.push(task)
    while True:
        try:
            t = loop.queue.next_signalled_unblocked()
            t.execute_until_yield_or_return()
            if t.complete:
                if t is task:                 # <<< added
                    loop.queue.remove(t)      # done with this task in all contexts
                    return t.result
                else:
                    # Another call to run_until_complete must return it;
                    # next_signalled_unblocked must return t a second time
                    # in that context.
                    pass
        except Empty:
            return  # <<< seems like a bug to get here
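Reusing the toy sleeper/queue from my earlier sketch, a runnable version of this corrected loop (again a toy model, not asyncio's code) does return after ~1 second, leaving the slower peers queued:

def toy_run_until_complete_v2(task):
    queue.append(task)
    while queue:
        t = queue.popleft()
        try:
            next(t)
            queue.append(t)
        except StopIteration:
            if t is task:
                return  # the submitted task is done; peers stay queued

t0 = time.time()
for d in (2, 1, 2):
    queue.append(sleeper(f"{d}s peer", d, t0))
toy_run_until_complete_v2(sleeper("nested 1s task", 1, t0))
print(f"nested call returned at t={time.time() - t0:.1f}s")  # ~1s now
# the 2-second peers are still in `queue`, left for an outer call to finish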
So, same question.
Why can't this return after 1 second, instead of 2?
It really seems like it should be possible, since the whole process is mostly spent waiting on IO. That the 2-second tasks were scheduled before some 1-second ones doesn't seem important, since the 2-second tasks have yielded.
task = asyncio.gather(*[
    async_rest_endpoint(query_param=i, sleep=1 - i/10)
    for i in range(10)])
... of course cannot return faster than the slowest task. But the nested tasks shouldn't be required to run as slowly as other scheduled nested (peer) tasks; in fact, in an IO-waiting situation their runtime could be independent of the runtime of their peers.
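For what it's worth, stock asyncio can already give the 1-second behavior without re-entrancy, if the sync bridge runs in a worker thread instead of re-entering the loop. A sketch under that assumption (the helper names here are mine, and it assumes the default executor has enough threads for the concurrent calls):

import asyncio
import time

async def async_db_execution(query_param, sleep=1):
    await asyncio.sleep(sleep)
    return query_param

def sync_implementation(query_param, sleep, loop):
    # Runs in an executor thread: submit the coroutine back to the running
    # loop and block THIS thread (not the loop) until it completes.
    tstart = time.time()
    fut = asyncio.run_coroutine_threadsafe(
        async_db_execution(query_param, sleep), loop)
    result = fut.result()
    print(f"DB task should take {sleep}, took {time.time() - tstart:.3f}")
    return result

async def async_rest_endpoint(query_param, sleep):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, sync_implementation, query_param, sleep, loop)

async def main():
    await asyncio.gather(*[
        async_rest_endpoint(i, 1 + i % 2) for i in range(10)])

asyncio.run(main())  # 1-second tasks now finish in ~1 second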
Instead of pseudo code, could you use the real code? The relevant part is just a couple dozen lines and consists of run_until_complete and _run_once.