The goal of this project is to set up an Amazon Redshift data warehouse to store songplay event data and let us efficiently analyse it along different dimensions. This includes loading the raw JSON data into staging tables and inserting rows into the fact and dimension tables while ensuring uniqueness and efficiency where necessary.
- Song dataset:
  - 14,896 unique songs
  - 9,553 unique artists
- Log dataset:
  - 8,056 songplay events (i.e. `page = NextSong`)
  - 94 unique `user_id`s
  - 209 unique `song_id`s
  - 194 unique `artist_id`s
- Unnecessary data in the song dataset: The song dataset contains far more songs and artists than actually appear in the log event dataset. To avoid oversized dimension tables, I only include songs and artists in the relevant dimension tables if they appear at least once in the songplay data.
- Inconsistencies in artist data: Even though only 194 unique `artist_id`s appear in the songplay data, `artist_name` can differ from song to song for the same `artist_id`, e.g.:
# artist_id, artist_name, title, song_id
('ARJGAUD1187FB505DA', 'Lange ft. Sarah Howells', 'Let It All Out', 'SONRTVW12AB017E5D1'),
('ARJGAUD1187FB505DA', 'Lange', 'Happiness Happening', 'SOPBKFW12A8C145018'),
('ARJGAUD1187FB505DA', 'Lan Ge', 'Hai Shang Liang Xiao (Album Version)', 'SOPFBQZ12A8C13B030')
In this case it's hard to choose the "right" artist name without further research, so I followed a simplistic approach and inserted the first instance of every artist into the `artist` table.
Star schema where the `songplay` table functions as the central fact table. The `user`, `artist`, `song` and `time` tables serve as dimension tables, each holding columns relevant to these entities.
Notes:
Even though the project brief lists `level` and `location` as columns on the `songplay` table, I left them out. `level` is a column on the `user` table, and `location` is on the `artist` table, so both are available through simple joins. This way the fact table can stay as streamlined as possible without any loss of information.
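For example, a hypothetical analytical query breaking down paid songplays by artist location only needs two joins (a sketch for illustration, not one of the project queries; it uses the `songplay`, `users` and `artist` table names from the SQL later in this README):

```sql
-- Sketch: paid songplays per artist location via simple joins
SELECT artist.location, count(*) AS paid_plays
FROM songplay
JOIN users ON users.user_id = songplay.user_id
JOIN artist ON artist.artist_id = songplay.artist_id
WHERE users.level = 'paid'
GROUP BY artist.location
ORDER BY paid_plays DESC;
```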
The remaining columns on `songplay` are mostly foreign keys to the dimension tables (`user_id`, `song_id`, `artist_id`), except for `session_id` and `user_agent`. `session_id` might also be a foreign key to a hypothetical `session` table which could contain session-level information, including `user_agent`.
Also, it feels unnecessary to create the `time` dimension table just for extracting values from a timestamp. I would simply add a timestamp column to the fact table itself and let the analytical queries extract the date components they need. This would further simplify the schema.
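As an illustration of that alternative, a weekday breakdown could then be computed directly on the fact table (a sketch that assumes `songplay` carried a `start_time` column stored as a `TIMESTAMP`, which is not how the current schema stores it):

```sql
-- Sketch: extracting date parts at query time, without a time dimension
SELECT date_part('dow', start_time) AS weekday, count(*) AS plays
FROM songplay
GROUP BY 1
ORDER BY 1;
```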
I have chosen the following optimization strategies for the final tables in the schema:
- `songplay`: EVEN distribution style with `start_time` as sort key
- `user`: ALL distribution style
- `artist`: ALL distribution style
- `song`: ALL distribution style
- `time`: EVEN distribution style with `start_time` as sort key
Factors to consider in the table design:
The `user`, `artist` and `song` dimension tables are currently small enough to easily fit on all slices. This might change if a lot of new data is inserted, but for now it works. The `songplay` and `time` tables contain the same number of rows (and will most likely contain a very similar number of rows in the future), and they are the largest tables. Currently they would also fit on all slices, but in a more realistic scenario they would quickly outgrow a single slice, so I chose EVEN distribution with `start_time` as sort key, so that rows with similar `start_time` values are stored close together and range-restricted queries on `start_time` remain efficient.
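A sketch of how these choices translate to DDL (the column lists and types below are abbreviated and partly assumed; the actual definitions live in `sql_queries.py`):

```sql
-- Sketch only: abbreviated columns, assumed types
CREATE TABLE songplay (
    start_time BIGINT,
    user_id    INT,
    song_id    VARCHAR,
    artist_id  VARCHAR,
    session_id INT,
    user_agent VARCHAR
)
DISTSTYLE EVEN
SORTKEY (start_time);

CREATE TABLE users (
    user_id    INT,
    first_name VARCHAR,
    last_name  VARCHAR,
    gender     VARCHAR,
    level      VARCHAR
)
DISTSTYLE ALL;
```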
.
├── README.md
├── create_tables.py
├── dwh.cfg
├── etl.py
├── manage_cluster.ipynb
├── requirements.txt
├── sql_queries.py
└── test.ipynb
- `README.md`: You are reading this.
- `create_tables.py`: Script for creating the necessary tables (and dropping them first if they exist). Uses queries from `sql_queries.py`.
- `dwh.cfg`: Config variables for the data warehouse and the ETL process. I added a few extra parameters to the provided template to support creating the Redshift cluster from code.
- `etl.py`: Script for the ETL process. Run `python create_tables.py` before running the ETL script. Uses queries from `sql_queries.py`.
- `manage_cluster.ipynb`: Notebook containing the code for launching and tearing down a Redshift cluster. To run it you need a `user.cfg` config file with a valid AWS access key and secret.
- `requirements.txt`: Dependencies for running the project.
- `sql_queries.py`: Queries for creating the staging and final tables, plus the insert statements for loading data from the raw JSON files into the tables.
- `test.ipynb`: Notebook for prototyping and testing SQL queries.
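The ETL first loads the raw JSON into the staging tables and then inserts rows into the final tables. Staging loads like this are typically done with Redshift `COPY` commands; a minimal sketch is shown below (the S3 paths, IAM role and JSONPaths file are placeholders, not the project's actual values):

```sql
-- Sketch: COPY raw JSON log data into the staging table.
-- The S3 paths and IAM role ARN are placeholders.
COPY staging_event
FROM 's3://example-bucket/log_data'
IAM_ROLE 'arn:aws:iam::123456789012:role/example-redshift-role'
JSON 's3://example-bucket/log_json_path.json'
REGION 'us-west-2';
```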
Redshift does not support `ON CONFLICT` clauses, therefore the upsert operations look a bit more complicated. I wrote queries that ensure uniqueness during the initial load and during subsequent inserts.
We need to make sure that:
- every `userId` in `staging_event` is inserted at most once (and only if it is not already in the `users` table, in case of a later incremental load),
- after running the query, every user row contains the latest value for `level`.

I created a temp table that contains the last event for every `userId` in `staging_event`, and used it in the insert statement and in a separate update statement.
-- Get last songplay event for every userId in staging table
CREATE TEMP TABLE user_last_song AS
SELECT staging_event.*
FROM (
SELECT userId, max(ts) as ts
FROM staging_event
WHERE page = 'NextSong'
GROUP BY userId
) last_ts
LEFT JOIN staging_event USING (userId, ts)
WHERE page = 'NextSong';
-- Insert rows for userIds that are not already in users table
INSERT INTO users
SELECT
uls.userId as user_id,
uls.firstName as first_name,
uls.lastName as last_name,
uls.gender,
uls.level
FROM user_last_song uls
LEFT JOIN users ON users.user_id = uls.userId
WHERE users.user_id IS NULL;
-- Update existing users' level to latest value where it changed
UPDATE users
SET level = user_last_song.level
FROM user_last_song
WHERE
user_id = user_last_song.userId AND
users.level != user_last_song.level;
No upsert in this case, only making sure that:
- there is no duplication during later inserts,
- only those songs are included that appear in `songplay`.

Redshift does not enforce uniqueness on primary keys during inserts (on top of the missing `ON CONFLICT` support), so I joined the staging table against the target table and insert only those rows that are not yet present in the target.
Note: This method implicitly assumes two things:
- `staging_song` contains every `song_id` only once, so we don't need to ensure uniqueness there,
- the song details for a specific `song_id` won't change later, so we don't have to do updates or flag duplicates.
Given the nature of the data source I think these are reasonable assumptions.
INSERT INTO song
SELECT
song_id,
staging_song.title,
staging_song.artist_id,
staging_song.year,
staging_song.duration
FROM staging_song
LEFT JOIN song USING (song_id)
WHERE
song.title IS NULL AND
song_id IN (SELECT DISTINCT song_id FROM songplay);
As mentioned in the data summary, artist names are not fully consistent. To ensure uniqueness of `artist_id` I decided to only include the first occurrence of every artist that appears in the `songplay` data, using the `row_number()` window function in the `staging` subquery.
INSERT INTO artist
SELECT
artist_id,
staging.artist_name AS name,
staging.artist_location AS location,
staging.artist_latitude AS latitude,
staging.artist_longitude AS longitude
FROM (
SELECT *, row_number() OVER (PARTITION BY artist_id) as row_number
FROM staging_song
) staging
LEFT JOIN artist USING (artist_id)
WHERE
staging.row_number = 1 AND
artist.name IS NULL AND
staging.artist_id IN (SELECT DISTINCT artist_id FROM songplay);
The extra twist here is that we need to transform the integer `ts` into timestamps to be able to extract the necessary components. I solved this in the `converted` CTE.
The insert query also ensures uniqueness of `start_time` during later inserts, just as in the previous queries. This is somewhat overkill for this project, as the resolution of `start_time` is one millisecond, but in a truly large application multiple events can be recorded even within the same millisecond.
INSERT INTO time
WITH converted AS (
SELECT
ts,
'epoch'::timestamp + ts / 1000.0 * '1 second'::interval AS ts_converted
FROM staging_event
)
SELECT
ts AS start_time,
date_part('hour', ts_converted) AS hour,
date_part('day', ts_converted) AS day,
date_part('week', ts_converted) AS week,
date_part('month', ts_converted) AS month,
date_part('year', ts_converted) AS year,
date_part('dow', ts_converted) AS weekday
FROM converted
LEFT JOIN time ON time.start_time = converted.ts
WHERE time.start_time IS NULL;
- To set up the Redshift cluster, run the cells in the `manage_cluster.ipynb` notebook. WARNING: the notebook looks for AWS credentials in `user.cfg`, which is not included in the repo.
- Create the staging and final tables by running `python create_tables.py`.
- Run the ETL pipeline by running `python etl.py`.
- I added my AWS user via an extra config file named `user.cfg`, which is not included in the repo to avoid sharing my credentials. This file is necessary to run the commands in `manage_cluster.ipynb`.
- I added a number of extra config parameters to `dwh.cfg`. These are used by the scripts that create and delete the Redshift cluster. I also had to slightly modify how `create_tables.py` and `etl.py` read in values for constructing the connection string.