A Celery-like event loop with asyncio
and no dependencies
It runs an asyncio
event loop in a separate daemon thread, drives native coroutines within the event loop, and then returns the future in an asynchronous manner.
This example sends 6000 simple HTTP GET requests with a job queue of size 30. The monitor
method shows the process, which may be just an eye candy.
- The first column shows (id of future, state of the future), the finished jobs.
- The second column shows (id of future, state of the corresponding coroutine), the running jobs.
- The last column shows the number of pending jobs.
To serve the HTTP GET responses, the Nginx docker image is used.
$ docker run --name ANY_NAME \
-v /path/to/asyncloop/examples/nginx-staticfiles:/usr/share/nginx/html:ro \
-d \
-p 8080:80 \
nginx
(venv) $ pip install aiohttp
(venv) $ python examples/aiohttp-get.py
Please note that the monitor
method does not run in Windows OS, since it uses the curses
module, which is available only in UNIX-like OS.
It requires Python 3.5+.
$ git clone https://github.com/dgkim5360/asyncloop.git
$ cd asyncloop
asyncloop$ python setup.py install
import asyncio as aio
from asyncloop import AsyncLoop
# A simple job, which should be a native coroutine
async def job_to_wait(sleep_sec):
await aio.sleep(sleep_sec)
return sleep_sec
# A simple callback
def callback(fut):
if fut.cancelled():
print('CANCELLED:', fut)
elif fut.done():
print('DONE:', fut)
print('RESULT:', fut.result()
# AsyncLoop starts
aloop = AsyncLoop(maxsize=5)
aloop.start()
# Submit a job and be free to work on
# it returns an concurrent.futures.Future object
fut = aloop.submit(job_to_wait(10), callback)
# The job immediately goes to the running queue,
# which is a simple dictionary with capacity.
aloop.running
# {
# <Future at 0x####>: <coroutine object job_to_wait>
# }
# After 10 seconds the callback activated
# DONE: <Future at 0x#### state=finished returned int>
# RESULT: 10
# Let's check the running queue again.
# Now the running queue (aloop.running) is empty!
aloop.running
# {}
# We can also confirm that the job is finished as
# the AsyncLoop instance contains the future object of finished jobs.
aloop.done
# {<Future at 0x#### state=finished returned int>}
# Get a result
assert fut.result() == 10
# Submit more jobs
aloop.submit_many((job_to_wait(5) for _ in range(10)))
# AsyncLoop only runs 5 jobs and other jobs are pending
assert aloop.running.qsize() == 5
assert aloop.pending.qsize() == 5
# After 5 seconds so that 5 jobs done,
# 5 pending jobs automatically start so that
# they move into the running queue.
assert len(aloop.done) == 6
assert aloop.running.qsize() == 5
assert aloop.pending.qsize() == 0
# Check the last results after 10 seconds so that all jobs done.
assert len(aloop.done) == 11
assert aloop.running.qsize() == 0
assert aloop.pending.qsize() == 0
So far, that's all.