[Question] `dbt deps`命令安排在哪个阶段
Closed this issue · 1 comments
mujiannan commented
Harry 老师:
我在airflow
中集成dbt
的过程比较顺利,但在关于dbt deps
命令的安排上有些困惑。
我的airflow
部署在k8s中,采用deploy
与dags
分离的策略,无论是web
, scheduler
, 还是worder
,都通过git
将DAG
仓库定时同步到容器中,dags
目录挂载模式是emptyDir
。与你的airflow_course
类似,我的dags
与dbt project
在同一个仓库中,方便开发。
下面是我的自定义Operator
代码片段:
@cached(cache=TTLCache(maxsize=1, ttl=60))
def get_finance_dbt_env_task()-> dict[str, str]:
...
class FinanceDbtOperator(BashOperator):
def __init__(self, task_id: str, bash_command: str, **kwargs):
_dbt_env = get_finance_dbt_env_task()
kwargs['cwd'] = _dbt_env['DBT_PROJECT_DIR']
kwargs['env'] = _dbt_env
kwargs['append_env'] = True
if 'queue' not in kwargs:
kwargs['queue'] = 'kubernetes'
super().__init__(task_id = task_id, bash_command = bash_command, **kwargs)
调用时仅需:
_dbt_task = FinanceDbtOperator(
task_id='dbt_run',
bash_command="dbt run --models 'dw.dim_stock'",
)
每运行一个task
实例,都对应启动一个pod
,拉取到干净的dbt
项目文件后,都需要运行一次dbt deps
以安装项目必要的包。
我实践或思考了一些方法:
方案 | 优点 | 缺点 |
---|---|---|
在FinanceDbtOperator 调整为'dbt deps\n' + bash_command |
省事儿 | 每次运行任务都自动重复一遍,产生大量spin up 阶段的消耗 |
将包含dags 与dbt 项目的dags 目录持久化 |
资源消耗低 | 仍然难以判断应何时运行dbt deps (packages.yml 是否发生更改)。反复运行dbt deps 时,由于挂载了持久化卷,所以资源占用不大,但仍然需要花费数秒时间 |
将dbt deps 移动到airflow deploy 阶段 |
资源消耗低 | dags 仓库中的依赖项变更需要运维人员的干预,且难以应对多个dbt 项目 |
为dbt 项目构建独立的镜像 |
运行时资源消耗低,且能够应对多个dbt 项目的依赖冲突 |
需要构建复杂的CI/CD 管道,让airflow 中的DAG 与dbt 项目的镜像协同运作 |
将dbt_packages 从.gitignore 中移除,所有依赖包都纳入代码仓库 |
资源消耗低,容易协同 | 很脏的一种做法 |
想要了解,在你们的airflow
与dbt
集成的实践中,是如何安排dbt deps
的执行时机的呢?
mujiannan commented
经过探索,目前已经找到了问题症结。
我们认为使用一体化仓库共同开发并在airflow中直接调bash命令不是一个好主意。
我们将airflow的dags仓库与dbt项目分开,在dbt项目的cicd中构建镜像并部署文档,然后在airflow的dags项目中使用KubernetesPodOperator
调用dbt项目,代码大致如下:
'''
finance dbt project
'''
from logging import getLogger
from venv import logger
from cachetools import TTLCache, cached
from airflow.hooks.base import BaseHook
from airflow.models import Variable
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
logger = getLogger(__name__)
FINANCE_DBT_IMAGE_TAG='0.0.1'
@cached(cache=TTLCache(maxsize=1, ttl=60))
def get_finance_dbt_envs() -> dict[str, str]:
logger.info('requesting dbt env...')
dbt_target = Variable.get("DEPLOYMENT")
ck_http_connection = BaseHook.get_connection("CLICKHOUSE_FINANCE_HTTP")
return {
"FINANCE_DBT_TARGET": str(dbt_target),
"FINANCE_CLICKHOUSE_HOST": str(ck_http_connection.host),
"FINANCE_CLICKHOUSE_HTTP_PORT": str(ck_http_connection.port),
"FINANCE_CLICKHOUSE_DATABASE": str(ck_http_connection.schema),
"FINANCE_CLICKHOUSE_USER": str(ck_http_connection.login),
"FINANCE_CLICKHOUSE_PASSWORD": str(ck_http_connection.password),
}
class FinanceDbtOperator(KubernetesPodOperator):
'''
dbt operator for finance project
env: a dict, keys: FINANCE_DBT_TARGET, FINANCE_CLICKHOUSE_HOST, FINANCE_CLICKHOUSE_HTTP_PORT, FINANCE_CLICKHOUSE_DATABASE, FINANCE_CLICKHOUSE_USER, FINANCE_CLICKHOUSE_PASSWORD
'''
def __init__(self, task_id: str, cmds: list[str], arguments: list[str] | None = None, **kwargs):
_dbt_envs = get_finance_dbt_envs()
super().__init__(
task_id=task_id,
image=f"docker-registry.mujiannan.com:5001/mujiannan/finance-dbt:{FINANCE_DBT_IMAGE_TAG}",
cmds=cmds,
arguments=arguments,
env_vars=_dbt_envs,
**kwargs
)