astronomer/astronomer-cosmos

Create "standalone" test tasks even when they are not attached to models/seeds/snapshots

tatiana opened this issue · 2 comments

Context

On 8 March 2024, Stephen Tang posted in the #airflow-dbt Slack channel (link to the thread):

Is it possible to use Cosmos to create DAGs with tasks that are only tests? When I try to manipulate the select and exclude args in the render_config to achieve this, I'm not getting the results I expect.

Several people joined the discussion in the thread.

Some approaches that can be used, as of Cosmos 1.4:

  1. Render all test nodes as a single Airflow Task by using the correspondentDbtTestOperator depending on the desired execution mode (as an example, DbtTestLocalOperator if using the local execution mode)

  2. Render all test nodes as a single Airflow Task by using TestBehavior.AFTER_ALL, following the docs:

task_group = DbtTaskGroup(
    render_config=RenderConfig(
        test_behavior=TestBehavior.AFTER_ALL,
    )
)
  1. Render each test node as an individual Airflow task by customizing how test nodes are rendered using this Cosmos feature. An example of DAG that does this:
import os
from datetime import datetime
from pathlib import Path

from airflow.models.dag import DAG
from airflow.utils.task_group import TaskGroup

from cosmos import DbtDag, ProfileConfig, ProjectConfig, RenderConfig
from cosmos.airflow.graph import create_test_task_metadata, create_airflow_task
from cosmos.constants import DbtResourceType, ExecutionMode, TestIndirectSelection, TestBehavior
from cosmos.dbt.graph import DbtNode
from cosmos.profiles import PostgresUserPasswordProfileMapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=PostgresUserPasswordProfileMapping(
        conn_id="airflow_db",
        profile_args={"schema": "public"},
        disable_event_tracking=True,
    ),
)


def convert_test(dag: DAG, task_group: TaskGroup, node: DbtNode, **kwargs):
    """
    Return an instance of a desired operator to represent a dbt "source" node.
    """
    test_meta = create_test_task_metadata(
        f"{node.name}_run",
        execution_mode=ExecutionMode.LOCAL,
        test_indirect_selection=TestIndirectSelection.EAGER,
        task_args=kwargs["task_args"],
        node=node,
    )
    return create_airflow_task(test_meta, dag, task_group=task_group)

basic_cosmos_dag = DbtDag(
    # dbt/cosmos-specific parameters
    project_config=ProjectConfig(
        DBT_ROOT_PATH / "jaffle_shop",
    ),
    profile_config=profile_config,
    operator_args={
        "install_deps": True,  # install any necessary dependencies before running any dbt command
        "full_refresh": True,  # used only in dbt commands that support this flag
    },
    render_config=RenderConfig(
        select=["test_type:generic"],
        node_converters={DbtResourceType("test"): convert_test}
    ),
    # normal dag parameters
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
    dag_id="basic_cosmos_dag",
    default_args={"retries": 2},
)

And how it is rendered:
Screenshot 2024-05-14 at 00 24 07

Desired behavior

(Open for discussion!)

It's unclear what the best behavior by default would be. Do we want users to be able to render each test as an individual task in Airflow? Wouldn't this be overkill?

Should we render "standalone" tests by default, similar to the DAG image I shared, as part of TestBehaviour.AFTER_EACH so most tests are still run grouped, but tests that are "parentless" would be rendered individually?

Or would it make more sense to have a new TestBehaviour.STANDALONE, to be used explicitly by people who want individual tests—regardless of whether they have parents or not—to be rendered as stand-alone tasks outside of TaskGroups?

Depending on the desired approach, this would be a breaking change, so we may need to introduce a feature flag.

My company's use case would be to have something like TestBehaviour.STANDALONE , as we have a DAG that runs tests only once daily, for models that are built more frequently in a different DAG. We prefer individual tasks per test as that helps triage alerts more easily in Slack (since the logs are able to render (since they're not too long) and the task name tells us exactly what failed).

That's very helpful, thanks a a lot, @stephentang91 !