astronomer/astronomer-cosmos

Using EMR transient cluster with Cosmos

gauravtanwar03 opened this issue · 1 comment

I want to use an EMR transient cluster along with Cosmos. As per my understanding, Cosmos requires the host to be available in an Airflow connection beforehand, and with a transient cluster the host changes with every run. Is there an existing workflow I can use to handle this?

To handle the dynamic nature of transient EMR clusters with Cosmos, you can programmatically update the Airflow connection within your DAG to reflect the new host information each time the cluster is created. Here’s a streamlined approach:

  1. Create or Update Airflow Connection Programmatically: Use Airflow's Connection model and a metadata-database session to create or update the connection details dynamically.

  2. Store Dynamic Host Information: Use Airflow Variables or environment variables to store the dynamic host information, updated programmatically when the EMR cluster is created (a sketch follows this list).

  3. Update Connection in DAG: Ensure the connection is updated before any tasks that depend on the EMR cluster are executed.
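For step 2, one pattern is to capture the master node's address as soon as the transient cluster is up and persist it in an Airflow Variable. A minimal sketch, assuming an upstream task has already created the cluster and passed its job flow ID along (the aws_conn_id, task name, and Variable key below are illustrative placeholders, not part of Cosmos):

from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.amazon.aws.hooks.emr import EmrHook

@task
def store_emr_host(job_flow_id: str) -> None:
    # Look up the transient cluster via the EMR API and record its
    # master DNS name where the connection-update step can read it.
    emr_client = EmrHook(aws_conn_id="aws_default").get_conn()
    cluster = emr_client.describe_cluster(ClusterId=job_flow_id)["Cluster"]
    Variable.set("emr_master_dns", cluster["MasterPublicDnsName"])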

Here’s an example workflow:

import os
from datetime import datetime
from pathlib import Path

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
from airflow import settings
from airflow.models import Connection

from cosmos import DbtTaskGroup, ExecutionConfig, ProfileConfig, ProjectConfig
from cosmos.constants import InvocationMode
from cosmos.profiles import get_automatic_profile_mapping

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))

execution_config = ExecutionConfig(invocation_mode=InvocationMode.DBT_RUNNER)

def update_airflow_connection(conn_id, host, port, login, password, schema):
    # Create the connection if it does not exist yet, otherwise update it,
    # and persist the result to the Airflow metadata database.
    session = settings.Session()
    conn = session.query(Connection).filter(Connection.conn_id == conn_id).one_or_none()
    if conn is None:
        conn = Connection(conn_id=conn_id, conn_type='postgres')
        session.add(conn)
    conn.host = host
    conn.port = int(port) if port else None
    conn.login = login
    conn.password = password
    conn.schema = schema
    session.commit()

@dag(
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    catchup=False,
)
def cosmos_emr_workflow() -> None:
    dynamic_host = os.getenv("DYNAMIC_EMR_HOST")
    dynamic_port = os.getenv("DYNAMIC_EMR_PORT")
    dynamic_login = os.getenv("DYNAMIC_EMR_LOGIN")
    dynamic_password = os.getenv("DYNAMIC_EMR_PASSWORD")
    dynamic_schema = os.getenv("DYNAMIC_EMR_SCHEMA")

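    # Note: this call runs when the scheduler parses the DAG file, not at
    # execution time; see the task-based variant after this example if the
    # environment variables are only set once the cluster exists.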
    update_airflow_connection(
        conn_id="emr_conn",
        host=dynamic_host,
        port=dynamic_port,
        login=dynamic_login,
        password=dynamic_password,
        schema=dynamic_schema
    )

    pre_dbt = EmptyOperator(task_id="pre_dbt")

    jaffle_shop = DbtTaskGroup(
        execution_config=execution_config,
        project_config=ProjectConfig(
            DBT_ROOT_PATH / "jaffle_shop",
        ),
        profile_config=ProfileConfig(
            profile_name="default",
            target_name="dev",
            profile_mapping=get_automatic_profile_mapping(
                conn_id="emr_conn",
                profile_args={"schema": dynamic_schema},
            ),
        ),
        operator_args={"install_deps": True},
        default_args={"retries": 2},
    )

    post_dbt = EmptyOperator(task_id="post_dbt")

    pre_dbt >> jaffle_shop >> post_dbt

cosmos_emr_workflow()
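One caveat with the example above: update_airflow_connection is called in the DAG body, so it executes whenever the scheduler parses the file rather than when the DAG runs. If the environment variables only reflect the new cluster after it has been provisioned, run the update as a task upstream of the dbt group instead. A sketch of that variant, reusing the same helper (the wiring is illustrative; Cosmos profile mappings generally read the connection when the dbt tasks render their profile at execution time):

from airflow.decorators import task

@task
def refresh_emr_connection() -> None:
    # Executed at run time, after the transient cluster exists, so the
    # connection reflects the current run's host.
    update_airflow_connection(
        conn_id="emr_conn",
        host=os.getenv("DYNAMIC_EMR_HOST"),
        port=os.getenv("DYNAMIC_EMR_PORT"),
        login=os.getenv("DYNAMIC_EMR_LOGIN"),
        password=os.getenv("DYNAMIC_EMR_PASSWORD"),
        schema=os.getenv("DYNAMIC_EMR_SCHEMA"),
    )

Inside the DAG body, wire it ahead of the task group, for example pre_dbt >> refresh_emr_connection() >> jaffle_shop >> post_dbt.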

This approach keeps the Airflow connection up to date with the latest host information for the transient EMR cluster, letting Cosmos interact with the cluster seamlessly.

About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
