harrytandata/airflow_course

[Question] `dbt deps`命令安排在哪个阶段

Closed this issue · 1 comments

Harry 老师:

我在airflow中集成dbt的过程比较顺利,但在关于dbt deps命令的安排上有些困惑。
我的airflow部署在k8s中,采用deploydags分离的策略,无论是web, scheduler, 还是worder,都通过gitDAG仓库定时同步到容器中,dags目录挂载模式是emptyDir。与你的airflow_course类似,我的dagsdbt project在同一个仓库中,方便开发。

下面是我的自定义Operator代码片段:

@cached(cache=TTLCache(maxsize=1, ttl=60))
def get_finance_dbt_env_task()-> dict[str, str]:
    ...
class FinanceDbtOperator(BashOperator):
    def __init__(self, task_id: str, bash_command: str, **kwargs):
        _dbt_env = get_finance_dbt_env_task()
        kwargs['cwd'] = _dbt_env['DBT_PROJECT_DIR']
        kwargs['env'] = _dbt_env
        kwargs['append_env'] = True
        if 'queue' not in kwargs:
            kwargs['queue'] = 'kubernetes'
        super().__init__(task_id = task_id, bash_command = bash_command, **kwargs)

调用时仅需:

_dbt_task = FinanceDbtOperator(
    task_id='dbt_run',
    bash_command="dbt run --models 'dw.dim_stock'",
)

每运行一个task实例,都对应启动一个pod,拉取到干净的dbt项目文件后,都需要运行一次dbt deps以安装项目必要的包。
我实践或思考了一些方法:

方案 优点 缺点
FinanceDbtOperator调整为'dbt deps\n' + bash_command 省事儿 每次运行任务都自动重复一遍,产生大量spin up阶段的消耗
将包含dagsdbt项目的dags目录持久化 资源消耗低 仍然难以判断应何时运行dbt depspackages.yml是否发生更改)。反复运行dbt deps时,由于挂载了持久化卷,所以资源占用不大,但仍然需要花费数秒时间
dbt deps移动到airflow deploy阶段 资源消耗低 dags仓库中的依赖项变更需要运维人员的干预,且难以应对多个dbt项目
dbt项目构建独立的镜像 运行时资源消耗低,且能够应对多个dbt项目的依赖冲突 需要构建复杂的CI/CD管道,让airflow中的DAGdbt项目的镜像协同运作
dbt_packages.gitignore中移除,所有依赖包都纳入代码仓库 资源消耗低,容易协同 很脏的一种做法

想要了解,在你们的airflowdbt集成的实践中,是如何安排dbt deps的执行时机的呢?

经过探索,目前已经找到了问题症结。
我们认为使用一体化仓库共同开发并在airflow中直接调bash命令不是一个好主意。
我们将airflow的dags仓库与dbt项目分开,在dbt项目的cicd中构建镜像并部署文档,然后在airflow的dags项目中使用KubernetesPodOperator调用dbt项目,代码大致如下:

'''
finance dbt project
'''
from logging import getLogger
from venv import logger
from cachetools import TTLCache, cached
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

logger = getLogger(__name__)

FINANCE_DBT_IMAGE_TAG='0.0.1'

@cached(cache=TTLCache(maxsize=1, ttl=60))
def get_finance_dbt_envs() -> dict[str, str]:
    logger.info('requesting dbt env...')
    dbt_target = Variable.get("DEPLOYMENT")
    ck_http_connection = BaseHook.get_connection("CLICKHOUSE_FINANCE_HTTP")
    return {
        "FINANCE_DBT_TARGET": str(dbt_target),
        "FINANCE_CLICKHOUSE_HOST": str(ck_http_connection.host),
        "FINANCE_CLICKHOUSE_HTTP_PORT": str(ck_http_connection.port),
        "FINANCE_CLICKHOUSE_DATABASE": str(ck_http_connection.schema),
        "FINANCE_CLICKHOUSE_USER": str(ck_http_connection.login),
        "FINANCE_CLICKHOUSE_PASSWORD": str(ck_http_connection.password),
    }


class FinanceDbtOperator(KubernetesPodOperator):
    '''
    dbt operator for finance project

    env: a dict, keys: FINANCE_DBT_TARGET, FINANCE_CLICKHOUSE_HOST, FINANCE_CLICKHOUSE_HTTP_PORT, FINANCE_CLICKHOUSE_DATABASE, FINANCE_CLICKHOUSE_USER, FINANCE_CLICKHOUSE_PASSWORD

    '''

    def __init__(self, task_id: str, cmds: list[str], arguments: list[str] | None = None, **kwargs):
        _dbt_envs = get_finance_dbt_envs()
        super().__init__(
            task_id=task_id,
            image=f"docker-registry.mujiannan.com:5001/mujiannan/finance-dbt:{FINANCE_DBT_IMAGE_TAG}",
            cmds=cmds,
            arguments=arguments,
            env_vars=_dbt_envs, 
            **kwargs
        )