Python asyncio Job Scheduling.
aschedule (async schedule) allows you to schedule python co-routines periodically or at a specific time.
pip install aschedule==0.0.2.dev0
import asyncio, psutil
import aschedule
async def print_cpu():
print('report', round(loop.time()), ':', psutil.cpu_percent())
schedule = aschedule.every(print_cpu, seconds=1)
loop = asyncio.get_event_loop()
loop.run_until_complete(schedule.future)
BOT_TOKEN, CHAT_ID = '123456789:ABC_DEFGHJsfafs-fasdfs32sdfs7sEW', 124254321
import aiohttp, json
import aschedule
URL = 'https://api.telegram.org/bot{}/sendmessage'.format(BOT_TOKEN)
number_of_requests = 0
async def send_stats():
payload = {
'chat_id': CHAT_ID,
'text': str(number_of_requests),
}
async with aiohttp.ClientSession() as session:
async with session.get(URL, params=payload) as response:
print(await response.text()) # log the reponse
async def handle(request):
global number_of_requests
number_of_requests += 1
text = "Hello, " + request.match_info.get('name', "Anonymous")
return aiohttp.web.Response(body=text.encode('utf-8'))
app = aiohttp.web.Application()
app.router.add_route('GET', '/{name}', handle)
aschedule.every(send_stats, seconds=30) # send stats to the developer every 30 seconds
aiohttp.web.run_app(app)
import asyncio
import aschedule
async def send_email_to_users():
# send an email to the users
pass
schedule = aschedule.every(send_email_to_users, days=1)
asyncio.get_event_loop().run_until_complete(schedule.future)
nosetests --with-coverage --cover-package=aschedule --cover-min-percentage=90 --cover-config-file=.coveragerc --processes=50 --process-timeout=600 --cover-inclusive