astronomer/astro-sdk

Unable to use dynamic task mapping to export list of dataframes

tatiana opened this issue · 1 comment

Describe the bug
Astro SDK raises an exception when dynamic task mapping is used to export a list of DataFrames.

Version

  • Astro: 1.5.3

To Reproduce

  1. Inside the python-sdk folder, run:
make local target=setup 
source ~/.virtualenvs/astro-sdk/bin/activate
AIRFLOW_HOME=`pwd` airflow db init 
  2. Write the following DAG to the file calculate_popular_movies_by_genre.py inside the example_dags folder:
from datetime import datetime

import pandas as pd
from airflow import DAG

from astro import sql as aql
from astro.files import File
from astro.table import Table
from astro.sql.operators.export_to_file import ExportToFileOperator

@aql.dataframe(columns_names_capitalization="original")
def top_movies_by_genre(input_df: pd.DataFrame):
    top_movies = []
    for genre, genre_df in input_df.groupby("genre1"):
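        # sort_values/head return a new DataFrame, so keep the result (top five movies per genre).
        top_genre_movies = genre_df.sort_values(by="rating", ascending=False)[["title", "rating", "genre1"]].head(5)
        top_movies.append(top_genre_movies)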
    return top_movies


with DAG(
    "calculate_popular_movies_by_genre",
    schedule_interval=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
) as dag:
    imdb_movies = aql.load_file(
        File("https://raw.githubusercontent.com/astronomer/astro-sdk/main/tests/data/imdb_v2.csv"),
        output_table=Table(conn_id="sqlite_default"),
    )
    movies_by_genre = top_movies_by_genre(input_df=imdb_movies)
    # Map one export task over each DataFrame in the returned list.
    ExportToFileOperator.partial(
        output_file=File("/tmp/output.json"),
        task_id="export_movies",
    ).expand(input_data=movies_by_genre)
  3. Run the test:
AIRFLOW_HOME=`pwd` pytest 'tests_integration/test_example_dags.py::test_example_dag[calculate_popular_movies_by_genre]'
  4. See the error:
tests_integration/test_example_dags.py:31: in wrapper_run_dag
    test_utils.run_dag(dag)
tests_integration/sql/operators/utils.py:69: in run_dag
    return test_dag(dag=dag)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:75: in wrapper
    return func(*args, session=session, **kwargs)
tests_integration/sql/operators/utils.py:142: in test_dag
    _run_task(ti, session=session)
tests_integration/sql/operators/utils.py:190: in _run_task
    ti._run_raw_task(session=session)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:72: in wrapper
    return func(*args, **kwargs)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:1383: in _run_raw_task
    self._execute_task_with_callbacks(context, test_mode)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:1502: in _execute_task_with_callbacks
    task_orig = self.render_templates(context=context)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/taskinstance.py:2123: in render_templates
    original_task.render_template_fields(context)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/mappedoperator.py:686: in render_template_fields
    unmapped_task._do_render_template_fields(
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/utils/session.py:72: in wrapper
    return func(*args, **kwargs)
../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/airflow/models/abstractoperator.py:600: in _do_render_template_fields
    if not value:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self =        x                                       title  rating  totalvotes  genre1     genre2  ... votes1000  votesus  v...Action  Adventure  ...       7.3      7.8       7.7  $146,408,305    207215819    $353,624,124 

[29 rows x 58 columns]

    @final
    def __nonzero__(self) -> NoReturn:
>       raise ValueError(
            f"The truth value of a {type(self).__name__} is ambiguous. "
            "Use a.empty, a.bool(), a.item(), a.any() or a.all()."
        )
E       ValueError: The truth value of a DataFrame is ambiguous. Use a.empty, a.bool(), a.item(), a.any() or a.all().

../../../.virtualenvs/astro-sdk/lib/python3.9/site-packages/pandas/core/generic.py:1527: ValueError
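The top_movies_by_genre task from step 2 can be checked with pandas alone. This is a minimal sketch that assumes a small hypothetical DataFrame in place of imdb_v2.csv and adds a hypothetical parameter n for the number of rows kept per genre. It shows that the task returns a plain Python list of DataFrames, one per genre, which is the value handed to .expand(input_data=...). Note that sort_values(...).head(...) returns a new DataFrame rather than modifying genre_df in place, so its result is what has to be appended.

```python
from typing import List

import pandas as pd

# Hypothetical sample data standing in for imdb_v2.csv (only the columns the task uses).
movies = pd.DataFrame(
    {
        "title": ["A", "B", "C", "D", "E", "F", "G"],
        "rating": [7.1, 8.4, 6.0, 9.0, 5.5, 7.7, 8.8],
        "genre1": ["Action", "Action", "Action", "Drama", "Drama", "Drama", "Drama"],
    }
)


def top_movies_by_genre(input_df: pd.DataFrame, n: int = 5) -> List[pd.DataFrame]:
    """Return one DataFrame per genre holding its n highest-rated movies."""
    top_movies = []
    for _genre, genre_df in input_df.groupby("genre1"):
        # sort_values and head return new objects; keep the result instead of discarding it.
        top = genre_df.sort_values(by="rating", ascending=False)[["title", "rating", "genre1"]].head(n)
        top_movies.append(top)
    return top_movies


result = top_movies_by_genre(movies, n=2)
for df in result:
    # Each element is a separate DataFrame, i.e. one input for a mapped task instance.
    print(df.to_string(index=False))
```

Because the list holds DataFrames rather than strings or Table objects, each mapped ExportToFileOperator instance receives a DataFrame as its input_data.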

Expected behavior
The DAG should run without errors.

Additional context
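This is a bug in upstream Airflow that occurs when dynamic task mapping is used with a list of pandas DataFrames. The traceback shows the cause: while rendering the mapped task's template fields, Airflow's _do_render_template_fields evaluates "if not value:", and evaluating a DataFrame in a boolean context raises ValueError because its truth value is ambiguous.

We submitted a fix upstream, which will be released as part of Airflow 2.6.1.

We added an example DAG that covers this use case; it only runs as part of our test suite for Airflow versions newer than 2.6.1.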
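The failing check can be reproduced with pandas alone, without Airflow. This sketch evaluates a DataFrame in the same "if not value:" form seen in the traceback, then shows one DataFrame-safe way to test for an empty value. The helper is_empty_value is hypothetical and only illustrates the idea; it is not the actual upstream Airflow patch.

```python
import pandas as pd

df = pd.DataFrame({"title": ["A", "B"], "rating": [7.1, 8.4]})

# Same shape of check as "if not value:" in Airflow's template rendering.
try:
    if not df:
        pass
    raised = False
except ValueError as exc:
    raised = True
    print(exc)  # prints the "truth value ... is ambiguous" ValueError message


def is_empty_value(value) -> bool:
    """Hypothetical helper: emptiness check that never calls bool() on pandas objects."""
    if isinstance(value, (pd.DataFrame, pd.Series)):
        # .empty is True when any axis has length 0.
        return value.empty
    # Fall back to normal truthiness for strings, lists, None, etc.
    return not value


print(is_empty_value(df))              # non-empty DataFrame
print(is_empty_value(pd.DataFrame()))  # empty DataFrame
print(is_empty_value(""))              # empty template string
```

Checking the type before falling back to truthiness keeps the usual behavior for strings and collections while avoiding the ambiguous bool() call on DataFrames and Series.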