unsync
Unsynchronize asyncio by using an ambient event loop in a separate thread.
Rules for unsync
- Mark all async functions with @unsync. May also mark regular functions to execute in a separate thread.
- All @unsync functions, async or not, return an Unfuture.
- All Futures must be Unfutures, which includes the result of an @unsync function call, or wrapping Unfuture(asyncio.Future) or Unfuture(concurrent.futures.Future). Unfuture combines the behavior of asyncio.Future and concurrent.futures.Future:
  - Unfuture.set_value is threadsafe, unlike asyncio.Future
  - Unfuture instances can be awaited, even if made from concurrent.futures.Future
  - Unfuture.result() is a blocking operation except in unsync.loop/unsync.thread, where it behaves like asyncio.Future.result and will throw an exception if the future is not done
- Functions will execute in different contexts:
  - @unsync async functions will execute in an event loop in unsync.thread
  - @unsync regular functions will execute in unsync.thread_executor, a ThreadPoolExecutor
  - @unsync(cpu_bound=True) regular functions will execute in unsync.process_executor, a ProcessPoolExecutor
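
The "ambient event loop in a separate thread" idea behind these rules can be approximated with the standard library alone. The following is a minimal sketch under that assumption, not unsync's actual implementation; `_loop`, `_thread`, `run_in_background`, and `add_later` are hypothetical names introduced only for illustration.

```python
import asyncio
import threading

# Hypothetical stand-ins for unsync.loop and unsync.thread.
_loop = asyncio.new_event_loop()
_thread = threading.Thread(target=_loop.run_forever, daemon=True)
_thread.start()


def run_in_background(coro):
    """Schedule a coroutine on the background loop.

    Returns a concurrent.futures.Future whose result() blocks the caller.
    """
    return asyncio.run_coroutine_threadsafe(coro, _loop)


async def add_later(a, b):
    await asyncio.sleep(0.05)
    return a + b


future = run_in_background(add_later(1, 2))
# Blocks the calling thread until the coroutine has finished on the other thread.
background_result = future.result(timeout=5)
print(background_result)
```

The caller never starts or stops an event loop itself, which is roughly the convenience the rules above describe.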
Examples
Simple Sleep
A simple sleeping example with asyncio:
    import asyncio

    async def sync_async():
        await asyncio.sleep(0.1)
        return 'I hate event loops'

    result = asyncio.run(sync_async())
    print(result)

Same example with unsync:
    from unsync import unsync

    @unsync
    async def unsync_async():
        await asyncio.sleep(0.1)
        return 'I like decorators'

    print(unsync_async().result())

Threading a synchronous function
Synchronous functions can be made to run asynchronously by executing them in a concurrent.futures.ThreadPoolExecutor.
This is done by marking the regular function with @unsync.
    import time

    @unsync
    def non_async_function(seconds):
        time.sleep(seconds)
        return 'Run in parallel!'

    start = time.time()
    tasks = [non_async_function(0.1) for _ in range(10)]
    print([task.result() for task in tasks])
    print('Executed in {} seconds'.format(time.time() - start))

Which prints:

    ['Run in parallel!', 'Run in parallel!', ...]
    Executed in 0.10807514190673828 seconds
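
For comparison, here is roughly what the same pattern looks like when the thread pool is managed by hand with the standard library. This is only a sketch of the underlying technique, not what unsync does internally; `blocking_call` and the pool size of 10 are assumptions chosen for illustration.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call(seconds):
    time.sleep(seconds)
    return 'Run in parallel!'


# Ten workers so that all ten sleeps can overlap.
with ThreadPoolExecutor(max_workers=10) as executor:
    start = time.time()
    pool_futures = [executor.submit(blocking_call, 0.1) for _ in range(10)]
    # result() blocks until each worker has finished.
    pool_results = [f.result() for f in pool_futures]
    elapsed = time.time() - start

print(pool_results)
print('Executed in {} seconds'.format(elapsed))
```

The decorator form removes the need to create the executor and call submit explicitly.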
Continuations
Using Unfuture.then chains asynchronous calls and returns an Unfuture that wraps both the source and the continuation.
The continuation is invoked with the source Unfuture as the first argument.
Continuations can be regular functions (which will execute synchronously), or @unsync functions.
    @unsync
    async def initiate(request):
        await asyncio.sleep(0.1)
        return request + 1

    @unsync
    async def process(task):
        await asyncio.sleep(0.1)
        return task.result() * 2

    start = time.time()
    print(initiate(3).then(process).result())
    print('Executed in {} seconds'.format(time.time() - start))

Which prints:

    8
    Executed in 0.20314741134643555 seconds
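
To make the calling convention concrete, the sketch below imitates a continuation in plain asyncio: the continuation receives the finished source task itself and must call result() on it. The `then` helper here is hypothetical and is not unsync's Unfuture.then; it only mirrors the behavior described above.

```python
import asyncio


async def initiate(request):
    await asyncio.sleep(0.1)
    return request + 1


async def process(task):
    await asyncio.sleep(0.1)
    # The continuation is handed the completed source task, not its value.
    return task.result() * 2


async def then(source, continuation):
    """Hypothetical helper: wait for `source`, then call `continuation` with it."""
    await asyncio.wait([source])
    return await continuation(source)


async def main():
    source = asyncio.ensure_future(initiate(3))
    return await then(source, process)


chained_result = asyncio.run(main())
print(chained_result)  # (3 + 1) * 2 = 8
```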
Mixing methods
We'll start by converting a regular synchronous function into a threaded Unfuture which will begin our request.
    @unsync
    def non_async_function(num):
        time.sleep(0.1)
        return num, num + 1

We may want to refine the result in another function, so we define the following continuation.
    @unsync
    async def result_continuation(task):
        await asyncio.sleep(0.1)
        num, res = task.result()
        return num, res * 2

We then aggregate all the results into a single dictionary in an async function.
    @unsync
    async def result_processor(tasks):
        output = {}
        for task in tasks:
            num, res = await task
            output[num] = res
        return output

Executing the full chain of non_async_function → result_continuation → result_processor would look like:
    start = time.time()
    print(result_processor([non_async_function(i).then(result_continuation) for i in range(10)]).result())
    print('Executed in {} seconds'.format(time.time() - start))

Which prints:

    {0: 2, 1: 4, 2: 6, 3: 8, 4: 10, 5: 12, 6: 14, 7: 16, 8: 18, 9: 20}
    Executed in 0.22115683555603027 seconds
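
The key step in this chain is that an async function awaits work that started in a thread pool. A standard-library sketch of that single idea is shown below, using loop.run_in_executor to obtain awaitable futures backed by pool threads. It is an illustration under stated assumptions, not unsync's mechanism; `blocking_step` and `collect` are hypothetical names, and the threaded step and the doubling are merged into one function for brevity.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_step(num):
    time.sleep(0.1)
    return num, (num + 1) * 2


async def collect(executor):
    loop = asyncio.get_running_loop()
    # run_in_executor returns an awaitable future backed by a pool thread.
    pending = [loop.run_in_executor(executor, blocking_step, i) for i in range(10)]
    output = {}
    for fut in pending:
        num, res = await fut
        output[num] = res
    return output


with ThreadPoolExecutor(max_workers=10) as executor:
    aggregated = asyncio.run(collect(executor))

print(aggregated)
```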
Preserving typing
As far as we know, it is not possible to change the return type of a method or function using a decorator. Therefore, a workaround is needed for IntelliSense to work properly. In general, you have three options:
- Ignore type warnings.
- Use a suppression statement where you reach the type warning.
  A. When defining the unsynced method by changing the return type to an Unfuture.
  B. When using the unsynced method.
- Wrap the function without a decorator. Example:

    def function_name(x: str) -> Unfuture[str]:
        # Wrap the plain function at call time instead of decorating it.
        async_method = unsync(__function_name_synced)
        return async_method(x)

    def __function_name_synced(x: str) -> str:
        return x + 'a'

    future_result = function_name('b')
    assert future_result.result() == 'ba'