tobymao/saq

Aborting jobs and re-adding them

markus-zoezi opened this issue · 13 comments

I have a queue for building jobs (each job takes about 90 seconds). Whenever the source code changes I want to post a new job.
I set the key of the job to the repository name, so changes in the same repository get the same job key.
If the source code changes during a build, I abort the running job and enqueue it again.

job = await queue.job(key)
if job:
    await queue.abort(job, 'Abort job and add new job instead')
await queue.enqueue(Job(key=key, ...))

But that doesn't work: the re-enqueued job never starts.

Is abort not meant to be used like this? Any other way of handling this situation?

can you explain why it won’t work?

First I add one job and let it run for a few seconds, then abort it and enqueue a new one with the same key. The log looks like this:

INFO:saq:Processing Job<...
INFO:saq:Finished Job<...
INFO:saq:Aborting saq:job...

There is no new Processing-line after the Aborting-line.

can you give me a reproducible code snippet? it seems like you’re using key wrong. key is used to guarantee that there are no duplicate jobs. you probably shouldn’t use key

closing for now, but if you provide a code snippet that i can repro, i'll take a look.

This is worker.py, started with python3 -m saq worker.settings -v -v -v

import asyncio, os
from saq import Queue
import logging
logger = logging.getLogger("saq")

async def test(ctx):
    logger.info('Starting job. sleeping for 10 seconds')
    await asyncio.sleep(10)
    logger.info('Done sleeping')

queue = Queue.from_url(os.getenv('REDIS'))

settings = {
    "queue": queue,
    "functions": [test],
    "concurrency": 2
}

And this is used to add jobs. Run it twice in a row:

import asyncio, os
from saq import Job, Queue

async def main():
    queue = Queue.from_url(os.getenv('REDIS'))

    job = await queue.job('test')
    if job:
        print('Aborting')
        await queue.abort(job, 'New build task added')

    print('Adding task')
    await queue.enqueue(Job(key='test', function='test'))

if __name__ == "__main__":
    asyncio.run(main())

how quickly are you running it twice in a row? the abort has a 5-second ttl

Immediately after the first add I run the second add

there’s a 5-second window where you can’t re-add something you aborted; the ttl is an option on the abort method
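To make that window concrete, here is a toy model of an abort marker with a ttl. This is only a sketch of the behavior being described, not saq's actual internals; the class and method names are illustrative. The point is that re-enqueueing the same key only works again once the marker expires:

```python
import time

class AbortMarkers:
    """Toy model: aborting a key blocks re-enqueueing that key
    until the abort ttl expires. Illustrative names only."""

    def __init__(self):
        self._expires = {}

    def abort(self, key, ttl):
        # Record when the abort marker for this key expires.
        self._expires[key] = time.monotonic() + ttl

    def can_enqueue(self, key):
        expiry = self._expires.get(key)
        return expiry is None or time.monotonic() >= expiry

markers = AbortMarkers()
markers.abort("test", ttl=0.2)
blocked = markers.can_enqueue("test")   # False: still inside the abort window
time.sleep(0.25)
allowed = markers.can_enqueue("test")   # True: ttl expired, key usable again
print(blocked, allowed)  # prints: False True
```

This is why enqueueing immediately after aborting silently drops the new job: the key is still marked as aborted.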

Ok I see. What is your suggestion on how to do this? Set ttl to 0? Or some other way?

ttl 0 won’t work because then it won’t get aborted. i suggest not using a key.

I use the key so that I can find the job, see if there is already an ongoing job, and abort it. If it's a random id, I'd have to store that Job id someplace and keep track of it. Unless there is some other way to loop through all future jobs and check their arguments or something.

then you need to check that a job is actually aborted before enqueuing the next job

How do you mean I can check that? Can you give me an example?

check the job status