Author: Rodrigo de Alvarenga Mattos
The goal of this project is to build an automated ETL pipeline with Apache Airflow. The source data is a large set of small JSON files stored in AWS S3, which must be loaded into AWS Redshift according to the star schema described in the Table Schema section. The pipeline is built from reusable tasks, and the data quality check is designed so that it can be extended to other databases.
The HTML documentation was generated from the source code docstrings with the Sphinx documentation generator.
- The Pipenv package manager was used to install and manage dependencies.
# install pipenv for dependency management
pip install pipenv
# install project dependencies from Pipfile
pipenv install
- You can also use pip to install dependencies from the requirements.txt file.
pip install -r requirements.txt
- Install Apache Airflow with Docker Compose:
make docker
This is the list of services that have been provisioned in the AWS cloud:
Service | Resources | Description |
---|---|---|
IAM | Policy | Redshift policy. |
IAM | Role and User | Redshift role and user. |
S3 | Data Lake Bucket | Source data storage. |
VPC | Subnet Group and VPN Gateway | Redshift virtual networking environment. |
VPC | Security Group | Redshift inbound and outbound traffic. |
Redshift | EC2 instances | Customized cloud data warehouse. |
We used Terraform to automate infrastructure provisioning, including servers, network, permissions, and security. Please follow the instructions below before running Terraform commands:
- Check whether Terraform is installed; if it is not, follow the installation instructions on the Terraform website:
terraform version
- Make sure the AWS Command Line Interface (CLI) is installed, your user is authenticated, and a default region is set:
# check the current user
aws iam get-user
# the default region should be set
aws configure get region
- Initialize the Terraform working directory containing configuration files:
make init
- Apply Terraform configuration to provision AWS services and resources:
make apply
Before running the ETL pipeline, you must log in to the Airflow UI to set up the AWS credentials and the Redshift connection.
- Open your browser, navigate to the Airflow UI at http://localhost:8080, and sign in with the following credentials:
username | password |
---|---|
airflow | airflow |
- In the top menu, open Admin and click Connections. Click the plus [+] button to add a new record with the following settings:
Option | Value |
---|---|
Connection Id | aws_credentials |
Connection Type | Amazon Web Services |
AWS Access Key ID | A valid aws_access_key_id stored in your ~/.aws/credentials file |
AWS Secret Access Key | A valid aws_secret_access_key stored in your ~/.aws/credentials file |
Extra | { "region_name": "us-west-2" } |
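If you are unsure which values belong in the two key fields, the short Python sketch below reads them from the AWS CLI credentials file. It assumes the standard INI layout of ~/.aws/credentials and the `default` profile; `read_aws_credentials` is a hypothetical helper, not part of this project.

```python
import configparser
from pathlib import Path
from typing import Tuple


def read_aws_credentials(path: Path, profile: str = "default") -> Tuple[str, str]:
    """Return (aws_access_key_id, aws_secret_access_key) for one profile.

    Hypothetical helper; assumes the standard INI layout used by the AWS CLI.
    """
    # interpolation=None keeps any '%' characters in values literal.
    parser = configparser.ConfigParser(interpolation=None)
    # ConfigParser.read() silently skips missing files and returns the list
    # of files it did read, so an empty list means the file was not found.
    if not parser.read(path):
        raise FileNotFoundError(f"AWS credentials file not found: {path}")
    section = parser[profile]  # raises KeyError if the profile is missing
    return section["aws_access_key_id"], section["aws_secret_access_key"]
```

For example, `read_aws_credentials(Path.home() / ".aws" / "credentials")` returns the key pair of the default profile. Avoid printing or logging the secret.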
- Click the plus [+] button again to add the Redshift connection with the following settings:
Option | Value |
---|---|
Connection Id | redshift |
Connection Type | Amazon Redshift |
Host | The Redshift host name, without the port, that Terraform exported to ./config/redshift.cfg |
Schema | dwh |
Login | dwhuser |
Password | The password set in ./terraform/secret.tfvars |
Port | 5439 |
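The Host field must not include the port, while a Redshift endpoint usually has the form `host:port`. The sketch below strips the port with `configparser`; the `[CLUSTER]` section and `HOST` key are hypothetical placeholders, because the exact layout of ./config/redshift.cfg is not shown here.

```python
import configparser


def redshift_host(cfg_text: str, section: str = "CLUSTER", key: str = "HOST") -> str:
    """Return the Redshift host name with any trailing ':port' removed.

    The default section and key names are hypothetical placeholders;
    change them to match what Terraform writes to ./config/redshift.cfg.
    """
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    endpoint = parser[section][key].strip()
    # Split at the first colon; the Airflow Host field needs only the
    # part before it (the port goes in the separate Port field).
    host, _sep, _port = endpoint.partition(":")
    return host
```

A possible usage, under the same assumptions: `redshift_host(open("config/redshift.cfg").read())`.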
We created a custom operator for each step of the ETL pipeline and defined their dependencies in the Airflow DAG, as shown in the graph below:
The following table describes each process related to the ETL pipeline:
Step | Action |
---|---|
Stage to Redshift | Copy JSON data from S3 and load it into the stage tables in Redshift. |
Load Fact Table | Transform and load data from stage tables to the fact table. |
Load Dimension Tables | Transform and load data from stage tables to the dimension tables. |
Data Quality Check | Validate every column of the fact and dimension tables. |
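Assuming the steps run strictly in the order listed in the table, their dependency graph can be sketched with Python's standard `graphlib` module (Python 3.9+). The task IDs are hypothetical, and the real DAG may split a step into several tasks (for example, one per dimension table). In Airflow itself the same chain is usually declared with the `>>` operator, e.g. `stage >> load_fact >> load_dims >> quality_check`.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set

# Hypothetical task IDs. Each key maps a task to the set of tasks that
# must finish before it can start.
DEPENDENCIES: Dict[str, Set[str]] = {
    "stage_to_redshift": set(),
    "load_fact_table": {"stage_to_redshift"},
    "load_dimension_tables": {"load_fact_table"},
    "data_quality_check": {"load_dimension_tables"},
}


def execution_order(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Return one valid run order; raises graphlib.CycleError on a cycle."""
    return list(TopologicalSorter(dependencies).static_order())


if __name__ == "__main__":
    # Prints: stage_to_redshift >> load_fact_table >> load_dimension_tables >> data_quality_check
    print(" >> ".join(execution_order(DEPENDENCIES)))
```

Because this chain is linear, there is exactly one valid order; with parallel branches, `static_order()` would return one of several valid orders.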
The figure below shows how the raw JSON data is loaded into the stage tables, along with the stage table schemas.
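Loading JSON files from S3 into Redshift is normally done with Redshift's `COPY` command, which is presumably what the Stage to Redshift step issues. The sketch below shows how an operator could build such a statement; the function name, table name, and bucket path are hypothetical. With `FORMAT AS JSON 'auto'`, Redshift matches JSON keys to column names; passing the S3 URI of a JSONPaths file instead maps fields to columns by position.

```python
def build_copy_sql(
    table: str,
    s3_path: str,
    access_key_id: str,
    secret_access_key: str,
    region: str = "us-west-2",
    json_option: str = "auto",
) -> str:
    """Build a Redshift COPY statement that loads JSON files from S3.

    json_option is either 'auto' (match JSON keys to column names) or the
    S3 URI of a JSONPaths file (map JSON fields to columns by position).
    Values are interpolated without escaping: use trusted input only.
    """
    return (
        f"COPY {table}\n"
        f"FROM '{s3_path}'\n"
        f"ACCESS_KEY_ID '{access_key_id}'\n"
        f"SECRET_ACCESS_KEY '{secret_access_key}'\n"
        f"REGION '{region}'\n"
        f"FORMAT AS JSON '{json_option}';"
    )
```

For example, `build_copy_sql("staging_events", "s3://example-bucket/log_data", key_id, secret)` returns a statement ready to run against the `redshift` connection. `COPY` also accepts an `IAM_ROLE '<role-arn>'` clause instead of access keys, which keeps secrets out of the SQL text.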
The Redshift database is modeled as a star schema that consists of a fact table (songplays) referencing four dimension tables (artists, songs, time, users).
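To make the star layout concrete, the sketch below builds a heavily simplified version of the five tables in an in-memory SQLite database and joins the fact table to its four dimensions. The column names and sample rows are made up for illustration; the real Redshift DDL differs in columns, types, and distribution settings.

```python
import sqlite3
from typing import List, Tuple

# Heavily simplified, hypothetical columns. "time" is quoted to avoid any
# clash with SQL keywords.
SCHEMA = """
CREATE TABLE users   (user_id INTEGER PRIMARY KEY, level TEXT);
CREATE TABLE artists (artist_id TEXT PRIMARY KEY, name TEXT);
CREATE TABLE songs   (song_id TEXT PRIMARY KEY, title TEXT,
                      artist_id TEXT REFERENCES artists (artist_id));
CREATE TABLE "time"  (start_time TEXT PRIMARY KEY, hour INTEGER);
CREATE TABLE songplays (
    songplay_id INTEGER PRIMARY KEY,
    start_time  TEXT    REFERENCES "time" (start_time),
    user_id     INTEGER REFERENCES users (user_id),
    song_id     TEXT    REFERENCES songs (song_id),
    artist_id   TEXT    REFERENCES artists (artist_id)
);
"""

# Typical star-schema query: the fact table joined to each dimension.
QUERY = """
SELECT t.hour, u.level, s.title, a.name
FROM songplays AS f
JOIN "time"  AS t ON t.start_time = f.start_time
JOIN users   AS u ON u.user_id    = f.user_id
JOIN songs   AS s ON s.song_id    = f.song_id
JOIN artists AS a ON a.artist_id  = f.artist_id
"""


def run_demo() -> List[Tuple]:
    con = sqlite3.connect(":memory:")
    try:
        con.executescript(SCHEMA)
        # Made-up sample rows, one per table.
        con.execute("INSERT INTO users VALUES (1, 'free')")
        con.execute("INSERT INTO artists VALUES ('AR1', 'Example Artist')")
        con.execute("INSERT INTO songs VALUES ('SO1', 'Example Song', 'AR1')")
        con.execute("""INSERT INTO "time" VALUES ('2020-01-01 21:00:00', 21)""")
        con.execute(
            "INSERT INTO songplays VALUES (1, '2020-01-01 21:00:00', 1, 'SO1', 'AR1')"
        )
        return con.execute(QUERY).fetchall()
    finally:
        con.close()
```

Each fact row carries only keys into the dimensions, so descriptive attributes such as the song title are always reached through a join.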
The figure below shows the database structure as an entity relationship diagram:
The data quality checks are declared in the JSON file ./dags/quality_check.json as an array of validation rules, one for each column of every table in the database schema. Each rule has one of the following types:
Column Rule Type | Action |
---|---|
regexp | Get the number of rows that match the regular expression |
timestamp | Get the number of rows that have a timestamp value |
nullable | Get the number of rows with null values |
any | Get the total number of rows, since any value is valid |
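The rule types can be illustrated with a small in-memory Python analogue that counts matching values for a single column. This is a sketch, not the project's implementation: the rule dictionary shape (a `type` key, plus a `pattern` key for `regexp`) is hypothetical, and treating ISO 8601 strings as timestamps is an assumption.

```python
import re
from datetime import datetime
from typing import Any, Dict, Sequence


def _is_timestamp(value: Any) -> bool:
    """True for datetime objects and ISO 8601 strings (an assumption)."""
    if isinstance(value, datetime):
        return True
    if isinstance(value, str):
        try:
            datetime.fromisoformat(value)
        except ValueError:
            return False
        return True
    return False


def count_matching(values: Sequence[Any], rule: Dict[str, Any]) -> int:
    """Count the values in one column that satisfy a single rule."""
    kind = rule["type"]
    if kind == "regexp":
        # re.search finds the pattern anywhere in the value; anchor the
        # pattern with ^...$ to require a whole-value match.
        pattern = re.compile(rule["pattern"])
        return sum(1 for v in values if v is not None and pattern.search(str(v)))
    if kind == "timestamp":
        return sum(1 for v in values if _is_timestamp(v))
    if kind == "nullable":
        return sum(1 for v in values if v is None)
    if kind == "any":
        # Every row counts, because any value is valid.
        return len(values)
    raise ValueError(f"unknown rule type: {kind!r}")
```

For instance, with the column values `["AR1", "AR2", None, "xyz"]`, a `regexp` rule with pattern `^AR\d+$` counts 2 rows and a `nullable` rule counts 1.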
Sample quality check rules for the artists table:
- Before triggering the DAG, set the start date (year, month, day) in the pipeline configuration file ./dags/pipeline.cfg to the current date.
- Go to the Airflow UI home page at http://localhost:8080/home and click the Trigger DAG action button [>] on the right side of the DAG list panel.
- Follow the DAG execution in the Grid and Graph views.
- At the end of a successful run, open Admin in the main menu and click XCom. The quality_check output is stored as a JSON string with the following content: