tobymao/saq

For TERMINAL_STATUSES, call the after_process hook after the job status is updated

evgenii-moriakhin opened this issue · 1 comment

While using saq, I noticed that the after_process handler does not behave as I initially expected.

For example, here's the code:

import asyncio

from redis.asyncio import Redis
from saq import Queue, Worker
from saq.types import Context

async def noop(ctx: Context):
    await asyncio.sleep(3)
    return 1

async def after_process_print(ctx: Context):
    print("status", ctx["job"].status)

async def main():
    queue = Queue(Redis())
    worker = Worker(queue, [noop], after_process=after_process_print)
    asyncio.create_task(worker.start())
    await asyncio.sleep(1)

    job = await queue.enqueue("noop")
    await asyncio.sleep(1)
    await job.abort("aborted")  # request the abort while noop is still sleeping

    await asyncio.sleep(3)
    await worker.stop()
    await worker.queue.disconnect()

asyncio.run(main())

I expected it to print:

status Status.ABORTED

But the actual output is:

status Status.ACTIVE

Okay, if the job is still active at that point, we can try to wait for it to reach a terminal state:

async def after_process_print(ctx: Context):
    # refresh(0) re-fetches the job and waits indefinitely for it to
    # reach a terminal state by listening for its events
    await ctx["job"].refresh(0)
    print("status", ctx["job"].status)

And... our program actually gets stuck on this call!
This is because Worker.abort has already published the abort event to Redis, and, the work being concurrent, it did so before the coroutine inside Job.refresh started listening for events.
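
To make the race concrete, here is a minimal, self-contained sketch (hypothetical, not saq internals). Like Redis pub/sub, this toy bus drops a message published while nobody is subscribed, so a listener that subscribes afterwards waits forever:

import asyncio

# Toy event bus with Redis-pub/sub-like semantics: publishing with no
# subscribers drops the message instead of buffering it.
subscribers: list[asyncio.Future] = []

def publish(msg: str) -> None:
    for fut in subscribers:
        fut.set_result(msg)
    subscribers.clear()  # zero subscribers -> the message is simply lost

async def listen() -> str:
    fut = asyncio.get_running_loop().create_future()
    subscribers.append(fut)
    return await fut

async def main() -> None:
    publish("aborted")     # Worker.abort publishes first...
    print(await listen())  # ...the listener subscribes after and never wakes up

# asyncio.run(main())  # hangs forever, just like refresh(0) above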

This undermines the usefulness of the after_process hook: inside it we cannot reliably tell whether (and how) a job has terminated, which is precisely the information we need in order to make decisions in after_process.
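
Until this is addressed, one possible workaround is to poll with one-shot refresh() calls for a bounded time instead of blocking on refresh(0). This is only a sketch: it assumes refresh() without arguments fetches the latest stored state once, without subscribing to events, and it imports TERMINAL_STATUSES from saq.job:

import asyncio

from saq.job import TERMINAL_STATUSES
from saq.types import Context

async def after_process_print(ctx: Context):
    job = ctx["job"]
    # Poll for up to ~2 seconds rather than blocking forever on refresh(0).
    for _ in range(20):
        await job.refresh()  # one-shot refresh: no event subscription
        if job.status in TERMINAL_STATUSES:
            break
        await asyncio.sleep(0.1)
    print("status", job.status)  # may still be ACTIVE if the abort never landed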

Note that for the Status.COMPLETE and Status.FAILED statuses this already works, as those statuses are updated sequentially before after_process is called.

However, this does not work for the Status.ABORTED status.
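
A simplified, hypothetical model of the worker loop (not saq's actual code) makes the asymmetry visible: COMPLETE and FAILED are written on the same code path that then invokes the hook, whereas an abort arrives out of band:

import asyncio
from enum import Enum

class Status(Enum):
    ACTIVE = "active"
    COMPLETE = "complete"
    FAILED = "failed"

async def process(task, after_process):
    status = Status.ACTIVE
    try:
        await task()
        status = Status.COMPLETE  # terminal status set sequentially...
    except Exception:
        status = Status.FAILED    # ...on both normal paths...
    finally:
        await after_process(status)  # ...before the hook observes it

# An abort, by contrast, is signalled through Redis from outside this
# coroutine, so the hook can run before ABORTED is ever written.

async def demo():
    async def ok():
        return 1

    async def show(status):
        print("status", status)  # prints Status.COMPLETE

    await process(ok, show)

asyncio.run(demo())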

Would it make sense to guarantee that any status in TERMINAL_STATUSES, if it has occurred, is written before the after_process handler is called?

Thanks for the detailed issue report.

Here's a fix: #123