Extract song play activity from logs stored on AWS S3 and transform them into a star schema consisting of a central fact table (songplay) and related dimension tables (song, artist, user, time) using PySpark running on AWS EMR.
Sparkify -- a fictitious startup -- wants to analyze the data they have been collecting on songs and user activity from their new music streaming app. They are particularly interested in finding out which songs their users are listening to. Their data is stored in JSON log files and must be restructured before it can be analyzed. They want a database optimized for analyzing users' listening behavior. To perform this analysis routinely they need a database schema and an extract-transform-and-load (ETL) pipeline.
Which songs are users listening to? To answer this question I will build an extract-transform-and-load pipeline that restructures the Sparkify data.
Log files are gathered from subscriber activity by Sparkify's online transactional processing (OLTP) system and written to AWS S3 storage. Star schemas are commonly used in data warehouse applications to support online analytical processing (OLAP) needs. PySpark code running on an AWS EMR cluster transforms the raw JSON logs into a star schema, simplifying analytic queries.
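The heart of the pipeline is reading the raw JSON, filtering for song plays, and reshaping the result into fact and dimension tables. Below is a minimal sketch of that step, assuming the column names of the standard Udacity Sparkify dataset (`userId`, `ts`, `page`, `song`, `artist`, `length`, and so on) and local sample paths; the actual `etl.py` may differ in the details.

```python
# Minimal sketch of the log-to-star-schema transformation.
# Column names and paths are assumptions based on the Udacity Sparkify dataset.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("sparkify-etl-sketch").getOrCreate()

songs = spark.read.json("data/interim/song_data/*/*/*/*.json")
events = spark.read.json("data/interim/log_data/*.json")

# Only "NextSong" events represent actual song plays; ts is epoch milliseconds.
plays = (events.filter(F.col("page") == "NextSong")
               .withColumn("start_time", (F.col("ts") / 1000).cast("timestamp")))

# user dimension: one row per user.
users = plays.select("userId", "firstName", "lastName", "gender", "level") \
             .dropDuplicates(["userId"])

# time dimension derived from the event timestamp.
time = (plays.select("start_time")
             .withColumn("hour", F.hour("start_time"))
             .withColumn("day", F.dayofmonth("start_time"))
             .withColumn("week", F.weekofyear("start_time"))
             .withColumn("month", F.month("start_time"))
             .withColumn("year", F.year("start_time"))
             .withColumn("weekday", F.dayofweek("start_time"))
             .dropDuplicates(["start_time"]))

# songplay fact table: join log events to song metadata.
songplays = plays.join(
    songs,
    (plays.song == songs.title)
    & (plays.artist == songs.artist_name)
    & (plays.length == songs.duration),
    "left"
).select(
    "start_time", "userId", "level", "song_id", "artist_id",
    "sessionId", "location", "userAgent"
)

# Tables are written back to S3 (or a local path) as parquet, partitioned for query efficiency.
(songplays.withColumn("year", F.year("start_time"))
          .withColumn("month", F.month("start_time"))
          .write.mode("overwrite")
          .partitionBy("year", "month")
          .parquet("data/interim/processed/songplays/"))
```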
- `etl.py` -- Extracts data from the raw logs stored on AWS S3, transforms them using PySpark code, and loads the resulting fact and dimension tables back onto AWS S3 for analysis.
- `dl.cfg` -- Configuration file containing paths to the song data, the event logs, and the output directory where the normalized star-schema tables are written. AWS access credentials are also stored here, so do not commit this file to GitHub.
- `data` -- Sample data for running locally, with subdirectories for `song_data` and `log_data`.
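When running locally, `etl.py` needs a SparkSession that can read and write `s3a://` paths (on EMR the AWS libraries are already provided by the cluster). A minimal sketch is shown below; the `hadoop-aws` version is an assumption and should match the Hadoop version bundled with your Spark installation.

```python
# Sketch: a SparkSession able to read s3a:// paths when run locally.
# The hadoop-aws version is an assumption; match it to your Spark/Hadoop build.
from pyspark.sql import SparkSession

def create_spark_session():
    return (
        SparkSession.builder
        .appName("sparkify-etl")
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.4")
        .getOrCreate()
    )
```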
A command line interface enables this program to be run locally or on AWS. Paths to the top-level data files must first be entered in the `.env/dl.cfg` file. This file is not part of the repository because it contains AWS keys. Here is an example to follow:
```ini
[AWS]
AWS_ACCESS_KEY_ID=your-key-here
AWS_SECRET_ACCESS_KEY=your-secret-key-here
SONG_DATA=s3a://udacity-dend/song_data/
LOG_DATA=s3a://udacity-dend/log_data/
OUTPUT_DATA=s3://song-play-spark/

[LOCAL]
SONG_DATA=data/interim/song_data/
LOG_DATA=data/interim/log_data/
OUTPUT_DATA=data/interim/processed/
```
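The configuration can be read with Python's standard `configparser` module. Here is a minimal sketch, assuming the section and key names in the example above:

```python
# Sketch: loading dl.cfg and exporting the AWS credentials so the s3a
# filesystem can pick them up. Section/key names match the example above.
import configparser
import os

config = configparser.ConfigParser()
config.read(".env/dl.cfg")

os.environ["AWS_ACCESS_KEY_ID"] = config["AWS"]["AWS_ACCESS_KEY_ID"]
os.environ["AWS_SECRET_ACCESS_KEY"] = config["AWS"]["AWS_SECRET_ACCESS_KEY"]

# Select [AWS] or [LOCAL] paths depending on how the script is invoked.
section = "LOCAL"  # or "AWS"
song_data = config[section]["SONG_DATA"]
log_data = config[section]["LOG_DATA"]
output_data = config[section]["OUTPUT_DATA"]
```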
- Locally -- Enter `python etl.py --local`
- AWS EMR -- Enter `python etl.py --aws`
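The `--local`/`--aws` switch maps onto the two config sections. A sketch of the dispatch is shown below; the `process_song_data` and `process_log_data` names are illustrative placeholders, not necessarily the functions in `etl.py`.

```python
# Sketch: dispatching between local and AWS runs.
import argparse

def main():
    parser = argparse.ArgumentParser(description="Sparkify ETL pipeline")
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--local", action="store_true",
                       help="use paths from the [LOCAL] config section")
    group.add_argument("--aws", action="store_true",
                       help="use paths from the [AWS] config section")
    args = parser.parse_args()

    section = "AWS" if args.aws else "LOCAL"
    # Read dl.cfg, create the SparkSession, then run the ETL steps, e.g.:
    # process_song_data(spark, song_data, output_data)
    # process_log_data(spark, log_data, output_data)
    print(f"Running ETL with the [{section}] configuration")

if __name__ == "__main__":
    main()
```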
- PySpark SQL Module Documentation -- Essential documentation for understanding the PySpark API.
- Removing duplicate columns after a DF join in Spark -- The duplicate column names that result from a join (see the runnable sketch after this list) can be avoided by either:
  - joining using a list, which joins on the common `id` column and keeps a single copy of it: `df1.join(df2, ['id']).show()`
  - leaving the columns named identically and providing an alias for each table: `df1.alias("a").join(df2.alias("b"), df1['id'] == df2['id']).select("a.id", "a.val1", "b.val2").show()`
- Apache PySpark by Example -- An excellent and short introduction to using PySpark for data analysis. Available for free through most public libraries.
- Spark Python API Docs -- Excellent documentation of the API and examples of how to use it.
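To make the two join approaches above concrete, here is a minimal, self-contained sketch using throwaway DataFrames (`df1`, `df2` and their columns are invented for illustration):

```python
# Sketch: two ways of avoiding duplicate columns after a join,
# demonstrated on small throwaway DataFrames.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("join-dedup-sketch").getOrCreate()

df1 = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "val1"])
df2 = spark.createDataFrame([(1, "x"), (2, "y")], ["id", "val2"])

# 1. Join on a list of column names: Spark keeps a single `id` column.
df1.join(df2, ["id"]).show()

# 2. Keep identically named columns and disambiguate with table aliases.
df1.alias("a").join(df2.alias("b"), df1["id"] == df2["id"]) \
   .select("a.id", "a.val1", "b.val2").show()

spark.stop()
```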