Этот документ описывает стандарты и лучшие практики разработки ETL-процессов в Apache Airflow. Соблюдение этих стандартов позволяет улучшить читаемость кода, упростить поддержку, минимизировать количество ошибок и повысить производительность ETL-процессов.
- Ваш локальный образ Airflow всегда должен быть идентичен тому который используется в Production среде компании. Используйте docker compose для этого
x-airflow-common:
&airflow-common2-python3.10}
image: registry.com/airflow/airflow:2.0.3
environment:
&airflow-common-env
- Используйте понятные и описательные названия DAG'ов, тасков, переменных и подключений.
- Следуйте стандартам PEP 8 для Python-кода.
- Структурируйте код проекта:
- Отдельные файлы для DAG'ов.
- Использование модулей для общих функций и операторов.
- Обеспечьте детализированные и понятные лог-сообщения для каждого шага.
- Используйте
import logging as log
log.info("Полезная информация")
- Избегайте излишней информации в логах.
-
Используйте
snake_case
для имен DAG'ов:dag_id = "data_ingestion_pipeline"
-
Файл который содержит в себе даг должен совпадат с названием дага. Пример: load_dwh.py содержит в себе
dag = DAG(
dag_id="load_dwh",
...
)
- Всегда задавайте
schedule_interval
иstart_date
. - Настройте
catchup=False
, если обработка прошлых данных не требуется. - Укажите
max_active_runs
для предотвращения переполнения задач. - Старайтесь использовать плавающий star_date:
from pendulum import yesterday
dag = DAG(
dag_id="download_table",
start_date=yesterday(tz="Europe/Moscow"),
)
- Всегда используйте tz(timezone) для start_date. Определите для себя в компании какую будете использовать. Для РФ я рекомендую:
yesterday(tz="Europe/Moscow")
- Директория ./dags не должна содержать в себе ничего друго кроме DAG'ов. Данное требование связано не только со структурированием, но так же для производительности.Любые файлы процессятся dag processor manager и он пытается найти там DAG'и в файлах.
- Используйте
retries
иretry_delay
для каждой задачи. - Ограничьте количество параллельных задач через
pool
.
- Использование
BashOperator
допускается использовать только в крайнем случае, когда нет других альтернатив. В других случаях его использование строго запрещено. BashOperator имеет большое количесто проблем при использовании в k8s, его сложно отлаживать и на разных средах он может вести себя по разному.
- Использование
PythonOperator
запрещено так как этот синтаксис устарел, вместо него используйте @task декоратор
from airflow.decorators import task
@task(dag=dag)
def clean_temp_folder():
Samba.clean_target_folder(TEMP_DIR)
log.info(f"Удалена временная директория {TEMP_DIR}")
- все Task операторы должны содержать в себе минимальное количество логики. Вся логика должна лежать в директории libs. Пример размещения логики для таска dwh_load
./libs/dwh_loader
Используйте переменную PYTHONPATH в вашем контейнере что бы python мог добраться до файлов с логикой
PYTHONPATH: /opt/airflow/libs
так же добавьте в settings.json:
{
"python.analysis.extraPaths": ["./libs"],
}
- Передавайте только малые объемы данных через XCom. Для больших данных используйте хранилища.
- Настройте оповещения о сбоях (email, Slack, Webhooks).
- Используйте
on_failure_callback
для кастомной обработки ошибок.
- Используйте
trigger_rule="all_failed"
илиtrigger_rule="one_failed"
для задач, требующих особой логики обработки.
- Все конфигурационные параметры (например, пути, API-ключи) должны храниться в Airflow Variables или Connections.
- Используйте
os.getenv
для управления переменными окружения.
from airflow.models import Variable
s3_bucket = Variable.get("s3_bucket_name")
- Запускайте DAG'и в локальной среде для проверки. Результаты работы необходимо приложить к вашему Pull Request
Все критические компоненты в директории libs должны содержать unit-тесты.
Используйте отдельную директорию ./tests
для этого.
- Настройте CI/CD с использованием таких инструментов, как
GitHub Actions
илиGitLab CI
. - Проверьте DAG'и на ошибки с помощью
pylint
и встроенных тестов Airflow.
- Именование PR должно подчиняться следующему паттерну:
JIRA-777 Название задачи
- PR должен пройти все unit тесты прежде чем вы его отправите на согласование
- В описании к PR должно содержаться:
- Кратко, что меняется
- Скриншоты успешного выполнения
- Для hotfix допускается не следовать данным правилам, но очень рекомендуется к использованию.
- Избегайте слишком большого количества задач в одном DAG.
- Разделяйте крупные DAG'и на подмножества с использованием
SubDagOperator
или зависимых DAG'ов.
- Настройте параметры
poke_interval
иtimeout
для Sensor'ов, чтобы избежать блокировки ресурсов.
- Используйте
task_group
для организации сложных DAG'ов. - Следите за корректностью зависимости: избегайте циклических ссылок между задачами.
Соблюдение этих стандартов позволяет ускорить разработку, упростить отладку и сделать процессы обработки данных более надёжными. Эти принципы должны использоваться как основа для разработки ETL в Airflow.