For TERMINAL_STATUSES call after_process hook after the job status is updated
evgenii-moriakhin opened this issue · 1 comments
During my usage of saq
, I found that the after_process
handler is not behaving as I initially expected.
For example, here's the code:
async def noop(ctx: Context):
await asyncio.sleep(3)
return 1
async def after_process_print(ctx: Context):
print("status", ctx["job"].status)
async def main():
queue = Queue(Redis())
worker = Worker(queue, [noop], after_process=after_process_print)
asyncio.create_task(worker.start())
await asyncio.sleep(1)
job = await queue.enqueue("noop")
await asyncio.sleep(1)
await job.abort("aborted")
await asyncio.sleep(3)
await worker.stop()
await worker.queue.disconnect()
asyncio.run(main())
I expected it to print:
status Status.ABORTED
But the actual output is:
status Status.ACTIVE
Okay, if the job is still active, we can try to wait for the job to complete:
async def after_process_print(ctx: Context):
await ctx["job"].refresh(0)
print("status", ctx["job"].status)
And... our program will actually get stuck on this call!
This is because the coroutines in Worker.abort
call has already published the event to Redis, and due to concurrent work, this happened faster than the coroutines inside Job.refresh
started listening to the events.
This issue affects the usefulness of the after_process
hook, as it prevents us from reliably knowing when a job has terminated (which is the most important information to have in order to make decisions in the after_process
).
Note that for the Status.COMPLETE
and Status.FAILED
statuses, this already works, as the statuses are updated sequentially before the after_process
is called.
However, this does not work for the Status.ABORTED
status.
It might make sense to guarantee that all TERMINAL_STATUSES
are set, if they have occurred, before calling the after_process
handler?