schedule your co-routines

Primary language: Python. License: MIT.

aschedule

Python asyncio Job Scheduling.

aschedule (async schedule) lets you schedule Python coroutines to run periodically or at a specific time.

Install

pip install aschedule==0.0.2.dev0

Quick Start

Print the CPU usage percentage every second:
import asyncio

import psutil
import aschedule

loop = asyncio.get_event_loop()

async def print_cpu():
    # loop.time() is the event loop's monotonic clock, in seconds
    print('report', round(loop.time()), ':', psutil.cpu_percent())

# run print_cpu once every second
schedule = aschedule.every(print_cpu, seconds=1)
loop.run_until_complete(schedule.future)
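
For intuition about what a periodic schedule involves, the sketch below repeats a coroutine at a fixed interval using only the standard library. It is not aschedule's implementation: `run_every` and `record_call` are hypothetical names chosen for illustration, and the run is bounded to three calls so the script terminates.

```python
import asyncio


async def run_every(job, seconds, times):
    """Await job() `times` times, sleeping `seconds` between calls.

    Hypothetical helper for illustration only; not part of aschedule.
    """
    for i in range(times):
        await job()
        if i < times - 1:
            # yield to the event loop until the next run is due
            await asyncio.sleep(seconds)


calls = []


async def record_call():
    # stand-in job: remember how many times it has been called so far
    calls.append(len(calls))


asyncio.run(run_every(record_call, seconds=0.01, times=3))
print(calls)
```

A scheduling library saves you from writing and managing such loops by hand for every job.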
Send a message about the website's analytics to the developer's Telegram account:
import aiohttp
from aiohttp import web  # aiohttp.web must be imported explicitly
import aschedule

# placeholder credentials: replace with your own bot token and chat id
BOT_TOKEN, CHAT_ID = '123456789:ABC_DEFGHJsfafs-fasdfs32sdfs7sEW', 124254321
URL = 'https://api.telegram.org/bot{}/sendMessage'.format(BOT_TOKEN)
number_of_requests = 0

async def send_stats():
    payload = {
        'chat_id': CHAT_ID,
        'text': str(number_of_requests),
    }
    async with aiohttp.ClientSession() as session:
        async with session.get(URL, params=payload) as response:
            print(await response.text())  # log the response

async def handle(request):
    # count every incoming request, then greet the caller by name
    global number_of_requests
    number_of_requests += 1
    text = "Hello, " + request.match_info.get('name', "Anonymous")
    return web.Response(body=text.encode('utf-8'))

app = web.Application()
app.router.add_route('GET', '/{name}', handle)

aschedule.every(send_stats, seconds=30)  # send stats to the developer every 30 seconds

web.run_app(app)
Send an email to the users every day:
import asyncio
import aschedule

async def send_email_to_users():
    # send an email to the users
    pass

schedule = aschedule.every(send_email_to_users, days=1)
asyncio.get_event_loop().run_until_complete(schedule.future)
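
If a daily job should first fire at a particular wall-clock time (the "specific time" case mentioned in the introduction), the delay until that time has to be worked out first. The sketch below shows one way to compute it, assuming naive local datetimes; `seconds_until` is a hypothetical helper and not an aschedule function, and how the result is passed to the library is left out because it depends on aschedule's API.

```python
from datetime import datetime, time, timedelta


def seconds_until(target: time, now: datetime) -> float:
    """Seconds from `now` until the next occurrence of clock time `target`."""
    candidate = datetime.combine(now.date(), target)
    if candidate <= now:
        # today's slot has already passed, so use tomorrow's
        candidate += timedelta(days=1)
    return (candidate - now).total_seconds()


morning = datetime(2024, 1, 1, 8, 30)
evening = datetime(2024, 1, 1, 21, 0)

delay_morning = seconds_until(time(9, 0), morning)  # half an hour away
delay_evening = seconds_until(time(9, 0), evening)  # rolls over to the next day
print(delay_morning, delay_evening)
```

Passing `now` explicitly keeps the helper easy to test; in real use it would typically be `datetime.now()`.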

Testing

nosetests --with-coverage --cover-package=aschedule --cover-min-percentage=90 --cover-config-file=.coveragerc --processes=50 --process-timeout=600 --cover-inclusive