/Airflow-development-standards

Стандарты разработки ETL в Airflow

Стандарты разработки ETL в Airflow

Введение

Этот документ описывает стандарты и лучшие практики разработки ETL-процессов в Apache Airflow. Соблюдение этих стандартов позволяет улучшить читаемость кода, упростить поддержку, минимизировать количество ошибок и повысить производительность ETL-процессов.


1. Общие принципы

1.1. Использование последней стабильной версии Airflow

  • Ваш локальный образ Airflow всегда должен быть идентичен тому который используется в Production среде компании. Используйте docker compose для этого
x-airflow-common:
  &airflow-common2-python3.10}
  image: registry.com/airflow/airflow:2.0.3
  environment:
    &airflow-common-env

1.2. Читаемость и структура кода

  • Используйте понятные и описательные названия DAG'ов, тасков, переменных и подключений.
  • Следуйте стандартам PEP 8 для Python-кода.
  • Структурируйте код проекта:
    • Отдельные файлы для DAG'ов.
    • Использование модулей для общих функций и операторов.

1.3. Логирование

  • Обеспечьте детализированные и понятные лог-сообщения для каждого шага.
  • Используйте
import logging as log
log.info("Полезная информация")
  • Избегайте излишней информации в логах.

2. Создание DAG'ов

2.1. Именование

  • Используйте snake_case для имен DAG'ов:

    dag_id = "data_ingestion_pipeline"
  • Файл который содержит в себе даг должен совпадат с названием дага. Пример: load_dwh.py содержит в себе

dag = DAG(
    dag_id="load_dwh",
    ...
)

2.2. Конфигурация DAG'ов

  • Всегда задавайте schedule_interval и start_date.
  • Настройте catchup=False, если обработка прошлых данных не требуется.
  • Укажите max_active_runs для предотвращения переполнения задач.
  • Старайтесь использовать плавающий star_date:
from pendulum import yesterday

dag = DAG(
    dag_id="download_table",
    start_date=yesterday(tz="Europe/Moscow"),
)
  • Всегда используйте tz(timezone) для start_date. Определите для себя в компании какую будете использовать. Для РФ я рекомендую:
yesterday(tz="Europe/Moscow")
  • Директория ./dags не должна содержать в себе ничего друго кроме DAG'ов. Данное требование связано не только со структурированием, но так же для производительности.Любые файлы процессятся dag processor manager и он пытается найти там DAG'и в файлах.

2.3. Настройки задач

  • Используйте retries и retry_delay для каждой задачи.
  • Ограничьте количество параллельных задач через pool.

3. Операторы и задачи

3.1. Использование BashOperator

  • Использование BashOperator допускается использовать только в крайнем случае, когда нет других альтернатив. В других случаях его использование строго запрещено. BashOperator имеет большое количесто проблем при использовании в k8s, его сложно отлаживать и на разных средах он может вести себя по разному.

3.2. Использование PythonOperator

  • Использование PythonOperator запрещено так как этот синтаксис устарел, вместо него используйте @task декоратор
from airflow.decorators import task

@task(dag=dag)
def clean_temp_folder():
    Samba.clean_target_folder(TEMP_DIR)
    log.info(f"Удалена временная директория {TEMP_DIR}")
  • все Task операторы должны содержать в себе минимальное количество логики. Вся логика должна лежать в директории libs. Пример размещения логики для таска dwh_load
./libs/dwh_loader

Используйте переменную PYTHONPATH в вашем контейнере что бы python мог добраться до файлов с логикой PYTHONPATH: /opt/airflow/libs

так же добавьте в settings.json:

{
    "python.analysis.extraPaths": ["./libs"],
}

3.3. Использование XCom

  • Передавайте только малые объемы данных через XCom. Для больших данных используйте хранилища.

4. Обработка ошибок

4.1. Мониторинг ошибок

  • Настройте оповещения о сбоях (email, Slack, Webhooks).
  • Используйте on_failure_callback для кастомной обработки ошибок.

4.2. Переопределение ошибок

  • Используйте trigger_rule="all_failed" или trigger_rule="one_failed" для задач, требующих особой логики обработки.

5. Параметризация и переменные

5.1. Использование переменных

  • Все конфигурационные параметры (например, пути, API-ключи) должны храниться в Airflow Variables или Connections.
  • Используйте os.getenv для управления переменными окружения.

Пример:

from airflow.models import Variable

s3_bucket = Variable.get("s3_bucket_name")

6. Тестирование и CI/CD

6.1. Локальное тестирование

  • Запускайте DAG'и в локальной среде для проверки. Результаты работы необходимо приложить к вашему Pull Request

6.2 Unit-тестирование

Все критические компоненты в директории libs должны содержать unit-тесты. Используйте отдельную директорию ./tests для этого.

6.3. Автоматизация развертывания

  • Настройте CI/CD с использованием таких инструментов, как GitHub Actions или GitLab CI.
  • Проверьте DAG'и на ошибки с помощью pylint и встроенных тестов Airflow.

Pull Request (Merge request, PR)

  • Именование PR должно подчиняться следующему паттерну: JIRA-777 Название задачи
  • PR должен пройти все unit тесты прежде чем вы его отправите на согласование
  • В описании к PR должно содержаться:
    • Кратко, что меняется
    • Скриншоты успешного выполнения
  • Для hotfix допускается не следовать данным правилам, но очень рекомендуется к использованию.

7. Производительность

7.1. Оптимизация DAG'ов

  • Избегайте слишком большого количества задач в одном DAG.
  • Разделяйте крупные DAG'и на подмножества с использованием SubDagOperator или зависимых DAG'ов.

7.2. Использование Sensor'ов

  • Настройте параметры poke_interval и timeout для Sensor'ов, чтобы избежать блокировки ресурсов.

8. Управление зависимостями

  • Используйте task_group для организации сложных DAG'ов.
  • Следите за корректностью зависимости: избегайте циклических ссылок между задачами.

Заключение

Соблюдение этих стандартов позволяет ускорить разработку, упростить отладку и сделать процессы обработки данных более надёжными. Эти принципы должны использоваться как основа для разработки ETL в Airflow.