/django-apscheduler

APScheduler for Django

Primary LanguagePythonMIT LicenseMIT

Django APScheduler

Build status codecov PyPI version Code style:black

APScheduler for Django.

This is a Django app that adds a lightweight wrapper around APScheduler. It enables storing persistent jobs in the database using Django's ORM.

Features of this package include:

  • A custom DjangoJobStore: an APScheduler job store that persists scheduled jobs to the Django database.
  • The job store also maintains a history of all executions of the currently scheduled jobs, along with status codes and exceptions (if any).
  • You can view the scheduled jobs and monitor the job execution directly via the Django admin interface.

Installation

pip install django-apscheduler

Quick start

  • Add django_apscheduler to your INSTALLED_APPS setting like this:
INSTALLED_APPS = (
    # ...
    "django_apscheduler",
)
  • The format for displaying run time timestamps in the Django admin site is configurable using APSCHEDULER_DATETIME_FORMAT. The default just adds seconds to the standard Django format. This is useful for displaying the exact run time of jobs that are scheduled to run on intervals of less than a minute.
APSCHEDULER_DATETIME_FORMAT =  "N j, Y, f:s a"  # Default
  • Run python manage.py migrate to create the django_apscheduler models.

  • Add a custom Django management command to your project that schedules the APScheduler jobs and starts the scheduler:

# runapscheduler.py
import logging

from django.conf import settings

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore, register_events
from django_apscheduler.models import DjangoJobExecution


logger = logging.getLogger(__name__)
console = logging.StreamHandler()
console.setLevel(logging.INFO)
logger.addHandler(console)
logger.setLevel(logging.INFO)


def my_job():
    #  Your job processing logic here... 
    pass

def delete_old_job_executions(max_age=604_800):
    """This job deletes all apscheduler job executions older than `max_age` from the database."""
    DjangoJobExecution.objects.delete_old_job_executions(max_age)


class Command(BaseCommand):
    help = "Runs apscheduler."

    def handle(self, *args, **options):
        scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        scheduler.add_jobstore(DjangoJobStore(), "default")
        
        scheduler.add_job(
            my_job,
            trigger=CronTrigger(second="*/10"),  # Every 10 seconds
            id="my_job",
            max_instances=1,
            replace_existing=True,
        )
        logger.info("Added job 'my_job'.")

        scheduler.add_job(
            delete_old_job_executions,
            trigger=CronTrigger(
                day_of_week="mon", hour="00", minute="00"
            ),  # Midnight on Monday, before start of the next work week.
            id="delete_old_job_executions",
            max_instances=1,
            replace_existing=True,
        )
        logger.info(
            "Added weekly job: 'delete_old_job_executions'."
        )
        register_events(scheduler)

        try:
            logger.info("Starting scheduler...")
            scheduler.start()
        except KeyboardInterrupt:
            logger.info("Stopping scheduler...")
            scheduler.shutdown()
            logger.info("Scheduler shut down successfully!")
  • This management command should be invoked via ./manage.py runapscheduler whenever the web server serving your Django application is started. The details of how and where this should be done is implementation specific, and depends on which web server you are using and how you are deploying your application to production. For most people this should involve configuring a supervisor process of sorts.

  • Register any APScheduler jobs as you would normally. Note that if you haven't set DjangoJobStore as the 'default' job store, then you will need to include jobstore='djangojobstore' in your scheduler.add_job calls.

  • Don't forget to give each job a unique id using the id parameter. For example:
@scheduler.scheduled_job("interval", seconds=60, id="job")
def job():
    pass

or use the custom register_job decorator for job registration. This will assign a unique id automatically:

from django_apscheduler.jobstores import register_job

@register_job("interval", seconds=60)
def job():
    pass
  • All of the jobs that have been scheduled are viewable directly in the Django admin interface.

  • django_apscheduler will create a log of all job executions and their associated APScheduler status that can also be viewed via the Django admin interface.

  • Note that APScheduler will automatically remove jobs that are not scheduled to trigger again from the job store. This will also delete the corresponding job execution entries for that job from the database (i.e. job execution logs are only maintained for 'active' jobs.)

Caveats

If you would prefer running a BackgroundScheduler directly in your Django application so that you can add and remove jobs dynamically at runtime, instead of using a BlockingScheduler in a separate Django management command on a fixed execution schedule as outlined above, then you should be aware of the following potential issues and limitations that are imposed by APScheduler:

  • Using APScheduler with uWSGI requires some additional configuration steps in order to re-enable threading support.

  • If you intend to run more than one worker process in production, then it is likely that you will have to implement your own remote processing logic to ensure that a single APScheduler job store can be used by all of the worker processes. Neglecting this step could result in jobs being missed or executed multiple times, as well as duplicate entries in the DjangoJobExecution tables being created.

Project resources