python-arq/arq

Scheduling repeated unique jobs

davidhuser opened this issue · 5 comments

Thanks for the great library. Is it possible to schedule repeated unique jobs? My goal is to enqueue a job, ensure only one instance runs at a time, and re-enqueue it immediately after it finishes, possibly with a configurable interval.

I noticed that RQ offers rq-scheduler for this purpose.

Self-enqueuing, as mentioned in #432, might not be ideal.

Currently, I'm using a cron job with the second parameter set to every 10 seconds. However, the job duration varies:

from arq import cron
from arq.connections import RedisSettings
from arq.cron import CronJob

async def repeated_async_job(ctx):
    # get_session / do_db_stuff are application helpers, not shown here
    async with get_session(ctx) as db:
        do_db_stuff(db)
    return 'success!'

class WorkerSettings:
    cron_jobs: list[CronJob] = [
        cron(
            repeated_async_job,
            second=set(range(0, 59, 10)),
            unique=True,
            run_at_startup=False,
            timeout=60,
            max_tries=1,
            keep_result=0,
        )
    ]
    redis_settings = RedisSettings()

Would it be better to handle scheduling with a different tool, like APScheduler's AsyncIOScheduler? If so, how would the scheduler know when the job is finished, and would it matter if it runs with multiple workers (e.g. Gunicorn)?
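For reference, the APScheduler variant I have in mind would look roughly like this (untested sketch; I'm assuming max_instances=1 is what would prevent overlapping runs, and that with Gunicorn each worker process would end up with its own scheduler):

import asyncio

from apscheduler.schedulers.asyncio import AsyncIOScheduler


async def repeated_async_job():
    # placeholder for the real db work
    await asyncio.sleep(3)


async def main():
    scheduler = AsyncIOScheduler()
    # run every 10 seconds; with max_instances=1 a tick is skipped while the
    # previous run is still in progress, instead of starting a second instance
    scheduler.add_job(repeated_async_job, 'interval', seconds=10, max_instances=1)
    scheduler.start()
    await asyncio.Event().wait()  # keep the loop alive


if __name__ == '__main__':
    asyncio.run(main())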

I loosely recollect having to do this in the past... wouldn't defining a job_id for the cron(..) enforce uniqueness at run time? And to re-enqueue it immediately after it finishes, I believe the repeated_async_job(..) wrapper could enqueue it with the same job_id before it returns?

I'd prefer not to mix cron with func, although cron jobs are converted to func under the hood. The example below is func-only.

Regarding your job_id suggestion: the first job can/should have a job_id, but subsequent jobs cannot reuse the same job_id, because a job can't re-enqueue itself under an id that is still in use while it hasn't finished.
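To illustrate what I mean (a rough sketch, based on my understanding that enqueue_job returns None when a job with the same _job_id is still queued/running or its result is still stored):

from arq import create_pool
from arq.connections import RedisSettings

async def demo_duplicate_job_id():
    redis = await create_pool(RedisSettings())
    first = await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup')
    # while the first job is queued or running (or its result is kept),
    # enqueueing with the same _job_id returns None instead of a new Job
    second = await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup')
    assert first is not None and second is None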

I tried to work around this by querying the number of queued jobs for this task; if it's exactly 1 (the current job), I re-enqueue the next one:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings

async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # re-enqueue if there is exactly one job queued (the current job)
    redis = ctx['redis']
    queued_jobs = await redis.queued_jobs()
    queued_jobs_len = len([job for job in queued_jobs if job.function == 'repeated_async_job'])
    if queued_jobs_len == 0:
        print("ERROR: should not happen")
    elif queued_jobs_len == 1:
        # only the current job is queued, so enqueue the next one, but without a job_id
        await redis.enqueue_job('repeated_async_job', _job_try=1)
    else:
        print("ERROR: too many jobs")


async def main():
    redis = await create_pool(RedisSettings())
    # startup job with unique ID
    await redis.enqueue_job('repeated_async_job', _job_id='app.main.startup', _job_try=1)


class WorkerSettings:
    functions = [repeated_async_job]

if __name__ == '__main__':
    asyncio.run(main())
    

It seems to work, but I'm not sure if there is a better/native way to do this.

I was also wondering what _job_try really does; the docs say:

enqueue_job
_job_try – useful when re-enqueueing jobs within a job

but how is it useful?

Ah, now I remember where I got stuck: how do you re-enqueue a job_id when one is already running with the same id (or when its result is saved but not explicitly retrieved/deleted)?

Regarding _job_try: my understanding is that _job_try explicitly sets which 'retry attempt' number the enqueued job should be treated as; it should be accessible as ctx['job_try'] inside the job and is used by arq wherever job_try is referenced: https://github.com/search?q=repo%3Asamuelcolvin%2Farq+job_try&type=code. To the best of my knowledge, it doesn't help in any way with the _job_id situation here.
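For example, I'd expect it to matter in a retrying job like this (just a sketch of my understanding, not taken from the arq docs):

from arq import Retry

async def flaky_job(ctx):
    # ctx['job_try'] starts at 1 and increments on each retry;
    # _job_try on enqueue_job simply pre-sets this counter
    if ctx['job_try'] < 3:
        # back off a little longer on each attempt
        raise Retry(defer=ctx['job_try'] * 5)
    return 'ok'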

If you find a better solution, keen to learn too!

Since a job can have more states than just queued, I'm using this check now before enqueuing:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import JobStatus

async def repeated_async_job(ctx):
    # do stuff
    await asyncio.sleep(3)

    # Check if any job with the same function is deferred, queued, or in progress
    pool = ctx['redis']
    all_jobs = await pool.all_job_results()
    in_progress_jobs = [
        job for job in all_jobs
        if job.status in {JobStatus.deferred, JobStatus.queued, JobStatus.in_progress}
        and job.function == 'repeated_async_job'
    ]

    if in_progress_jobs:
        return 'done'

    await pool.enqueue_job('repeated_async_job')
    return 'done'


async def main():
    redis = await create_pool(RedisSettings())
    await redis.enqueue_job('repeated_async_job')


class WorkerSettings:
    functions = [repeated_async_job]

if __name__ == '__main__':
    asyncio.run(main())

It does not account for params (i.e. the same job with different parameters), but for this job I don't need it.
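If I ever do need per-parameter uniqueness, I'd probably derive the job_id from the function name plus its kwargs, something like this (hypothetical helper, not part of arq; 'country' is just a made-up example parameter):

import hashlib
import json

def job_id_for(function_name: str, **kwargs) -> str:
    # deterministic id: same function + same kwargs -> same job_id,
    # so arq's duplicate-id check becomes per-parameter-set uniqueness
    digest = hashlib.sha1(json.dumps(kwargs, sort_keys=True).encode()).hexdigest()[:10]
    return f'{function_name}:{digest}'

# usage (inside main or the job itself):
# await redis.enqueue_job('repeated_async_job', country='CH',
#                         _job_id=job_id_for('repeated_async_job', country='CH'))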

@davidhuser are you by any chance facing the issue I've filed in #459, or do you know how to solve it? There's an in-progress key created for 60 seconds, even for a cron job which I want to run every 5 or 10 seconds.