malthe/pq

Adding recurring tasks?

ThibTrip opened this issue · 4 comments

Hello,

Is it possible to add recurring tasks, similar to a cron job? I quickly scrolled through the issues and the source code and could not find any indication of that. If this functionality does not exist, I'd be interested in ideas for an easy workaround 🤔.

The use case is a queue of URLs to be crawled regularly (e.g. every day at 1 AM) from multiple computers. To make things more difficult, not all URLs are crawled at the same time (some at 1 AM, others at 2 AM, etc.).

It seems this was implemented in the django-pq library, but that library is unmaintained and has not had a commit since 2014. Also, your library has been very reliable for me, so it'd be nice to use it for this as well 👍.

Specifications

  • Version: 1.9.0
  • Python version: 3.8.3

malthe commented

You could use a cron job to add items to the queue. You can then either use schedule_at to defer execution until a later time, or simply rely on the cron schedule to make each queue item available for work at the right moment.
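A minimal sketch of this approach, assuming the script is run by a daily cron entry shortly after midnight UTC (for example `5 0 * * *`). `CRAWL_TIMES` and `enqueue_daily_crawls` are hypothetical names, and `queue` is assumed to be a pq queue, or anything offering the same `put(data, schedule_at=...)` call that is used later in this thread.

```python
import datetime

# Hypothetical mapping: URL -> hour of day (UTC) at which it should be crawled.
CRAWL_TIMES = {
    "https://www.github.com": 1,
    "https://www.python.org": 2,
}


def enqueue_daily_crawls(queue, crawl_times, now=None):
    """Put one job per URL, deferred via schedule_at to its crawl hour (UTC).

    Meant to be run once a day by cron, shortly after midnight UTC.
    `queue` is assumed to have a pq-style put(data, schedule_at=...) method.
    """
    if now is None:
        now = datetime.datetime.now(datetime.timezone.utc)
    midnight = now.replace(hour=0, minute=0, second=0, microsecond=0)
    job_ids = []
    for url, hour in crawl_times.items():
        run_at = midnight + datetime.timedelta(hours=hour)
        if run_at < now:
            # That hour has already passed today, so schedule it for tomorrow.
            run_at += datetime.timedelta(days=1)
        job_ids.append(queue.put({"url": url}, schedule_at=run_at))
    return job_ids
```

Each cron run enqueues one job per URL and defers it with schedule_at, so workers polling the queue only pick up a URL once its crawl hour has arrived.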

stas commented

@ThibTrip I second @malthe here

Add logic so that the job schedules its own next run in the future; pq will take care of the rest (if your job does too many things, create a dispatcher worker). 🙌
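A minimal sketch of self-scheduling combined with a dispatcher: a hypothetical `dispatch` handler that, when a dispatcher job is worked, fans out one crawl job per URL at its own hour and then re-enqueues the dispatcher itself 24 hours later. The job layout (`kind`, `crawl_times`) and the handler name are assumptions for illustration; `queue` is assumed to expose a pq-style `put(data, schedule_at=...)`, and `job` to have `data` and `schedule_at` attributes, as used elsewhere in this thread.

```python
import datetime


def dispatch(queue, job):
    """Handle a hypothetical 'dispatch' job: fan out crawl jobs, then reschedule itself.

    Assumes job.data == {"kind": "dispatch", "crawl_times": {url: hour, ...}}
    and that job.schedule_at is the UTC midnight this dispatch run belongs to.
    """
    base = job.schedule_at
    if base.tzinfo is None:
        # Assumption: a naive schedule_at coming back from the queue is UTC.
        base = base.replace(tzinfo=datetime.timezone.utc)

    # Fan out: one crawl job per URL, each deferred to its own hour.
    crawl_ids = [
        queue.put({"kind": "crawl", "url": url},
                  schedule_at=base + datetime.timedelta(hours=hour))
        for url, hour in job.data["crawl_times"].items()
    ]

    # Self-schedule: the same dispatcher job runs again one day later.
    next_id = queue.put(job.data, schedule_at=base + datetime.timedelta(days=1))
    return crawl_ids, next_id
```

Basing the next run on the job's own schedule_at rather than on the current time keeps the schedule from drifting when a worker picks the dispatcher up late.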

ThibTrip commented

Thanks for your quick answers 👍!

@stas This is the solution I went for 🙈... I'm not very proud of it, but it seems to work well. I am unsure whether my code handles timezones correctly, though 🤔. pq does not seem to preserve the timezone of schedule_at (I always provide schedule_at as a UTC-aware datetime).

import datetime
import pytz
from croniter import croniter # pip install croniter
from loguru import logger # pip install loguru

def requeue_job(queue, job, delete_job=False, engine=None):
    """
    Requeues a job created with the use of the library pq.
    This is a workaround for making recurring tasks.

    The job must fulfill the following conditions:
    * have a key called "cron" with a valid cronjob expression in its data
    * have a datetime for its schedule_at attribute

    **WARNING**: if schedule_at has no timezone information (pq does not
    seem to preserve it), it is assumed to be UTC.

    Parameters
    ----------
    queue : pq.Queue
    job : pq.Job
    delete_job : bool, default False
        Whether to delete the job that's been passed (happens after requeuing it).
        If True, an engine must be provided.
    engine : sqlalchemy.engine.base.Engine or None, default None
        Needed if delete_job is True

    Examples
    --------
    >>> import datetime
    >>>
    >>> # let's assume you have defined a queue somewhere
    >>> queue.put(data={'url':'https://www.github.com', 'cron':'0 2 * * *'},
    ...           schedule_at=datetime.datetime.now().astimezone(datetime.timezone.utc))
    >>>
    >>> job = queue.get()
    >>> # do something...
    >>> requeue_job(queue=queue, job=job)
    """
    # verify data
    ## type
    data = job.data
    if not isinstance(data, dict):
        raise TypeError(f'Expecting job.data to be dict. Received type {type(data)} instead')
    ## content
    if 'cron' not in data:
        raise ValueError('Key "cron" is missing from job.data')
    if not job.schedule_at:
        raise ValueError('schedule_at must have been provided for the job (it is None)!')

    # make sure schedule_at is timezone-aware (naive values are assumed to be UTC)
    schedule_at = job.schedule_at
    if schedule_at.tzinfo is None:
        schedule_at = pytz.utc.localize(schedule_at)

    # get next schedule based on cron expression, starting from the
    # timezone-aware schedule_at computed above (not the raw job.schedule_at)
    cron, start_time = data['cron'], schedule_at
    cron_iter = croniter(expr_format=cron, start_time=start_time)
    schedule_at = cron_iter.get_next(datetime.datetime)
    logger.debug(f'Requeuing job {job.id}. Next execution: {schedule_at} (base_time: {start_time}, cron: {cron})')

    # requeue job
    new_job_id = queue.put(job.data, schedule_at=schedule_at)

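    # delete job
    if delete_job:
        if not engine:
            raise ValueError('I need a sqlalchemy engine to drop jobs! You have not provided any')
        # pq appears to keep all queues in a single table (named "queue" by default)
        # and to tell them apart via a q_name column, so the table is not named
        # after queue.name. Assumption: the default table name is in use; change it
        # if you passed a different table to PQ. Job ids are unique within the table.
        engine.execute('DELETE FROM queue WHERE id=%(job_id)s;', job_id=job.id)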
    return new_job_id

I think this is a nice solution for the case where you want to schedule a subsequent run only if the current queue item is actually worked.
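ThibTrip's timezone question can be checked in isolation. Below is a stdlib-only sketch, assuming simple daily expressions of the form `M H * * *` (croniter handles the general case). `as_utc` and `next_daily_run` are hypothetical helpers: they treat a naive schedule_at as UTC, convert aware values to UTC, and return the first matching time strictly after the start.

```python
import datetime

UTC = datetime.timezone.utc


def as_utc(dt):
    """Return dt as an aware UTC datetime; naive values are assumed to already be UTC."""
    if dt.tzinfo is None:
        return dt.replace(tzinfo=UTC)
    return dt.astimezone(UTC)


def next_daily_run(start, hour, minute=0):
    """First time strictly after `start` matching f"{minute} {hour} * * *".

    The hour and minute are interpreted in UTC.
    """
    start = as_utc(start)
    candidate = start.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= start:
        # Today's slot is not strictly in the future, so use tomorrow's.
        candidate += datetime.timedelta(days=1)
    return candidate
```

Normalising to UTC first means the cron hour is always read as a UTC hour. Crawling at 2 AM local time would instead require interpreting the hour in that local timezone, which croniter should do when start_time carries that timezone.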