Dataset-aware scheduling with Astro-SDK doesn't work under Airflow 2.9.1
skolchin opened this issue · 0 comments
Hi, I was testing the newest Airflow version, 2.9.1, for compatibility with our project (written with astro-sdk 1.8.0) and found that dataset-aware scheduling, which worked fine under Airflow 2.8.4, simply stopped working in the new environment.
I wrote several small examples to illustrate this:
- A file is loaded into a database, which triggers a dataset change
- Table data is copied to another table, again triggering a dataset change
- A plain Airflow dataset is modified
```python
import pendulum
import pandas as pd

from airflow.models import DAG
from airflow.decorators import task
from airflow.datasets import Dataset

from astro import sql as aql
from astro.files import File
from astro.sql.table import Table, Metadata

types_table = Table(name='types', conn_id='source_db', metadata=Metadata(schema='stage'))
types_copy_table = Table(name='types_copy', conn_id='source_db', metadata=Metadata(schema='stage'))
dataset = Dataset("myscheme://myhost?table=mytable")

@task
def print_triggering_dataset_events(triggering_dataset_events=None):
    """ Print out dataset trigger information """
    for dataset, event_list in triggering_dataset_events.items():
        print(f'Dataset: {dataset}')
        print(f'Events: {event_list}')

with DAG(
    dag_id='load_file',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Load file into TYPES table. This will modify `types_table` dataset and trigger corresponding DAG """
    aql.load_file(File(path='./dags/test.csv'), output_table=types_table)

with DAG(
    dag_id='triggered_by_file_load',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_table],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `types_table` dataset modifications """
    print_triggering_dataset_events()

with DAG(
    dag_id='copy-table',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Load all data from TYPES table and save into new `TYPES_COPY` table.
    This should modify `types_copy_table` dataset and trigger corresponding DAG """

    @aql.run_raw_sql(results_format='pandas_dataframe')
    def load_table(table: Table):
        return '''select * from {{table}}'''

    @aql.dataframe
    def save_data(data: pd.DataFrame):
        return data

    data = load_table(types_table)
    save_data(data, output_table=types_copy_table)

with DAG(
    dag_id='triggered_by_copy_table',
    start_date=pendulum.today().add(days=-1),
    schedule=[types_copy_table],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `types_copy_table` dataset modifications """
    print_triggering_dataset_events()

with DAG(
    dag_id='dataset_triggerer',
    start_date=pendulum.today().add(days=-1),
    schedule='@daily',
    catchup=False,
    tags=['testing']
) as dag:
    """ Simply trigger `dataset` dataset changes to run corresponding DAG """

    @dag.task(outlets=[dataset])
    def trigger_dataset_event():
        print('Triggering event')

    trigger_dataset_event()

with DAG(
    dag_id='triggered_by_dataset',
    start_date=pendulum.today().add(days=-1),
    schedule=[dataset],
    catchup=False,
    tags=['testing']
) as dag:
    """ This DAG is to be initiated by `dataset` dataset modifications """
    print_triggering_dataset_events()
```
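For context, the table-based scheduling above relies on astro-sdk `Table` objects being usable as Airflow datasets, i.e. on the dataset URI that astro-sdk derives from the connection id, schema and table name (my assumption about how the mapping works). Below is a minimal sketch of how one could print those URIs; it assumes `Table` exposes a `uri` attribute the way a plain `Dataset` does, which may not hold for every astro-sdk version:

```python
from astro.sql.table import Table, Metadata

# Hypothetical check: print the dataset URIs Airflow would register for these tables.
# Assumes astro-sdk Table behaves like airflow.datasets.Dataset and therefore has `.uri`.
types_table = Table(name='types', conn_id='source_db', metadata=Metadata(schema='stage'))
types_copy_table = Table(name='types_copy', conn_id='source_db', metadata=Metadata(schema='stage'))

for t in (types_table, types_copy_table):
    print(t.name, '->', getattr(t, 'uri', '<no uri attribute>'))
```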
Under Airflow 2.8.4 everything works just fine: the dependent DAGs start after the dataset changes.
However, under Airflow 2.9.1 only the last pair of DAGs (the one that uses a plain Airflow dataset) works as expected. The DAGs that rely on Astro-SDK tables are not triggered at all.
No code changes at all; I only switch the base image in the Dockerfile used to build the environment, from `FROM apache/airflow:slim-2.8.4-python3.10` to `FROM apache/airflow:slim-2.9.1-python3.10`.
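For completeness, a minimal sketch of what such a Dockerfile could look like; only the base-image line is taken from the description above, and the `astro-sdk-python` install line is an assumption about how the environment gets the SDK:

```dockerfile
# Minimal sketch of the image build. Only the FROM line comes from the report;
# the install line is an assumption (version taken from the Versions section below).
FROM apache/airflow:slim-2.9.1-python3.10
# Switching this single line back to apache/airflow:slim-2.8.4-python3.10
# restores the working behaviour described above.
RUN pip install --no-cache-dir astro-sdk-python==1.8.0
```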
I could not find any clue about this in the Airflow logs.
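Since the logs are silent, one way to narrow this down could be to look directly at what the scheduler recorded in the metadata database: whether the Astro-SDK table datasets are registered at all, and whether any events were emitted for them. A minimal sketch, assuming it is run inside the Airflow environment; the ORM models come from `airflow.models.dataset` in Airflow 2.8/2.9, and the exact columns printed are assumptions:

```python
# Debugging aid only: list registered datasets and the most recent dataset events
# straight from the Airflow metadata database.
from airflow.models.dataset import DatasetModel, DatasetEvent
from airflow.utils.session import create_session

with create_session() as session:
    print("Registered datasets:")
    for ds in session.query(DatasetModel).all():
        print(" ", ds.uri)

    print("Recent dataset events:")
    for ev in session.query(DatasetEvent).order_by(DatasetEvent.timestamp.desc()).limit(20):
        print(" ", ev.timestamp, ev.source_dag_id, ev.source_task_id)
```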
Please help resolve this. Thanks!
Versions
- Astro-SDK: 1.8.0
- Airflow: [2.8.4, 2.9.1]
- Python: 3.10