farahats9/sqlalchemy-celery-beat

Cron task is getting called twice

seanic1979 opened this issue · 7 comments

I have a function that gets called from a Flask endpoint to insert or update a task and its schedule.

import sqlalchemy as db  # assuming plain SQLAlchemy; a Flask-SQLAlchemy "db" works the same here
from celery import schedules

from sqlalchemy_celery_beat.models import (
    CrontabSchedule, PeriodicTask, PeriodicTaskChanged)

@app.task
def update_task_schedule(name):
    session = session_maker()  # the app's own sessionmaker (see discussion below)

    # Get or create the crontab row for minute 22 of every hour
    schedule = CrontabSchedule.from_schedule(
        session,
        schedules.crontab(
            minute='22',
            hour='*',
            day_of_week='*',
            day_of_month='*',
            month_of_year='*',
        )
    )

    task_stmt = db.select(PeriodicTask).where(PeriodicTask.name == name)
    result = session.execute(task_stmt)
    task = result.first()

    if task is None:
        stmt = db.insert(PeriodicTask).values(
            name=name,
            task='tasks.celery.index',
            schedule_id=schedule.id,
            discriminator=schedule.discriminator)
    else:
        stmt = db.update(PeriodicTask).values(
            schedule_id=schedule.id,
            discriminator=schedule.discriminator).where(PeriodicTask.name == name)

    session.execute(stmt)
    session.commit()

    # Signal beat that the schedule changed so it reloads
    PeriodicTaskChanged.update_from_session(session)

After this function is called, the rows look like this in the database (the PeriodicTask row first, then its CrontabSchedule row, id 11):

'3','index','tasks.celery.index','[]','{}',NULL,NULL,NULL,'{}',NULL,NULL,NULL,'0',NULL,'1',NULL,'0','2024-02-15 15:21:27','','crontabschedule','11'

'11','22','*','*','*','*','UTC'

On the console I see the task being invoked twice:

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] received
DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '11418f46-42d5-4db5-9605-83831699b097', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '11418f46-42d5-4db5-9605-83831699b097', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '11418f46-42d5-4db5-9605-83831699b097', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen20489@greg-15-mbp.lan', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '3654ceb6-cb2b-430a-988d-d33d24e32e61'}, 'reply_to': '6ddb9a64-3fe0-310e-948b-f79802c585a0', 'correlation_id': '11418f46-42d5-4db5-9605-83831699b097', 'hostname': 'celery@greg-15-mbp.lan', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[11418f46-42d5-4db5-9605-83831699b097] succeeded in 0.0007132080000360475s: None
INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] received
DEBUG: 2024/02/15 11:22:00 TaskPool: Apply <function fast_trace_task at 0x10ca44700> (args:('tasks.celery.index', '88c8b00f-920f-4763-9ca9-571f6746cb59', {'lang': 'py', 'task': 'tasks.celery.index', 'id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'shadow': None, 'eta': None, 'expires': None, 'group': None, 'group_index': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'parent_id': None, 'argsrepr': '()', 'kwargsrepr': '{}', 'origin': 'gen19113@greg-15-mbp.lan', 'ignore_result': False, 'replaced_task_nesting': 0, 'stamped_headers': None, 'stamps': {}, 'properties': {'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'periodic_task_name': 'index', 'delivery_mode': 2, 'delivery_info': {'exchange': '', 'routing_key': 'celery'}, 'priority': 0, 'body_encoding': 'base64', 'delivery_tag': '39dacb2b-c6a0-4e31-8179-256eebb46372'}, 'reply_to': '5ccfc9a3-ed52-3606-b511-260e477914f9', 'correlation_id': '88c8b00f-920f-4763-9ca9-571f6746cb59', 'hostname': 'celery@greg-15-mbp.lan', 'delivery_info':... kwargs:{})

WARNING: 2024/02/15 11:22:00 INDEXING

INFO: 2024/02/15 11:22:00 Task tasks.celery.index[88c8b00f-920f-4763-9ca9-571f6746cb59] succeeded in 0.0005478840000137097s: None

I tested your code and it executed only once. The only change I made was using the session from sqlalchemy-celery-beat, like this:

from sqlalchemy_celery_beat.session import SessionManager

session_manager = SessionManager()

@app.task
def update_task_schedule(name):
    session = session_manager.session_factory(db_connection_uri)
    ...

Please add more information, like your Python version, SQLAlchemy version, platform, Celery config and version, etc.

Alright down to a single call now. I was starting celery like this:

celery --app=tasks.celery worker --beat --concurrency=1 --loglevel=DEBUG -P solo -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler

I removed "--beat" and it works properly. However, there is something I don't understand: I'm not running a separate celery beat instance, so was beat running twice when I had the "--beat" argument? Once because of "--beat" and once because of the "sqlalchemy_celery_beat.schedulers:DatabaseScheduler" part?

Tested again with the same command and saw no duplicate tasks. If this happens as soon as you start the worker, it could be a lingering task from before you shut it down. Please check your queue for any remaining tasks.
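
The standard Celery CLI can show what is still sitting there, for example:

# tasks currently running or reserved by workers
celery --app=tasks.celery inspect active
celery --app=tasks.celery inspect reserved

# or drop everything still waiting in the queue (destructive)
celery --app=tasks.celery purge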

As for your question: the --beat argument embeds beat in the worker process; it is generally recommended only for testing, not for production. The -S scheduler argument has nothing to do with how many celery instances are running; it just points beat at the scheduler class that holds the tasks.
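
For production you would normally run two separate processes instead, something like this (the -S argument moves to the beat command; the worker does not need it):

celery --app=tasks.celery worker --concurrency=1 --loglevel=DEBUG -P solo

celery --app=tasks.celery beat --loglevel=DEBUG -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler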

So how are things working for me now, then? I only run:

celery --app=tasks.celery worker --concurrency=1 --loglevel=DEBUG -P solo -S sqlalchemy_celery_beat.schedulers:DatabaseScheduler

So no separate process is running celery beat, yet things are working perfectly; all tasks are getting called as per the schedule. Shouldn't that only run a worker that waits for something to do? I admit my celery knowledge is not great :)

That doesn't happen and shouldn't happen; the -S argument has no effect if there's no beat running. As I said, this could be a lingering task in the queue, or you may have beat running somewhere.
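
If in doubt, a quick process check will show any stray beat, or a worker started with --beat, that is still alive:

ps aux | grep -i '[c]elery'   # the [c] keeps grep from matching itself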

Disregard my last question. It looks like I probably had a runaway celery beat process running on my laptop. I rebooted, and things did not work until I put "--beat" back. That was probably why I was getting more than one call to the task in the first place.

Great! I will close this issue then. Please keep posting feedback if you run into any issues.