Unable to use dynamic task mapping to export list of dataframes
tatiana opened this issue · 1 comment
tatiana commented
Describe the bug
Astro SDK raises an exception when trying to export a list of dataframes.
Version
- Astro: 1.5.3
To Reproduce
- Inside the `python-sdk` folder, run:
make local target=setup
source ~/.virtualenvs/astro-sdk/bin/activate
AIRFLOW_HOME=`pwd` airflow db init
- Write the following DAG to the file `calculate_popular_movies_by_genre.py`, inside the `example_dags` folder:
from datetime import datetime
import pandas as pd
from airflow import DAG
from astro import sql as aql
from astro.files import File
from astro.table import Table
from astro.sql.operators.export_to_file import ExportToFileOperator
@aql.dataframe(columns_names_capitalization="original")
def top_movies_by_genre(input_df: pd.DataFrame):
    top_movies = []
    for genre, genre_df in input_df.groupby("genre1"):
        top_5 = genre_df.sort_values(by="rating", ascending=False)[["title", "rating", "genre1"]].head(5)
        top_movies.append(top_5)
    return top_movies
with DAG(
    "calculate_popular_movies_by_genre",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    imdb_movies = aql.load_file(
        File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"),
        output_table=Table(conn_id="sqlite_default"),
    )
    movies_by_genre = top_movies_by_genre(input_df=imdb_movies)
    ExportToFileOperator.partial(
        output_file=File("/tmp/output.json"),
        task_id="export_movies",
    ).expand(input_data=movies_by_genre)
- Run the test:
AIRFLOW_HOME=`pwd` pytest 'tests_integration/test_example_dags.py::test_example_dag[calculate_popular_movies_by_genre]'
- See the error:
tests_integration/test_example_dags.py:31: in wrapper_run_dag
test_utils.run_dag(dag)
tests_integration/sql/operators/utils.py:69: in run_dag
return test_dag(dag=dag)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:75: in wrapper
return func(*args, session=session, **kwargs)
tests_integration/sql/operators/utils.py:142: in test_dag
_run_task(ti, session=session)
tests_integration/sql/operators/utils.py:190: in _run_task
ti._run_raw_task(session=session)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:72: in wrapper
return func(*args, **kwargs)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:1383: in _run_raw_task
self._execute_task_with_callbacks(context, test_mode)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:1502: in _execute_task_with_callbacks
task_orig = self.render_templates(context=context)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:2123: in render_templates
original_task.render_template_fields(context)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/mappedoperator.py:686: in render_template_fields
unmapped_task._do_render_template_fields(
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:72: in wrapper
return func(*args, **kwargs)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/abstractoperator.py:600: in _do_render_template_fields
if not value:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = x title rating totalvotes genre1 genre2 ... votes1000 votesus v...Action Adventure ... 7.3 7.8 7.7 $146,408,305 207215819 $353,624,124
[29 rows x 58 columns]
@final
def __nonzero__(self) -> NoReturn:
> raise ValueError(
f"The truth value of a {type(self).__name__} is ambiguous. "
"Use a.empty, a.bool(), a.item(), a.any() or a.all()."
)
E ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/pandas/core/generic.py:1527: ValueError
Expected behavior
The DAG should run without errors.
Additional context
This seems to be an error in upstream Airflow when using dynamic task mapping with a list of pandas DataFrames: during template rendering, Airflow evaluates `if not value:` on each mapped value (see `abstractoperator.py:600` in the traceback), and that check raises for a DataFrame because its truth value is ambiguous.
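For reference, the same ValueError can be reproduced outside Airflow with a couple of lines of pandas, since a DataFrame refuses to be coerced to a boolean:

# Standalone reproduction of the underlying error, outside Airflow.
# pandas DataFrames do not define an unambiguous truth value, so any
# `if not value:` check on a DataFrame raises the same ValueError.
import pandas as pd

df = pd.DataFrame({"title": ["Movie A"], "rating": [7.5]})

try:
    if not df:  # mirrors the `if not value:` check in abstractoperator.py
        pass
except ValueError as exc:
    print(exc)  # "The truth value of a DataFrame is ambiguous. ..."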
tatiana commented
This issue was happening in upstream Airflow. We submitted a fix, which will be released as part of Airflow 2.6.1.
We added an example DAG that shows this use case; it will only run as part of our test suite for Airflow 2.6.1 and later.
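For illustration only (this is not the actual upstream Airflow patch), the idea behind such a fix is to avoid calling bool() on arbitrary template-field values such as DataFrames, for example by restricting the "is this field empty?" check to plain strings and standard containers:

# Illustrative sketch only, NOT the actual Airflow change: test whether a
# template field is empty without invoking __bool__ on arbitrary objects.
def is_empty_template_field(value) -> bool:
    if value is None:
        return True
    # Only treat strings and standard containers as "emptiable";
    # anything else (e.g. a pandas DataFrame) is passed through untouched.
    if isinstance(value, (str, bytes, list, tuple, set, dict)):
        return len(value) == 0
    return False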