airflow-mini-project1

Utilize Docker and Apache Airflow to orchestrate a data pipeline: exercise DAG creation, use various operators (BashOperator, PythonOperator, etc.), and set up the order of operations for each task.


Airflow project!


Project Objectives:

  • Incorporate Docker in order to solve sharing and reproducibility challenges (different operating systems, versioning, lack of process).
  • Use Apache Airflow to orchestrate the pipeline.
  • Exercise DAG creation.
  • Use various Airflow operators like BashOperator, PythonOperator, and FileSensor.
  • Set up the order of operations for each task.

NOTE: there are some redundant tasks within the Airflow DAG that could be compressed into fewer tasks. For example, task_1 and task_3 could be coded as one task. However, to illustrate the usage of different Airflow operators and task ordering, I keep the redundant tasks as they are.

In this project, I created a data pipeline that extracts online stock market data and delivers analytical results. Yahoo Finance is used as the data source via the yfinance Python library.

The source data follows this schema (a minimal download sketch follows the table):

Column      Type
Datetime    STRING
Open        DECIMAL
High        DECIMAL   (highest price within the time interval)
Low         DECIMAL   (lowest price within the time interval)
Close       DECIMAL   (the last price of the time interval)
Adj Close   DECIMAL
Volume      DECIMAL
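
As a minimal sketch (the ticker and parameters are chosen for illustration, not taken from the actual DAG code), this is roughly how yfinance produces data with the columns above:

```python
import yfinance as yf

# One day of one-minute bars for a single symbol; the returned DataFrame is
# indexed by Datetime and, depending on yfinance version/settings, contains
# the Open, High, Low, Close, Adj Close, and Volume columns listed above.
df = yf.download("AAPL", period="1d", interval="1m")
print(df.head())
```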

We'll work with two stock symbols: AAPL and TSLA. The workflow is defined in data_pipeline.py, which is scheduled to run at 6 PM on every weekday (Mon–Fri) and performs the functions below (a minimal DAG sketch follows the list):

  • Download the daily price data at a one-minute interval for the two symbols. Each symbol will have a separate task, Task 1 (task_1) and Task 2 (task_2), which run independently and in parallel.
  • Sensing files (sensing_task_1 & sensing_task_2) will check the existence of TSLA_data.csv and AAPL_data.csv before executing the next tasks.
  • Save both datasets into CSV files and load them into a directory. Each symbol will have a separate task, Task 3 (task_3) and Task 4 (task_4), which run independently and in parallel.
  • Run a custom query on the downloaded datasets for both symbols, Task 5 (task_5). Before this step executes, all previous tasks must complete.
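
A minimal sketch of how such a DAG could be wired up is shown below. The operator choices and task IDs follow the list above, but the dag_id, callables, file paths, and sensor settings are assumptions for illustration, not the actual data_pipeline.py:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.filesystem import FileSensor


def download_prices(symbol, **_):
    # Placeholder: download one day of one-minute bars for `symbol` with
    # yfinance and write them to <symbol>_data.csv.
    ...


def run_query(**_):
    # Placeholder: custom query over both CSVs (see get_last_stock_spread.py).
    ...


with DAG(
    dag_id="marketvol",                    # hypothetical dag_id
    start_date=datetime(2021, 12, 1),
    schedule_interval="0 18 * * 1-5",      # 6 PM, Monday through Friday
    catchup=False,
) as dag:
    # Download AAPL and TSLA in parallel (PythonOperator).
    task_1 = PythonOperator(task_id="task_1", python_callable=download_prices,
                            op_kwargs={"symbol": "AAPL"})
    task_2 = PythonOperator(task_id="task_2", python_callable=download_prices,
                            op_kwargs={"symbol": "TSLA"})

    # Wait for the CSV files to exist before moving on (FileSensor).
    sensing_task_1 = FileSensor(task_id="sensing_task_1",
                                filepath="/tmp/data/AAPL_data.csv",  # assumed path
                                poke_interval=30)
    sensing_task_2 = FileSensor(task_id="sensing_task_2",
                                filepath="/tmp/data/TSLA_data.csv",  # assumed path
                                poke_interval=30)

    # Move the CSVs into a target directory (BashOperator); paths are assumptions.
    task_3 = BashOperator(task_id="task_3",
                          bash_command="mv /tmp/data/AAPL_data.csv /opt/airflow/data/")
    task_4 = BashOperator(task_id="task_4",
                          bash_command="mv /tmp/data/TSLA_data.csv /opt/airflow/data/")

    # Run the custom query only after everything above has completed.
    task_5 = PythonOperator(task_id="task_5", python_callable=run_query)

    task_1 >> sensing_task_1 >> task_3
    task_2 >> sensing_task_2 >> task_4
    [task_3, task_4] >> task_5
```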

(Screenshot: the data pipeline DAG)

All the tasks should execute successfully, like this:

(Screenshot: all tasks completed successfully in the Airflow UI)

We can check all the task logs in the mnt/airflow/logs folder.

Example of task 5's successful execution log, which gives us a list as output (see get_last_stock_spread.py for reference; a rough sketch of the idea follows the screenshot):

(Screenshot: task_5 execution log output)
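
Purely as an illustrative sketch (the column names, paths, and helper below are assumptions, not the real implementation in get_last_stock_spread.py), a "last spread" computation could look like this:

```python
import pandas as pd


def get_last_stock_spread(csv_paths):
    """Return [symbol, spread] pairs from the last row of each CSV (assumed layout)."""
    results = []
    for symbol, path in csv_paths.items():
        df = pd.read_csv(path)
        last = df.iloc[-1]                   # most recent one-minute bar
        spread = last["High"] - last["Low"]  # high/low spread of that bar
        results.append([symbol, round(float(spread), 4)])
    return results


if __name__ == "__main__":
    print(get_last_stock_spread({
        "AAPL": "/tmp/data/AAPL_data.csv",   # hypothetical paths
        "TSLA": "/tmp/data/TSLA_data.csv",
    }))
```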

Steps to follow to execute all files:

1. From a Bash shell, execute ./start.sh; it will build and start all the services.

2. Wait until all the services are up and healthy, as in the screenshot below:

(Screenshot: all services running and healthy)

3. Go to localhost:8080 to access the web UI.

4. Log in with username: airflow and password: airflow.

5. Now we can wait for all the tasks to run on schedule, or manually trigger them to see the output.

6. After all the tasks complete, we can execute ./stop.sh to stop the services.

7. Execute ./reset.sh to wipe out all the images.

Log Analyzer

log_analyzer.py is created to monitor all the error messages within the log files. Run it with a command like the following (a rough sketch of the analyzer appears after the output screenshot):

python3 log_analyzer.py /Volumes/Moon/SpringBoard/Airflow_MiniProject1/mnt/airflow/logs

It should give us output like this:

(Screenshot: log_analyzer.py output)
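
Under the hood, such an analyzer might look roughly like this (a hypothetical sketch; the real log_analyzer.py and its output format may differ):

```python
import sys
from pathlib import Path


def analyze_logs(log_dir):
    """Walk the Airflow log directory and report lines containing ERROR."""
    error_count = 0
    errors = []
    for log_file in Path(log_dir).rglob("*.log"):
        with open(log_file, errors="ignore") as f:
            for line in f:
                if "ERROR" in line:
                    error_count += 1
                    errors.append(f"{log_file}: {line.strip()}")
    print(f"Total number of errors: {error_count}")
    for err in errors:
        print(err)


if __name__ == "__main__":
    analyze_logs(sys.argv[1])  # e.g. .../mnt/airflow/logs
```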

Future work:

  • Integrate CeleryExecutor in airflow.cfg and adjust the docker-compose.yml configs accordingly (a rough sketch follows this list).
  • Optimize the startup of the Airflow services in Docker, since it currently takes quite a long time for all of them to become healthy.
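
As a rough sketch of the first item (service names and connection strings below are assumptions; the official Airflow docker-compose file differs in its details), switching executors usually means setting executor = CeleryExecutor under [core] in airflow.cfg, or overriding it through environment variables in docker-compose.yml:

```yaml
# docker-compose.yml excerpt (hypothetical; broker/backend URLs are assumptions)
environment:
  AIRFLOW__CORE__EXECUTOR: CeleryExecutor
  AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
  AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
```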