Sparkify Airflow Project

In this project, I build an ETL pipeline with Airflow that ingests data from S3 and transforms it into AWS Redshift tables.

Two datasets are used:

  1. A song dataset, which is a subset of real data from the Million Song Dataset.
  2. A log dataset, which consists of log files in JSON format generated by an event simulator based on the songs in the dataset above.

Files are stored in the S3 bucket s3://udacity-dend and are ingested into fact and dimension tables.

The final product of the pipeline consists of the following tables:

Fact Table

songplays - records in event data associated with song plays, i.e. records with page NextSong
    playid, start_time, user_id, level, songid, artistid, sessionid, location, user_agent

Dimension Tables

users - users in the app
    userid, first_name, last_name, gender, level
songs - songs in music database
    songid, title, artist_id, year, duration
artists - artists in music database
    artistid, name, location, lattitude, longitude
time - timestamps of records in songplays broken down into specific units
    start_time, hour, day, week, month, year, weekday

Installation

Create the virtual environment and install the packages:

make install

Run Airflow:

sh start.sh

The Sparkify pipeline assumes that an AWS Redshift cluster has already been created.

Setup Cryptographic Key

Following the Airflow instructions, generate a Fernet key by running the following commands in a Python console:

from cryptography.fernet import Fernet
fernet_key = Fernet.generate_key()
print(fernet_key.decode())  # your Fernet key; keep it in a secure place!

Copy the output into fernet_key in the file ./fernet.key. Make sure not to commit this file to GitHub.
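
A Fernet key is 32 random bytes encoded as URL-safe base64, so the contents of ./fernet.key can be sanity-checked with the standard library alone. The sketch below is one possible check, not part of the project: the helper name is_valid_fernet_key is hypothetical, and the sample key is produced with os.urandom as a stand-in for Fernet.generate_key().

```python
import base64
import binascii
import os


def is_valid_fernet_key(key: str) -> bool:
    """Return True if `key` decodes to exactly 32 bytes of URL-safe base64."""
    try:
        raw = base64.urlsafe_b64decode(key.encode("ascii"))
    except (binascii.Error, ValueError):
        # Covers bad padding, non-ASCII input, and other malformed values.
        return False
    return len(raw) == 32


# Stand-in for Fernet.generate_key(): 32 random bytes, URL-safe base64 encoded.
sample_key = base64.urlsafe_b64encode(os.urandom(32)).decode()

print(is_valid_fernet_key(sample_key))   # a freshly generated key passes
print(is_valid_fernet_key("not-a-key"))  # malformed input is rejected
```

This only checks the shape of the key. It does not prove that the key matches the one Airflow used to encrypt existing connections.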

Tables Initialization

Download Postico or a similar Postgres client and create a connection to the Redshift cluster using its hostname, username, password, and port.

Run create_tables.sql in Postico once before enabling the DAG. This will delete and recreate the tables used by the pipeline.

[Screenshot: Postico]

Add Airflow Connections

The pipeline requires two Airflow connections, aws_credentials and redshift, which must be created before running the DAG.

Go to localhost:8080 in your browser, click on the Admin tab, and select Connections.

Under Connections, select Create.

On the create connection page, enter the following values:

  • Conn Id: Enter aws_credentials.
  • Conn Type: Enter Amazon Web Services.
  • Login: Enter your Access key ID from the IAM User credentials you downloaded earlier.
  • Password: Enter your Secret access key from the IAM User credentials you downloaded earlier.

[Screenshot: aws_credentials connection]

Once you've entered these values, select Save and Add Another.

On the next create connection page, enter the following values:

  • Conn Id: Enter redshift.
  • Conn Type: Enter Postgres.
  • Host: Enter the endpoint of your Redshift cluster, excluding the port at the end.
  • Schema: Enter dev. This is the Redshift database you want to connect to.
  • Login: Enter the login name you created when launching your Redshift cluster.
  • Password: Enter the password you created when launching your Redshift cluster.
  • Port: Enter 5439.

Once you've entered these values, select Save.

[Screenshot: redshift connection]
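
As an alternative to the UI, Airflow can also read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, with the value given as a connection URI. The sketch below builds the two URIs for the connections described above. The helper connection_uri and all credential and host values are placeholders invented for illustration; substitute your own.

```python
from urllib.parse import quote


def connection_uri(conn_type, login, password, host="", port=None, schema=""):
    """Build a connection URI, percent-encoding the login and password."""
    uri = f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}@{host}"
    if port is not None:
        uri += f":{port}"
    if schema:
        uri += f"/{schema}"
    return uri


# Placeholder credentials and endpoint, for illustration only.
aws_uri = connection_uri("aws", "MY_ACCESS_KEY_ID", "my/secret+key")
redshift_uri = connection_uri(
    "postgres", "awsuser", "p@ssword",
    host="example-cluster.abc123.us-west-2.redshift.amazonaws.com",
    port=5439, schema="dev",
)

print(f"export AIRFLOW_CONN_AWS_CREDENTIALS='{aws_uri}'")
print(f"export AIRFLOW_CONN_REDSHIFT='{redshift_uri}'")
```

Percent-encoding matters here because AWS secret keys often contain characters such as / and +, which would otherwise break URI parsing.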

Running the Pipeline

The DAG should appear under the name sparkify-etl. Once enabled, it will run hourly over the period 11/01/2018 to 11/30/2018. To change this period, edit the start_date and end_date arguments in airflow/dags/sparkify-etl.py.

Ingesting the tables

The DAG will run hourly, ingesting the full song dataset and an hourly set of logs templated to match the execution date.
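
To illustrate what "templated to match the execution date" means, the sketch below renders an S3 key from an execution timestamp. The key layout in LOG_KEY_TEMPLATE is an assumption, since this README does not state the actual layout. In the DAG itself, Airflow would perform this substitution through Jinja templating on the operator's fields rather than str.format.

```python
from datetime import datetime

BUCKET = "udacity-dend"

# Hypothetical key layout; the project's actual template may differ.
LOG_KEY_TEMPLATE = "log_data/{year}/{month:02d}/{date}-events.json"


def render_log_path(execution_date: datetime) -> str:
    """Return the S3 path of the log file matching the given execution date."""
    key = LOG_KEY_TEMPLATE.format(
        year=execution_date.year,
        month=execution_date.month,
        date=execution_date.strftime("%Y-%m-%d"),
    )
    return f"s3://{BUCKET}/{key}"


print(render_log_path(datetime(2018, 11, 1, 13)))
# s3://udacity-dend/log_data/2018/11/2018-11-01-events.json
```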

All dimension tables except users are loaded in TRUNCATE mode: each table is emptied before being repopulated, because these tables rely on the song data, which is loaded in full on every run.
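
The TRUNCATE mode can be sketched as a small helper that emits the SQL statements for one table load. This is an illustration under assumptions, not the project's operator: the function build_load_statements, its truncate flag, and the staging_songs table name are all hypothetical.

```python
from typing import List


def build_load_statements(table: str, select_sql: str, truncate: bool = True) -> List[str]:
    """Return the SQL statements needed to load `table` from `select_sql`."""
    statements = []
    if truncate:
        # Empty the target first so that each run rebuilds it from scratch.
        statements.append(f"TRUNCATE TABLE {table};")
    statements.append(f"INSERT INTO {table}\n{select_sql};")
    return statements


# Hypothetical staging table and column names, for illustration only.
songs_select = "SELECT DISTINCT song_id, title, artist_id, year, duration FROM staging_songs"

for statement in build_load_statements("songs", songs_select):
    print(statement)
```

With truncate=False the same helper would only append rows, which is why the users table needs the separate strategy described next.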

The users table, however, is populated from a subset of the log data covering only one hour. I have therefore implemented an upsert strategy: first delete the records in the users table that are also present in the staged dataset (joining the two tables on the userid primary key), then insert all the rows from the staged table. This ensures the table always holds the latest information about each user.
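
The delete-then-insert upsert described above can be tried out locally. The sketch below uses an in-memory SQLite database as a stand-in for Redshift, so the SQL dialect differs slightly from what the pipeline runs. The staged_users table name, the reduced column set, and the sample rows are assumptions made for the demonstration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (userid INTEGER PRIMARY KEY, first_name TEXT, level TEXT);
    CREATE TABLE staged_users (userid INTEGER, first_name TEXT, level TEXT);
    INSERT INTO users VALUES (1, 'Ada', 'free'), (2, 'Bob', 'free');
    INSERT INTO staged_users VALUES (2, 'Bob', 'paid'), (3, 'Cy', 'free');
""")

# Run both steps in one transaction so readers never see a half-applied upsert.
with conn:
    # Step 1: remove existing rows that are about to be replaced.
    conn.execute("DELETE FROM users WHERE userid IN (SELECT userid FROM staged_users)")
    # Step 2: insert every staged row, both updated and new users.
    conn.execute("INSERT INTO users SELECT userid, first_name, level FROM staged_users")

rows = conn.execute("SELECT userid, first_name, level FROM users ORDER BY userid").fetchall()
print(rows)
# [(1, 'Ada', 'free'), (2, 'Bob', 'paid'), (3, 'Cy', 'free')]
```

User 1 is untouched, user 2 picks up the new level, and user 3 is added. In this sketch the staged data must hold at most one row per userid, otherwise the insert would violate the primary key.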

[Screenshot: DAG graph view]

[Screenshot: DAG tree view]