/airfllow-demo

Short hands-on guide to show some airflow features.

Primary language: Python

Apache Airflow - POC | Orchestration made easy | Hands-on Guide |

Overview

This repository shows the main apache-airflow features, use cases and advantages of using this library, covering installation, an overview and some code examples as a POC.

Disclaimer

We don't intend to give a deep explanation of all the components and services of apache airflow. This is a POC repository created to guide people without previous knowledge of apache-airflow by providing concrete definitions of the main components, installation instructions and coded examples that show some basics of this library.

Feel free to contribute with more examples.

What is apache-airflow?

Python library to define, manage and monitor workflows.

What is a workflow?

A set of defined tasks with a specific order (dependencies).

Relevant Terminology

  • DAG (directed acyclic graph): a set of tasks (operators) together with their dependencies
  • Operator: a specific task
  • Schedule interval: how often the DAG runs
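One detail about the schedule interval that often surprises newcomers: airflow starts the run for a period once that period has ended. The sketch below is plain Python (no airflow import) and only illustrates that rule; the function name and the dates are made up for the example.

```python
from datetime import datetime, timedelta


def first_runs(start_date, interval, count):
    """Return (execution_date, earliest_trigger_time) pairs for the first runs.

    The run stamped with execution_date D covers the period that starts at D,
    so the scheduler can only trigger it at D + interval.
    """
    runs = []
    for i in range(count):
        execution_date = start_date + i * interval
        runs.append((execution_date, execution_date + interval))
    return runs


# Hypothetical daily DAG starting on 2020-01-01.
runs = first_runs(datetime(2020, 1, 1), timedelta(days=1), 3)
for execution_date, trigger_time in runs:
    print(execution_date.date(), "is triggered on", trigger_time.date())
```

So with a daily interval, the run for January 1st shows up on January 2nd.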

What kind of "process" can be automated by Airflow?

Any process that does not depend on manual (intermediate) actions/decisions/confirmations and that can technically be triggered/performed by bash commands, python functions, db processes... or any other integration/operator (aws, s3, emr, S3ToGoogleCloudStorageOperator!!!, azure, gcp, hive, BigQuery, SSH tunnel... and a long list of etc).


Enough theory...

How to install airflow?

Prerequisites:

  • python
  • spark (not required to install airflow, only for the examples that execute spark jobs).

Dependencies installation steps

  1. Activate your virtual environment (if it's not created yet, create one based on python 3.7).

    source {virtual_environment_folder}/bin/activate
  2. Set the following environment variable so that apache-airflow can be installed in the virtual environment

    export SLUGIFY_USES_TEXT_UNIDECODE=yes
  3. Install dependencies in your virtual environment (see notes about py3 issues with airflow 12 requirements.txt)

    pip install -r requirements.txt

Let's start running our airflow instance

To run airflow, we only need to follow the next few steps...

The instructions below assume you are in the project root folder.

  1. Set $AIRFLOW_HOME environment variable

    export AIRFLOW_HOME=./airflow
  2. Initialize the airflow db. After running the following command you'll see more files inside the airflow/ directory, which is now our $AIRFLOW_HOME location (defined in the previous step).

    airflow initdb
  3. Open a second terminal session; the following steps use both sessions.

  4. In the new session, repeat step 1 so that $AIRFLOW_HOME is set there too.

  5. In session 1 - Start airflow scheduler

    airflow scheduler
  6. In session 2 - Start airflow webserver (Note: you can use a different port if 8084 is used by another service)

     airflow webserver -p 8084
  7. Open web browser and navigate to: http://localhost:8084


Main airflow components/commands

We've run airflow scheduler, airflow webserver and airflow initdb... but what do they mean? ...what are they executing?

Well, to cover them quickly we can summarize:

  • scheduler: a monitoring process for all the DAGs and their tasks (status, dependencies, etc.); it runs with the settings in airflow.cfg.
  • webserver: provides a UI to manage our DAGs; it also monitors the DAG bag and "promotes" new DAGs created by users.
  • initdb: airflow stores metadata, DAG history, variables, connections and general configuration in a database; if none is configured, it uses a local one by default. initdb initializes the database for our airflow instance (it can also be run against an existing instance).

Talking about airflow.cfg...

Here is the place where we can configure some settings for our airflow instance, such as:

  • Loading or not dag examples (included by library)
  • scheduler heartbeat interval
  • smtp config for email operators

For example...

[smtp]
smtp_host = your.host.com
smtp_starttls = True
smtp_ssl = False
smtp_user = your_email
smtp_password = given_psw
smtp_port = 587
smtp_mail_from = email_address
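airflow.cfg uses the INI format, so you can inspect it with Python's configparser. The sketch below parses an inline sample instead of your real file; the [core] and [scheduler] values are placeholders. It also shows the AIRFLOW__{SECTION}__{KEY} naming that airflow uses for environment variables that override a setting (the helper function is ours, not part of airflow).

```python
import configparser

# Inline sample with placeholder values; point the parser at
# $AIRFLOW_HOME/airflow.cfg to inspect a real instance.
SAMPLE_CFG = """
[core]
load_examples = False

[scheduler]
scheduler_heartbeat_sec = 5

[smtp]
smtp_host = your.host.com
smtp_starttls = True
smtp_ssl = False
smtp_port = 587
"""


def env_override_name(section, key):
    """Environment variable name that overrides [section] key."""
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


# interpolation=None because a real airflow.cfg may contain '%' characters.
parser = configparser.ConfigParser(interpolation=None)
parser.read_string(SAMPLE_CFG)

load_examples = parser.getboolean("core", "load_examples")
heartbeat = parser.getint("scheduler", "scheduler_heartbeat_sec")
smtp_port = parser.getint("smtp", "smtp_port")

print(load_examples, heartbeat, smtp_port)
print(env_override_name("smtp", "smtp_host"))
```

The environment variable route is handy for values such as smtp_password that you would rather not commit to the repository.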

Ok, let's explore airflow webserver....

Practice - Set Operator Dependencies

Now... let's work with airflow/dags/abc_dag.py and try to set the dependencies for poc_abc_dag as per the first screenshot... done? Now create 3 more DummyOperators (operator_x, operator_y and operator_z); all of them should be executed after bash_date and bash_command.
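If the >> syntax is new to you, here is a minimal sketch of the idea. Task is a tiny stand-in class written for this guide, not airflow's implementation; it only records which tasks must finish first. With real operators in abc_dag.py the dependency expression looks the same (list >> operator is supported in recent airflow releases).

```python
class Task:
    """Stand-in for an airflow operator; it only records dependencies."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = set()

    def __rshift__(self, other):
        # task >> other (or task >> [a, b]): the targets run after this task.
        targets = other if isinstance(other, list) else [other]
        for target in targets:
            target.upstream.add(self.task_id)
        return other

    def __rrshift__(self, others):
        # [a, b] >> task: this task runs after every task in the list.
        for task in others:
            self.upstream.add(task.task_id)
        return self


bash_date = Task("bash_date")
bash_command = Task("bash_command")
operator_x = Task("operator_x")
operator_y = Task("operator_y")
operator_z = Task("operator_z")

# The three new tasks wait for both bash tasks.
for new_task in (operator_x, operator_y, operator_z):
    [bash_date, bash_command] >> new_task

print(sorted(operator_x.upstream))
```

Try the exercise yourself before peeking: the point is that each of the new operators ends up with both bash tasks as upstream.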

webserver - dashboard


Summary of main columns:

  • Second column (left to right): switch to turn your DAG on/off; only turned-on DAGs can be executed.
  • DAG: shown with the dag_id defined in the code.
  • Recent tasks: task counts from the latest execution, grouped by state.
  • Last run: timestamp of the most recent run, in UTC.
  • DAG runs: historical counts of DAG executions by status (success, failed, running).
  • Links: here you can manually run your DAG (if it's turned on), navigate to logs, delete it, etc...

webserver - admin - connections


This is, as you might imagine, the place to set up db connections, but also ssh, spark, etc...

webserver - admin - variables


These values, which you define either in the webserver or through the airflow CLI, are accessible in the code. As an extra "layer" of security, the values of variables whose names contain substrings like apikey, secret, etc. are automatically masked in the webserver.
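The masking rule is a simple substring check on the variable name. Here is a rough sketch of the idea; the list of substrings is an assumption based on common airflow defaults, so check your airflow version for the exact set, and note that both function names are made up for this example.

```python
# Assumed list of sensitive substrings (not read from airflow itself).
SENSITIVE_SUBSTRINGS = (
    "password", "secret", "passwd", "authorization",
    "api_key", "apikey", "access_token",
)


def should_mask(variable_key):
    """True if the variable name contains any sensitive substring."""
    key = variable_key.lower()
    return any(substring in key for substring in SENSITIVE_SUBSTRINGS)


def display_value(variable_key, value):
    """Value as a UI would show it: masked when the name looks sensitive."""
    return "*" * 8 if should_mask(variable_key) else value


print(display_value("datadog_apikey", "abc123"))
print(display_value("report_path", "/tmp/report"))
```

Keep in mind this only hides the value in the UI; naming a variable my_secret does not by itself make it secure.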

webserver - dags - rendered template


Do you want to see what your command will look like once the template is rendered? Well, this is basically a preview....

webserver - dags - task instance details


All relevant (and not so relevant) metadata. If you ran your DAG but a task has not been triggered yet, here you can find the reason...

Main Airflow Features


  • Pure Python
  • Useful UI
  • Robust integrations
  • Easy to use
  • Open Source

Integrations... a lot of them!

Don't believe it? Take a look: https://airflow.apache.org/

Now, take a look at resources/pyspark/read_random_words.py: first try to run it from the terminal as a standalone app, and then run airflow/dags/word_count_spark_dag.py in airflow.

Cloud Integrations

From: https://airflow.apache.org/docs/stable/integration.html#integration

  • Azure: Microsoft Azure
  • AWS: Amazon Web Services
  • Databricks
  • GCP: Google Cloud Platform
  • Qubole

Practice - POC - Data pipeline

Summary

This POC tries to simulate a small data pipeline which triggers spark jobs via spark-submit.
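To get a feeling for what "trigger via spark-submit" means, the sketch below builds the command as an argument list. Several things here are assumptions for illustration: the location of the script under resources/pyspark/, passing the input file as a command-line argument, and the local[*] master. The DAG in this repository may do it differently.

```python
import shlex


def build_spark_submit(app_path, app_args=(), master="local[*]"):
    """Build a spark-submit command as a list of arguments."""
    command = ["spark-submit", "--master", master, app_path]
    command.extend(app_args)
    return command


# Hypothetical script location and argument.
command = build_spark_submit(
    "resources/pyspark/top_netflix_shows_identification.py",
    app_args=["resources/pyspark/sources/tv-shows-netflix.csv"],
)

# Shell-quoted version, e.g. to paste into a terminal.
print(" ".join(shlex.quote(part) for part in command))
```

A string like the printed one is what you would typically hand to a bash-based task, so that the scheduler launches the spark job for you.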

Problem

Given two data sets with information about shows on Netflix and Prime (links in the last section), we want to get a single report with the data below:

  • top 10 shows (based on IMDB Rating) for Netflix datasource.
  • top 15 shows (based on IMDB Rating) for Prime datasource.

The system should be able to send notifications for both successful and failed executions.

Bonus: handle those notifications in a third-party service...
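One common way to get such notifications is through callbacks: airflow operators accept on_success_callback and on_failure_callback, and call them with a context dictionary. The sketch below only formats a message and prints it. The context is simulated so the code runs outside airflow, and the dag_id and task_id used are hypothetical.

```python
from types import SimpleNamespace


def build_notification(context, status):
    """Format a one-line message from the context passed to a callback."""
    ti = context["task_instance"]
    return "[{}] {}.{} (execution date: {})".format(
        status.upper(), ti.dag_id, ti.task_id, context["execution_date"]
    )


def on_success(context):
    # Candidate for on_success_callback; replace print with a real notifier.
    print(build_notification(context, "success"))


def on_failure(context):
    # Candidate for on_failure_callback.
    print(build_notification(context, "failed"))


# Simulated context, only to exercise the functions outside airflow.
fake_context = {
    "task_instance": SimpleNamespace(
        dag_id="extract_top_shows", task_id="shows_unification"
    ),
    "execution_date": "2020-01-01T00:00:00",
}
message = build_notification(fake_context, "failed")
on_failure(fake_context)
```

For the bonus, the print call is the place where a request to a third-party service would go.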

Input details

Netflix datasource available on: resources/pyspark/sources/tv-shows-netflix.csv. Preview:

Titles,Year,Rating,IMDB_Rating,Netflix
Breaking Bad,2008,18+,9.5,1
Game of Thrones,2011,18+,9.3,0
Rick and Morty,2013,18+,9.2,0

Prime datasource available on: resources/pyspark/sources/prime-TV-Shows-Data-set.csv. Preview:

S.no.,Name of the show,Year of release,No of seasons available,Language,Genre,IMDb rating,Age of viewers
1,Pataal Lok,2020,1,Hindi,Drama,7.5,18+
2,Upload,2020,1,English,Sci-fi comedy,8.1,16+
3,The Marvelous Mrs. Maisel,2017,3,English,"Drama, Comedy",8.7,16+
4,Four More Shots Please,2019,2,Hindi,"Drama, Comedy",5.3,18+
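Before looking at the spark jobs, it can help to see the schema problem in plain Python. The sketch below reads a few rows from the previews above, normalizes the column names (so "Name of the show" and "IMDb rating" become usable keys) and maps both sources to one schema. The common schema and the helper names are hypothetical; the real jobs in this repository use pyspark.

```python
import csv
import io
import re

NETFLIX_PREVIEW = """Titles,Year,Rating,IMDB_Rating,Netflix
Breaking Bad,2008,18+,9.5,1
Game of Thrones,2011,18+,9.3,0
"""

PRIME_PREVIEW = """S.no.,Name of the show,Year of release,No of seasons available,Language,Genre,IMDb rating,Age of viewers
1,Pataal Lok,2020,1,Hindi,Drama,7.5,18+
3,The Marvelous Mrs. Maisel,2017,3,English,"Drama, Comedy",8.7,16+
"""


def normalize(name):
    """'Name of the show' -> 'name_of_the_show' (no spaces or punctuation)."""
    return re.sub(r"[^0-9a-z]+", "_", name.strip().lower()).strip("_")


def read_rows(text):
    """Parse CSV text into dicts keyed by normalized column names."""
    reader = csv.DictReader(io.StringIO(text))
    return [{normalize(k): v for k, v in row.items()} for row in reader]


def to_common(row, source):
    """Map a source-specific row to a (hypothetical) common schema."""
    title_key = "titles" if source == "netflix" else "name_of_the_show"
    year_key = "year" if source == "netflix" else "year_of_release"
    return {
        "title": row[title_key],
        "year": int(row[year_key]),
        "imdb_rating": float(row["imdb_rating"]),
        "source": source,
    }


shows = [to_common(r, "netflix") for r in read_rows(NETFLIX_PREVIEW)]
shows += [to_common(r, "prime") for r in read_rows(PRIME_PREVIEW)]
top = sorted(shows, key=lambda s: s["imdb_rating"], reverse=True)
print([s["title"] for s in top])
```

Note that after normalization both rating columns share the name imdb_rating, which makes the unification step much simpler.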

Considerations

1 - The list below contains the (pyspark) applications to be triggered:

  • top_netflix_shows_identification.py: extraction of the top 10 netflix shows. Expected input in csv. Expected output in json.
  • top_prime_shows_identification.py: extraction of the top 15 prime shows. Expected input in csv. Expected output in json.
  • shows_unification.py: consumes the output of the upstream jobs, unifies it in a common schema and writes the output with the total of 25 shows (as per the problem statement). Expected input in json. Expected output in csv.

2 - Both datasets have different schemas

3 - prime-TV-Shows-Data-set.csv contains column names with spaces.

4 - top_netflix_shows_identification.py and top_prime_shows_identification.py each consume a single file as input, but generate an output folder with multiple files (spark output); only the *.json files should be considered as input for the downstream app.

5 - shows_unification.py should be triggered only if both upstream jobs are completed with a success status.
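Consideration 4 can be sketched in plain Python: pick only the *.json files from a spark output folder and read them line by line (spark writes one JSON object per line). The function name is made up, and the demo builds a fake output folder in a temporary directory; shows_unification.py does the equivalent with pyspark.

```python
import glob
import json
import os
import tempfile


def read_spark_json_output(output_dir):
    """Load records from the *.json part files of a spark output folder."""
    records = []
    for path in sorted(glob.glob(os.path.join(output_dir, "*.json"))):
        with open(path, encoding="utf-8") as handle:
            for line in handle:
                line = line.strip()
                if line:
                    records.append(json.loads(line))
    return records


# Fake output folder: one part file plus a marker file that must be ignored.
with tempfile.TemporaryDirectory() as tmp:
    part_file = os.path.join(tmp, "part-00000.json")
    with open(part_file, "w", encoding="utf-8") as handle:
        handle.write('{"title": "Breaking Bad", "imdb_rating": 9.5}\n')
    open(os.path.join(tmp, "_SUCCESS"), "w").close()
    records = read_spark_json_output(tmp)

print(records)
```

As for consideration 5, waiting for every upstream task to succeed is airflow's default behaviour, so declaring both identification jobs as upstream of shows_unification is normally enough.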

Apache-spark overview


To better understand what the DAG is going to trigger, it's important to know the applications behind it, in this case pyspark jobs.

Take a look at the following files (each step of the process is documented in their logs):

  • top_netflix_shows_identification.py
  • top_prime_shows_identification.py
  • shows_unification.py

Solution: airflow/dags/extract_top_shows_dag.py

There is no better way to learn than coding. Theory is good for learning definitions and the reasons behind "how" things work, so now it's time to practice: modify the pyspark files to generate a different report format, content, etc. Also modify the DAGs to include automatic retries if they fail, and enable the datadog feature (hint: you only need to define some variables) in extract_top_shows_dag.py.
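A hint for the retries part: airflow operators accept retries and retry_delay, and the usual way to apply them to every task is the default_args dictionary passed to the DAG. The sketch below only builds that dictionary (so it runs without airflow); the concrete values and the helper function are placeholders chosen for this example.

```python
from datetime import datetime, timedelta

# Placeholder values; pass this dict as default_args when creating the DAG
# so that every operator in it inherits the retry settings.
default_args = {
    "owner": "airflow",
    "start_date": datetime(2020, 1, 1),
    "retries": 2,                         # retry a failed task up to 2 times
    "retry_delay": timedelta(minutes=5),  # wait between attempts
}


def max_attempts(args):
    """Total number of tries for a task: the first run plus its retries."""
    return 1 + args.get("retries", 0)


print(max_attempts(default_args))
```

A single operator can still override these values through its own arguments if one task needs a different policy.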


Resources: