Datapipelines-Airflow

This repo contains the DAGs that process the Sparkify data and load it into a Redshift database.

Project structure

The project has been created with the following structure:

├── LICENSE
├── README.md
├── create_tables.sql
├── dags
│   ├── load_dimension_subdag.py
│   └── sparkify_etl.py
├── docker-compose.yml
├── docker-compose2.yml
├── dwh_template.cfg
├── iac_redshift.py
├── plugins
│   ├── __init__.py
│   ├── helpers
│   │   ├── __init__.py
│   │   └── sql_queries.py
│   └── operators
│       ├── __init__.py
│       ├── data_quality.py
│       ├── load_dimension.py
│       ├── load_fact.py
│       └── stage_redshift.py
└── requirements.txt
  • dwh_template.cfg: Template for the configuration file. Fill in the missing information and rename the file to dwh.cfg.
  • docker-compose.yml: Docker compose file for Airflow 1.10.15.
  • docker-compose2.yml: Docker compose file for Airflow 2.2.3.
  • iac_redshift.py: Python utility that creates and deletes an AWS Redshift cluster (IaC).
  • requirements.txt: Requirements for the Python environment.
  • create_tables.sql: SQL queries to create the tables inside the Redshift cluster.
  • dags/load_dimension_subdag.py: Contains the subDAG that loads dimensions and checks data quality.
  • dags/sparkify_etl.py: Contains the main DAG that loads the Sparkify data into Redshift.
  • plugins/helpers/sql_queries.py: Contains the SQL queries to get the dimensions data.
  • plugins/operators/*.py: Contains the 4 custom operators used in the DAG (a sketch of one is shown below).
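
For orientation, here is a minimal sketch of what one of these custom operators might look like. It assumes a data-quality operator that counts rows through a PostgresHook and uses Airflow 2 imports; the class name, connection id, and checks are illustrative, so refer to the actual files under plugins/operators.

# Minimal sketch of a custom operator such as plugins/operators/data_quality.py
# (illustrative only; names, connection id, and checks are assumptions).
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

class DataQualityOperator(BaseOperator):
    def __init__(self, redshift_conn_id="redshift", tables=None, **kwargs):
        super().__init__(**kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.tables = tables or []

    def execute(self, context):
        redshift = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for table in self.tables:
            # Fail the task if a table is empty after loading.
            records = redshift.get_records(f"SELECT COUNT(*) FROM {table}")
            if not records or not records[0] or records[0][0] < 1:
                raise ValueError(f"Data quality check failed: {table} returned no rows")
            self.log.info("Data quality check on table %s passed", table)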

Usage

Before running the Airflow DAG, make sure to create the Redshift cluster and create the tables with the SQL commands provided in the create_tables.sql script.

After that, copy the files into an Airflow deployment and run the DAG from the Airflow UI.

Cluster administration

Create the cluster

python iac_redshift.py --create
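
For context, the create step is plain infrastructure-as-code; under the hood it presumably comes down to a boto3 call along these lines (a sketch only; identifiers, sizes, region, and credentials are assumptions and would normally come from dwh.cfg):

import boto3

# Illustrative sketch of the call behind --create; all values are placeholders.
redshift = boto3.client("redshift", region_name="us-west-2")
redshift.create_cluster(
    ClusterType="multi-node",
    NodeType="dc2.large",
    NumberOfNodes=4,
    ClusterIdentifier="sparkify-cluster",
    DBName="dev",
    MasterUsername="awsuser",
    MasterUserPassword="<password>",
    IamRoles=["<iam-role-arn>"],
)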

After a while, check whether the cluster is available by running the status command:

python iac_redshift.py --status

This will return a table; if the status is available it will also return the cluster address and the IAM role. Fill these two values into the configuration file as CLUSTER-HOST and IAM_ROLE-NAME respectively.
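
The same lookup can be reproduced directly with boto3, which shows where those two values come from (a sketch; the cluster identifier is an assumption):

import boto3

# Illustrative sketch of the lookup behind --status; the Endpoint field only
# exists once the cluster status is "available".
redshift = boto3.client("redshift", region_name="us-west-2")
cluster = redshift.describe_clusters(ClusterIdentifier="sparkify-cluster")["Clusters"][0]
print(cluster["ClusterStatus"])              # e.g. "available"
print(cluster["Endpoint"]["Address"])        # cluster address, for CLUSTER-HOST
print(cluster["IamRoles"][0]["IamRoleArn"])  # IAM role attached to the cluster, for IAM_ROLE-NAME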

To avoid unnecessary costs while developing, the cluster can be deleted by running:

python iac_redshift.py --delete

⚠️ This will delete the cluster and all the information with it. After this point all data will be lost and both the creation and uploading scripts will be needed to recreate the cluster.
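
Deletion is the mirror image of creation; a sketch of what --delete presumably does (again with a placeholder identifier):

import boto3

# Illustrative sketch of the call behind --delete; skipping the final snapshot
# is what makes the data unrecoverable.
redshift = boto3.client("redshift", region_name="us-west-2")
redshift.delete_cluster(
    ClusterIdentifier="sparkify-cluster",
    SkipFinalClusterSnapshot=True,
)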

Table creation

The file create_tables.sql contains the SQL instructions to create the tables. Run it to create the staging tables:

  • stagingEvents: Staging table that maps the S3 log data into the cluster.
  • stagingSongs: Staging table that maps the S3 song data into the cluster.

And the tables of the OLAP schema:

  • songplays
  • users
  • songs
  • artists
  • time
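
The script can be executed from the Redshift query editor, with psql, or programmatically. A small Python helper along these lines would run it against the cluster (a sketch only; the connection values are placeholders and should come from dwh.cfg):

import psycopg2

# Sketch: run create_tables.sql against the cluster. All connection values are
# placeholders; the semicolon split is naive but fine for simple DDL.
conn = psycopg2.connect(
    host="<cluster-endpoint>",
    dbname="dev",
    user="awsuser",
    password="<password>",
    port=5439,
)
with conn, conn.cursor() as cur, open("create_tables.sql") as f:
    for statement in f.read().split(";"):
        if statement.strip():
            cur.execute(statement)
conn.close()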

Airflow with Docker

This process requires Airflow to run. For local development and testing, the reader can try to follow this tutorial to set up Airflow with Docker, or check this guide from Apache Airflow itself. For Airflow version 1.10.15, check this repo.

To run the container

docker-compose up

Troubleshooting

Sometimes, if the volumes are not deleted before the containers are run, this error may occur:

service "airflow-init" didn't complete successfully: exit 1

Run the following and try again:

docker-compose down --volumes --rmi all

Permissions:

VS Code: NoPermissions (FileSystemError): Error: EACCES: permission denied

Solved with:

sudo chown -R username path 

Reviewing loading errors in Redshift

SELECT filename, starttime, colname, type, raw_line, col_length, err_code, err_reason FROM stl_load_errors ORDER BY starttime DESC;