teamclairvoyant/airflow-maintenance-dags

DB cleanup fails after upgrade to Python 3.9 and Airflow 2.2.2

hanizaidi110 opened this issue · 9 comments

I have recently upgraded to Python 3.9 and Airflow 2.2.2. The following error has occurred repeatedly since the upgrade. I've only changed the parameters indicated in the repo README and am running everything else unchanged. Other library versions:
SQLAlchemy 1.4.1
Flask-SQLAlchemy 2.4.3
Can you please check the problem here?

Job 108936: Subtask cleanup_TaskInstance
Running <TaskInstance: admin_airflow_db_cleanup.cleanup_TaskInstance manual__2021-12-01T09:58:53.886445+00:00 [running]> on host 2e7d9eccb27e
Exporting the following env vars:
AIRFLOW_CTX_DAG_EMAIL=l9b0o0x6t8n3v7s3@vimcar.slack.com,team-bi@vimcar.com
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=admin_airflow_db_cleanup
AIRFLOW_CTX_TASK_ID=cleanup_TaskInstance
AIRFLOW_CTX_EXECUTION_DATE=2021-12-01T09:58:53.886445+00:00
AIRFLOW_CTX_DAG_RUN_ID=manual__2021-12-01T09:58:53.886445+00:00
Retrieving max_execution_date from XCom
Configurations:
max_date:                 2021-11-01 09:58:59.085046+00:00
enable_delete:            True
session:                  <sqlalchemy.orm.session.Session object at 0x7f607aed7c40>
airflow_db_model:         <class 'airflow.models.taskinstance.TaskInstance'>
state:                    None
age_check_column:         ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
keep_last:                False
keep_last_filters:        None
keep_last_group_by:       None

Running Cleanup Process...
Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
    query = session.query(airflow_db_model).options(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
    return self._options(False, *args)
  File "<string>", line 2, in _options
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
    fn(self, *args[1:], **kw)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
    opt.process_query(self)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
    self._process(query, True)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
    val._bind_loader(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
    raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Marking task as UP_FOR_RETRY. dag_id=admin_airflow_db_cleanup, task_id=cleanup_TaskInstance, execution_date=20211201T095853, start_date=20211201T170532, end_date=20211201T170532
Failed to execute job 108936 for task cleanup_TaskInstance
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/task/task_runner/standard_task_runner.py", line 85, in _start_by_fork
    args.func(args, dag=self.dag)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/cli.py", line 92, in wrapper
    return f(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 292, in task_run
    _run_task_by_selected_method(args, dag, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 107, in _run_task_by_selected_method
    _run_raw_task(args, ti)
  File "/usr/local/lib/python3.9/site-packages/airflow/cli/commands/task_command.py", line 180, in _run_raw_task
    ti._run_raw_task(
  File "/usr/local/lib/python3.9/site-packages/airflow/utils/session.py", line 70, in wrapper
    return func(*args, session=session, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1458, in _execute_task_with_callbacks
    result = self._execute_task(context, self.task)
  File "/usr/local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1514, in _execute_task
    result = execute_callable(context=context)
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 151, in execute
    return_value = self.execute_callable()
  File "/usr/local/lib/python3.9/site-packages/airflow/operators/python.py", line 162, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/vimcar-bi/airflow_dags/admin_airflow_db_cleanup.py", line 291, in cleanup_function
    query = session.query(airflow_db_model).options(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1619, in options
    return self._options(False, *args)
  File "<string>", line 2, in _options
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/base.py", line 227, in generate
    fn(self, *args[1:], **kw)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/query.py", line 1638, in _options
    opt.process_query(self)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 176, in process_query
    self._process(query, True)
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 677, in _process
    val._bind_loader(
  File "/usr/local/lib/python3.9/site-packages/sqlalchemy/orm/strategy_options.py", line 829, in _bind_loader
    raise sa_exc.ArgumentError(
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
Task exited with return code 1
0 downstream tasks scheduled from follow-on schedule check

This seems to be a duplicate of #117.

I think my issue is very similar, but I'm using Airflow 2.3.2 from Docker with Python 3.7. I'm getting the same error, but only for the following tasks: cleanup_TaskInstance, cleanup_BaseXCom, cleanup_TaskReschedule, cleanup_RenderedTaskInstanceFields. The other tasks finish successfully.

[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:265} INFO - Retrieving max_execution_date from XCom
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:278} INFO - Configurations:
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:279} INFO - max_date:                 2022-05-21 17:18:41.939540+00:00
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:280} INFO - enable_delete:            True
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:281} INFO - session:                  <sqlalchemy.orm.session.Session object at 0x7f7102e0c750>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:282} INFO - airflow_db_model:         <class 'airflow.models.taskinstance.TaskInstance'>
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:283} INFO - state:                    None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:284} INFO - age_check_column:         ColumnAssociationProxyInstance(AssociationProxy('dag_run', 'execution_date'))
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:285} INFO - keep_last:                False
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:286} INFO - keep_last_filters:        None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:287} INFO - keep_last_group_by:       None
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:289} INFO - 
[2022-06-20, 17:19:52 UTC] {airflow-db-cleanup.py:291} INFO - Running Cleanup Process...
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 171, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/operators/python.py", line 189, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/airflow-db-cleanup.py", line 298, in cleanup_function
    logging.info("INITIAL QUERY : " + str(query))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/query.py", line 2848, in __str__
    return str(statement.compile(bind))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 506, in compile
    return self._compiler(dialect, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 570, in _compiler
    return dialect.statement_compiler(dialect, self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 766, in __init__
    Compiled.__init__(self, dialect, statement, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 455, in __init__
    self.string = self.process(self.statement, **compile_kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 490, in process
    return obj._compiler_dispatch(self, **kwargs)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/visitors.py", line 81, in _compiler_dispatch
    return meth(self, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/compiler.py", line 2981, in visit_select
    select_stmt, self, **kwargs
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/sql/base.py", line 501, in create_for_statement
    return klass.create_for_statement(statement, compiler, **kw)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/context.py", line 579, in create_for_statement
    opt.process_compile_state(self)
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 185, in process_compile_state
    self._process(compile_state, not bool(compile_state.current_path))
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 718, in _process
    raiseerr,
  File "/home/airflow/.local/lib/python3.7/site-packages/sqlalchemy/orm/strategy_options.py", line 870, in _bind_loader
    "mapper option expects " "string key or list of attributes"
sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes
[2022-06-20, 17:19:52 UTC] {taskinstance.py:1400} INFO - Marking task as FAILED. dag_id=airflow-db-cleanup, task_id=cleanup_TaskInstance, execution_date=20220619T000000, start_date=20220620T171951, end_date=20220620T171952
[2022-06-20, 17:19:52 UTC] {logging_mixin.py:115} WARNING - /home/airflow/.local/lib/python3.7/site-packages/airflow/models/param.py:62 DeprecationWarning: The use of non-json-serializable params is deprecated and will be removed in a future release

Tasks cleanup_TaskInstance and cleanup_TaskReschedule fail with Python 3.8.12 and Airflow 2.2.2. Same error: sqlalchemy.exc.ArgumentError: mapper option expects string key or list of attributes

This seems to be related to missing columns in the db tables to be cleaned for those two tasks.

I fixed it by replacing execution_date with end_date in age_check_column for both cleanup_TaskInstance and cleanup_TaskReschedule. Let me know whether there is a more appropriate field that should be used.

I also suggest adding

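    # Extract the column names of the db table to be cleaned
    table_columns = [column.name for column in airflow_db_model.__table__.columns]
    # str() of a mapped attribute looks like "TaskInstance.end_date"; an association
    # proxy has no "." in its string form, so take the last segment to avoid an IndexError
    age_check_col_name = str(age_check_column).split(".")[-1]

    if age_check_col_name not in table_columns:
        raise ValueError(f"{age_check_col_name} field not in table {airflow_db_model.__table__.name}")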

in cleanup_function to check that the age_check_column selected for a specific model exists in the table to be cleaned.

I read through #117, and the comment from @PhilippDB makes sense and could be the solution.

@e-compagno please be aware of what @tylerwmarrs wrote: using start_date or end_date may select incorrect records that are not tied to records in the dag_run table. The tables have the following constraints in the DDL:

CREATE TABLE task_instance(
    task_id character varying(250) NOT NULL,
    dag_id character varying(250) NOT NULL,
    run_id character varying(250) NOT NULL,
[...]
    PRIMARY KEY(task_id,dag_id,run_id,map_index),
    CONSTRAINT task_instance_dag_run_fkey FOREIGN KEY(dag_id, run_id) REFERENCES dag_run(dag_id, run_id),
    CONSTRAINT task_instance_trigger_id_fkey FOREIGN KEY(trigger_id) REFERENCES trigger(id)
);
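
For illustration only, here is a minimal sketch of that difference. It assumes heavily simplified stand-ins for the dag_run and task_instance tables (only the columns needed here) in an in-memory SQLite database; the sample rows and the helper names are made up. It compares selecting old task instances through the dag_run join on (dag_id, run_id) with selecting them by end_date alone.

```python
import sqlite3

# Simplified stand-ins for the Airflow tables; the real ones have many more columns.
SCHEMA = """
CREATE TABLE dag_run (
    dag_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    execution_date TEXT NOT NULL,
    PRIMARY KEY (dag_id, run_id)
);
CREATE TABLE task_instance (
    task_id TEXT NOT NULL,
    dag_id TEXT NOT NULL,
    run_id TEXT NOT NULL,
    end_date TEXT,
    PRIMARY KEY (task_id, dag_id, run_id),
    FOREIGN KEY (dag_id, run_id) REFERENCES dag_run (dag_id, run_id)
);
"""


def build_demo_db():
    """Create an in-memory database with made-up sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(SCHEMA)
    conn.executemany(
        "INSERT INTO dag_run VALUES (?, ?, ?)",
        [
            ("example_dag", "run_old", "2021-10-01 00:00:00"),
            ("example_dag", "run_new", "2021-11-20 00:00:00"),
        ],
    )
    conn.executemany(
        "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
        [
            ("t1", "example_dag", "run_old", "2021-10-01 00:05:00"),
            # Task of the old run that finished recently (e.g. after being re-run).
            ("t2", "example_dag", "run_old", "2021-11-25 00:00:00"),
            ("t1", "example_dag", "run_new", "2021-11-20 00:05:00"),
        ],
    )
    return conn


def select_by_dag_run(conn, max_date):
    """Select task instances whose parent dag_run is older than max_date."""
    return conn.execute(
        """
        SELECT ti.task_id, ti.run_id
        FROM task_instance AS ti
        JOIN dag_run AS dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id
        WHERE dr.execution_date <= ?
        ORDER BY ti.task_id, ti.run_id
        """,
        (max_date,),
    ).fetchall()


def select_by_end_date(conn, max_date):
    """Select task instances by their own end_date, ignoring dag_run."""
    return conn.execute(
        """
        SELECT task_id, run_id FROM task_instance
        WHERE end_date <= ?
        ORDER BY task_id, run_id
        """,
        (max_date,),
    ).fetchall()


conn = build_demo_db()
max_date = "2021-11-01 00:00:00"
print(select_by_dag_run(conn, max_date))
print(select_by_end_date(conn, max_date))
```

With this sample data the join returns both task instances of the old run, while the end_date filter misses t2, whose end_date is recent although its run is old.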

The problem is still present in version 2.4.1. The Cloud Composer version does not fix the TaskReschedule cleanup and also breaks the XCom cleanup, as execution_date is not a field in the db. Is there any update on resolving this issue?

Would

    "airflow_db_model": XCom,
    "age_check_column": XCom.timestamp,

fix the issue?

XCom.timestamp solves the error on XCom, not the others

You can find the time columns in each model's source file. For example, for TaskInstance you can use queued_dttm or end_date.

Using XCom.timestamp fixes the issue for XCom. For RenderedTaskInstanceFields, use RenderedTaskInstanceFields.run_id as the age_check_column.
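
To keep track of the workarounds reported in this thread, they can be collected in one place. The sketch below is only a summary, not the repository's fix: the mapping AGE_CHECK_OVERRIDES and the helper resolve_age_check_column are hypothetical names, and the stand-in classes replace the real Airflow models so the snippet runs without Airflow installed. As noted earlier in the thread, end_date is not tied to dag_run, so treat those entries with care.

```python
# Hypothetical summary of the age_check_column workarounds reported in this thread.
# Keys are model class names, values are attribute names on those models.
AGE_CHECK_OVERRIDES = {
    "TaskInstance": "end_date",
    "TaskReschedule": "end_date",
    "XCom": "timestamp",
    "RenderedTaskInstanceFields": "run_id",
}


def resolve_age_check_column(model, default="execution_date"):
    """Return the attribute of `model` to use for the age check.

    Falls back to `default` for models without an override and raises a
    ValueError if the model has no attribute with the chosen name.
    """
    column_name = AGE_CHECK_OVERRIDES.get(model.__name__, default)
    if not hasattr(model, column_name):
        raise ValueError(f"{column_name} is not an attribute of {model.__name__}")
    return getattr(model, column_name)


# Stand-ins for the Airflow models (assumption: with Airflow installed you
# would pass the real classes from airflow.models instead).
class XCom:
    timestamp = "XCom.timestamp"


class DagRun:
    execution_date = "DagRun.execution_date"


print(resolve_age_check_column(XCom))
print(resolve_age_check_column(DagRun))
```

Each DATABASE_OBJECTS entry could then set "age_check_column" from this helper instead of hard-coding execution_date.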