tobymao/saq

Job is not performed if there is a scheduled job

markus-zoezi opened this issue · 6 comments

I have problems getting saq to work properly when using scheduled tasks.

As soon as I enqueue() a Job with scheduled set to something other than 0, I run into problems.

The scheduled job gets executed, but any jobs I try to enqueue during the time until the scheduled job runs are silently ignored. They are not even performed after the scheduled job; they are simply gone.

I've tried using multiple workers, but that doesn't help. Any help with debugging this, or with understanding how to set this up, would be appreciated.

can you give me a minimal reproducible setup script?

I think I actually found something. I have the following: key is normally None, so Job gets passed key=None, which it doesn't seem to like. If I don't pass key at all, it works much better.

import os

async def create_task(host, method, params, time=None, key=None):
    from saq import Queue, Job
    queue = Queue.from_url(os.getenv('REDIS'))

    job = Job(
        timeout=0,
        function='rpc',
        scheduled=time or 0,
        key=key,  # None by default, so every job ends up sharing the same key
        kwargs=dict(
            host=host,
            data={
                'method': method,
                'params': params
            }
        )
    )
    await queue.enqueue(job)

key is used to avoid duplicate jobs. if you set the same value on multiple jobs, only one will run. this seems to be your issue.

you don't need to pass in key, it's set automatically as a uuid
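
for example (a minimal sketch, assuming a local redis and a worker with an 'rpc' function registered; enqueue should return None for the second, deduplicated job):

import asyncio
import time

from saq import Queue, Job

async def main():
    queue = Queue.from_url('redis://localhost')

    # schedule a job a minute out with an explicit key
    first = await queue.enqueue(
        Job(function='rpc', scheduled=time.time() + 60, key='same-key')
    )

    # a second job with the same key is deduplicated; enqueue should
    # return None and the job never runs
    second = await queue.enqueue(Job(function='rpc', key='same-key'))

    print(first, second)

asyncio.run(main())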

Yes, I understand that now. Thanks!

So my updated function becomes:

import os

async def create_task_ecs(host, method, params, time=None, key=None):
    from saq import Queue, Job
    queue = Queue.from_url(os.getenv('REDIS'))

    job_args = dict(
        timeout=0,
        function='rpc',
        scheduled=time or 0,
        kwargs=dict(
            host=host,
            data={
                'method': method,
                'params': params
            }
        )
    )
    # only pass key when the caller provides one; otherwise
    # Job generates a unique uuid key automatically
    if key:
        job_args['key'] = key

    job = Job(**job_args)
    await queue.enqueue(job)

And that seems to work much better.
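
For reference, a call that schedules the rpc a minute out might look like this from inside a coroutine (the host, method, and params values here are just illustrative):

import time

await create_task_ecs(
    host='api.example.com',
    method='status.ping',
    params={},
    time=time.time() + 60,
)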