python-arq/arq

Job abort not working?

gerazenobi opened this issue · 9 comments

Hi there 👋

I am starting to use Arq and it looks wonderful; thank you so much for this much-needed framework!

I was having the following 2 issues:

  • job.abort() is not working.
  • can't delete a job (not sure if this is supported, or if there is a workaround, so that we are able to re-enqueue a job with the same custom ID).

Here are the details:

Context

  • I enqueue jobs with a custom ID
  • I want results to live for X amount of time (e.g. for 24 hours)
  • A new job arrives, and if the same ID exists (queued, in_progress or complete) I need to overwrite it with this fresher one (rough sketch after this list):
    • (A) for the queued or in_progress scenario: I understand I'd need to abort the existing job (as I don't want to wait for its completion) and enqueue the new one with fresher arguments.
    • (B) if the existing job with the same ID is complete, overwrite it: enqueue the new job with fresher arguments.
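
Roughly, this is the flow I'm trying to implement (handle_new_job is just a hypothetical helper name; whether abort() actually does what I expect here is exactly what this issue is about):

from arq.jobs import Job, JobStatus


async def handle_new_job(redis, job_id, *args, **kwargs):
    existing = Job(job_id, redis)
    status = await existing.status()
    if status in (JobStatus.queued, JobStatus.in_progress):
        # (A) don't wait for the old job to finish; ask the worker to drop it
        await existing.abort()
    # (B) if a complete job with this ID exists, enqueue_job() returns None
    # because of job uniqueness unless its stored result is removed first
    return await redis.enqueue_job("compute", *args, _job_id=job_id, **kwargs)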

The issues I was having then:

A) Running job.abort() with or without a timeout seems to have no effect on either queued or in_progress jobs.
FWIW, I have observed one thing though: if I abort with a timeout of 0, the job is only skipped later, once the worker dequeues that particular job: 10:56:34: 131.05s ⊘ my_id_10:compute aborted before start

Question: is there anything I am missing regarding aborting jobs? Any idea why it isn't working for me?

B) If I enqueue a job with the same ID as a complete one, it will not be enqueued and None is returned instead, as per job uniqueness.
Question: How do I delete the job then, so that I am able to enqueue the fresher one?
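
The only workaround I can think of so far for (B) is deleting the stored result by hand so the ID becomes free again; a minimal sketch, assuming the result lives under result_key_prefix + job_id (arq.constants.result_key_prefix), which looks like an implementation detail rather than a public API, hence the question:

from arq.constants import result_key_prefix


async def delete_result(redis, job_id):
    # drop arq:result:<job_id> so enqueue_job() no longer treats the ID as taken
    await redis.delete(result_key_prefix + job_id)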


Worker settings and job generation
import asyncio
from arq import create_pool
from arq.connections import RedisSettings


async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]


async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}", _queue_name="example_queue")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    queue_name = "example_queue"
    allow_abort_jobs = True
    health_check_interval = 5


if __name__ == "__main__":
    asyncio.run(main())
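
(For context: main() above only enqueues the jobs; the worker itself is started separately with the arq CLI pointing at the settings class, i.e. arq <module>.WorkerSettings.)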
Code trying to abort job
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main(queue_name):
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs(queue_name=queue_name)
    print(f"queued jobs {len(jobs)}")

    job = Job(JOB_ID, redis, _queue_name=queue_name)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")


if __name__ == "__main__":
    asyncio.run(main("example_queue"))

Any help would be much appreciated 🙏

I'll have to look into this further tomorrow, as I have never had the need to cancel jobs myself.

Running job.abort() with or without a timeout seems to have no effect on either queued or in_progress jobs.

This seems like a bug.

Hey, apologies for the lack of response. I haven't forgotten, I just haven't found the time. I've got Friday off, and I'll use that day for open source/personal projects and get back to you then.

@JonasKs no problem! and thanks ❤️

Hi again @gerazenobi 😊 First, in case someone else reads this issue at a later point:

The abort() function requires that the allow_abort_jobs flag has been set on the worker:

:param allow_abort_jobs: whether to abort jobs on a call to :func:arq.jobs.Job.abort

The docs also state this here.

I can see you've done this! 😊

The next thing I did was to check out your implementation. Knowing that this library has some quirkiness around queue names, I tried removing queue_name from your implementation, and abort seems to work fine. I started working on a queue-name fix here, which I just committed (a very old branch though, it doesn't work, and I don't remember why 😁). The issue is somewhat explained in #346.

So, short workaround: use the default queue.
Best workaround: attempt to fix queue names. I won't have time to do this atm, I'm flooded with work at work - hence I took time off to answer my 50 GitHub notifications 😁 PRs are very welcome.

Hi @JonasKs 👋

Thanks so much for taking a look at this and providing a workaround 🙏 I will test on my side and get back to you.

@JonasKs
I tried again with the code provided previously and it didn't work for me, even though I removed the queue name and the custom IDs in order to have the bare minimum:

import asyncio

from arq import create_pool
from arq.connections import RedisSettings


# the job
async def compute(ctx):
    await asyncio.sleep(30)
    return ctx["job_id"]


# enqueuing jobs
async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 50):
        job = await redis.enqueue_job("compute")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    keep_result = 3600 * 24
    allow_abort_jobs = True

how I am aborting:

JOB_ID = "d85e5e5b62074cf59aea481783e8200a"

async def main():
    redis = await create_pool(RedisSettings())
    job = Job(JOB_ID, redis)
    job_info = await job.info()
    job_status = await job.status()
    print(f"job info: {job_info}")
    print(f"status: {job_status}")
    print(f"result_info: {job_info}")
    try:
        await job.abort(timeout=0)
    except Exception as e:
        print("job abort raised exception", e)


if __name__ == "__main__":
    asyncio.run(main())

I enqueued the jobs and ran the abort script above multiple times: it always prints status: queued.

Using your example from the first post results in this output:

queued jobs 10
job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=1684998895350)
status: JobStatus.queued
result_info: None
exception raised when aborting job
exception when aborting: <class 'TimeoutError'>

Removing queue names:

import asyncio
from arq import create_pool
from arq.connections import RedisSettings


async def compute(ctx):
    print(f'compute called {ctx["job_id"]}')
    await asyncio.sleep(3600)
    return ctx["job_id"]


async def main():
    redis = await create_pool(RedisSettings())
    for job_index in range(1, 11):
        job = await redis.enqueue_job("compute", _job_id=f"my_id_{job_index}")
        if job:
            print(job.job_id)


class WorkerSettings:
    functions = [compute]
    max_jobs = 3
    keep_result = 3600 * 24
    allow_abort_jobs = True
    health_check_interval = 5


if __name__ == "__main__":
    asyncio.run(main())
import asyncio
from arq import create_pool
from arq.connections import RedisSettings
from arq.jobs import Job

JOB_ID = "my_id_10"

async def main():
    redis = await create_pool(RedisSettings())
    jobs = await redis.queued_jobs()
    print(f"queued jobs {len(jobs)}")

    job = Job(JOB_ID, redis)
    print(f"job info: {await job.info()}")
    job_status = await job.status()
    print(f"status: {job_status}")
    print(f"result_info: {await job.result_info()}")
    try:
        aborted = await job.abort(timeout=1)
        print(f"aborted: {aborted}")
    except Exception as e:
        print("exception raised when aborting job")
        print(f"exception when aborting: {type(e)}")


if __name__ == "__main__":
    asyncio.run(main())

Results in this output:

queued jobs 0
job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 7, 14, 55, 350000, tzinfo=datetime.timezone.utc), score=None, success=False, result=CancelledError(), start_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 591000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 7, 29, 55, 594000, tzinfo=datetime.timezone.utc), queue_name='example_queue', job_id=None)
aborted: True

Aha, I see, with the new example there's something iffy going on.

With the job:

async def compute(ctx):
    print(f'starting job {ctx}')
    await asyncio.sleep(30)
    print('slept')
    return ctx["job_id"]

Then finding the log from the worker:

starting job {'redis': ArqRedis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, 'job_id': 'afc5d7518819482eaaedffb0a82372f8', 'job_try': 1, 'enqueue_time': datetime.datetime(2023, 5, 25, 8, 17, 15, 800000, tzinfo=datetime.timezone.utc), 'score': 1685002635800}

Then aborting the job:

job info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
status: JobStatus.in_progress
result_info: JobDef(function='compute', args=(), kwargs={}, job_try=None, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=1685002635815)
job abort raised exception 

Then waiting for the job to complete:

job info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)
status: JobStatus.complete
result_info: JobResult(function='compute', args=(), kwargs={}, job_try=1, enqueue_time=datetime.datetime(2023, 5, 25, 8, 17, 15, 815000, tzinfo=datetime.timezone.utc), score=None, success=True, result='f859323bff434573850b10b5363d015d', start_time=datetime.datetime(2023, 5, 25, 8, 18, 16, 173000, tzinfo=datetime.timezone.utc), finish_time=datetime.datetime(2023, 5, 25, 8, 18, 46, 177000, tzinfo=datetime.timezone.utc), queue_name='arq:queue', job_id=None)

I agree, this seems like a bug. I'll try to look into it this weekend.

I managed to close the issue instead of commenting, sorry; reopened.

Thanks @JonasKs.

Not related to this issue, but I thought I would share in case someone else arrives here: at the moment, to work around the fact that we can't abort (and hence can't rely on custom IDs), I was using the queued_jobs method and then querying the jobs' kwargs in the returned JobDefs to manage/track which jobs are in the queue:

    async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
        """
        Get information about queued, mostly useful when testing.
        """
        jobs = await self.zrange(queue_name, withscores=True, start=0, end=-1)
        return await asyncio.gather(*[self._get_job_def(job_id, int(score)) for job_id, score in jobs])

However, this method will quickly exhaust the available Redis connections (it doesn't scale) when dealing with many concurrent requests/jobs, as it launches a concurrent _get_job_def for each job in the queue:

    async def _get_job_def(self, job_id: bytes, score: int) -> JobDef:
        key = job_key_prefix + job_id.decode()
        v = await self.get(key)  # <== new connection
        # ...

I was starting to see redis.exceptions.ConnectionError: max number of clients reached
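
For anyone else hitting this limit, here's a rough sketch of the mitigation I'm leaning towards (plain redis-py rather than arq API): read the queue with a single ZRANGE and batch the per-job GETs through one pipeline, so the whole scan uses one connection instead of one per queued job. It assumes the same arq:job:<id> key layout as the snippet above and returns raw serialized payloads rather than JobDefs:

from arq.constants import default_queue_name, job_key_prefix


async def queued_job_payloads(redis, queue_name=default_queue_name):
    job_ids = await redis.zrange(queue_name, start=0, end=-1)
    pipe = redis.pipeline()
    for job_id in job_ids:
        # commands are only buffered here; nothing hits Redis yet
        pipe.get(job_key_prefix + job_id.decode())
    # a single round trip on one connection for all the GETs
    return await pipe.execute()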

All that to say: queued_jobs should be used with care and perhaps only for testing 🤔.