In the final project of the course, my goal is to stitch together most of the technologies I learned about in the different modules of the Data Engineering Nanodegree into a coherent data pipeline. The resulting data structure is not extremely complex in itself; instead I tried to focus on creating a realistic ETL workflow using Airflow. In hindsight, this needed almost as much work as figuring out the actual data structure.
For the basis of the project I chose the famous NYC taxi trips dataset. This is a well-known and widely used data source in data analysis and data science projects. Even though it's not a very novel dataset, from a data engineering perspective it is a great candidate for this project for several reasons:
- It is an extremely large dataset, so using big data tools is essential
- It is updated regularly (albeit with some latency), which is great for scheduled pipelines
- It can be merged with several other datasets, e.g. geography, weather, etc.
For the project I assumed that all trips included in the source data belong to a single taxi company. The goal of the project is to set up a data warehouse on AWS that provides the management of this taxi company with a high-level view and the possibility to drill down and gain deeper insights about the performance of the company.
Source: https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page
This is the base "transactional" dataset, containing millions of individual taxi trips. The individual files are available in a public S3 bucket.
Source: https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
This is supplementary data for identifying the zones where the individual trips started or ended. Ideal for adding a geographical dimension to the analysis.
Source: https://www.ncdc.noaa.gov/cdo-web/datasets#GHCND
This is a separate data source which adds weather information to the trip data. I requested the file for this project via the NOAA website.
- Storage: AWS S3
- As the main data already resides in S3, it made sense to put the other raw data files in S3 as well
- Workflow management: Airflow
- DAGs are perfect for reasoning about ETL processes
- I also wanted to create a solution where bulk loads and (simulated) incremental updates can run
- Data Warehouse: Amazon Redshift
- Star schema (and derived OLAP cubes) assume relational data modelling
- Redshift provides distributed, columnar storage, which is a great boost for large amounts of data
- Supplementary tools: PyData ecosystem (eg. Jupyter, pandas, etc.)
The final data model is a (small) star schema where the fact table contains trips with the detailed fare amounts, while dimension tables contain start and end locations and daily weather information.
This schema is great for aggregating trip counts, revenues and passenger counts by geographic, date, and weather dimensions in ad-hoc queries or periodically updated OLAP cubes. I left out columns from the original data sources to streamline the data and focus on these dimensions and use cases.
Note: Throughout the project I used the Yellow Taxi Trip Records subset of the NYC-TLC data, starting from 2020-01-01 until the last available data point at the time of writing (2020-06-30). The ETL pipeline would work for the larger dataset (including green taxis and/or For-Hire Vehicle data) and a wider time window (assuming there were no breaking changes in the data structure).
With these filtering criteria the ETL pipeline results in these table sizes:
```
dev=# select count(*) from analytics.trip;
  count
----------
 23021983

dev=# select count(*) from analytics.weather;
 count
-------
   213

dev=# select count(*) from analytics.zone;
 count
-------
   265
```
Columns of the `analytics.trip` fact table:
- trip_id: unique identifier created by hashing multiple columns
- passenger_count: number of passengers in trip (can be missing)
- trip_distance: traveled distance in miles as reported by the taximeter
- trip_duration_sec: trip time in seconds
- pickup_date: date when trip started
- pickup_location_id: taxi zone where trip started
- dropoff_location_id: taxi zone where trip ended
- rate_code: final rate code in effect at the end of the trip
- payment_type: type of payment
- fare_amount, extra, mta_tax, tip_amount, tolls_amount, improvement_surcharge, congestion_surcharge: components of the total amount
- total_amount: total amount charged to passengers
- total_amount_check: TRUE if the total amount components add up to `total_amount`, FALSE if not
Columns of the `analytics.weather` dimension table:
- date: date of the weather metrics, measured at the NYC Central Park weather station
- wind: average daily wind speed (meters per second)
- precip: daily precipitation in millimeters
- snow: daily snowfall in millimeters
- tmax/tmin: daily maximum/minimum temperature
Columns of the `analytics.zone` dimension table:
- location_id: id of the taxi zone
- borough: borough of NYC
- zone: taxi zone - the smallest unit in this table
- service_zone: larger unit of the zone structure
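To make the three tables concrete, below is a sketch of the table definitions in the spirit of `create_analytics_tables.sql`. The columns and the distribution/sort key choices follow the descriptions in this README, but the exact data types are my assumptions, not a copy of the project's SQL:

```python
# Sketch of the analytics schema DDL (data types are assumptions).
CREATE_ANALYTICS_TABLES = """
CREATE SCHEMA IF NOT EXISTS analytics;

CREATE TABLE IF NOT EXISTS analytics.zone (
    location_id   INT PRIMARY KEY,
    borough       VARCHAR(64),
    zone          VARCHAR(128),
    service_zone  VARCHAR(64)
) DISTSTYLE ALL;

CREATE TABLE IF NOT EXISTS analytics.weather (
    date    DATE PRIMARY KEY,
    wind    FLOAT,
    precip  FLOAT,
    snow    FLOAT,
    tmax    FLOAT,
    tmin    FLOAT
) DISTKEY(date) SORTKEY(date);

CREATE TABLE IF NOT EXISTS analytics.trip (
    trip_id                CHAR(32) PRIMARY KEY,   -- MD5 hash of several columns
    passenger_count        INT,
    trip_distance          FLOAT,
    trip_duration_sec      INT,
    pickup_date            DATE,
    pickup_location_id     INT REFERENCES analytics.zone (location_id),
    dropoff_location_id    INT REFERENCES analytics.zone (location_id),
    rate_code              INT,
    payment_type           INT,
    fare_amount            DECIMAL(10, 2),
    extra                  DECIMAL(10, 2),
    mta_tax                DECIMAL(10, 2),
    tip_amount             DECIMAL(10, 2),
    tolls_amount           DECIMAL(10, 2),
    improvement_surcharge  DECIMAL(10, 2),
    congestion_surcharge   DECIMAL(10, 2),
    total_amount           DECIMAL(10, 2),
    total_amount_check     BOOLEAN
) DISTKEY(pickup_date) SORTKEY(pickup_date);
"""
```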
I manage all operations in two Airflow pipelines, from launching a Redshift cluster to creating the final data outputs. The only exception is the weather data preprocessing, which I did in a separate Python script.
The original csv file from NOAA contained daily observations from 134 weather stations in New York, New Jersey and Connecticut, from 2017-01-01 to 2020-07-31.
In order to simulate loading from different formats and to preprocess the data file, I filtered the columns, split up the csv file into daily JSON files, and uploaded them to the S3 bucket in the following format: `s3://dend-capstone-somi/weather/<YEAR-MONTH>/weather-<DATE>.json` (this is done in `preprocess_weather_data.py`).
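The preprocessing itself is a small pandas + boto3 job. Below is a minimal sketch of the idea; the kept NOAA columns (`AWND`, `PRCP`, `SNOW`, `TMAX`, `TMIN`) and the function name are illustrative assumptions rather than the literal contents of `preprocess_weather_data.py`:

```python
import boto3
import pandas as pd

BUCKET = "dend-capstone-somi"
# Columns assumed to be kept from the NOAA GHCND export (illustrative)
COLUMNS = ["STATION", "NAME", "DATE", "AWND", "PRCP", "SNOW", "TMAX", "TMIN"]

def split_and_upload(csv_path: str) -> None:
    """Split the NOAA csv into daily JSON files and upload them to S3."""
    s3 = boto3.client("s3")
    df = pd.read_csv(csv_path, usecols=COLUMNS, parse_dates=["DATE"])

    for date, daily in df.groupby(df["DATE"].dt.date):
        key = f"weather/{date:%Y-%m}/weather-{date:%Y-%m-%d}.json"
        body = daily.to_json(orient="records", lines=True, date_format="iso")
        s3.put_object(Bucket=BUCKET, Key=key, Body=body.encode("utf-8"))

if __name__ == "__main__":
    split_and_upload("noaa_weather_2017-2020.csv")  # hypothetical input file name
```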
For the sake of practicing these technologies, I created tasks in an Airflow pipeline to set up a Redshift cluster and save the connection details into Airflow for later use. This way I can restart my project from scratch just by updating the config file (if necessary) and running the pipeline (if the Redshift cluster is already up and running, the related tasks succeed quickly).
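Under the hood this boils down to a couple of boto3 calls plus saving an Airflow connection. The snippet below is a simplified sketch of what the tasks built on `create_redshift_operator.py` and `save_redshift_endpoint.py` do; the config keys and the single-function structure are assumptions for illustration:

```python
import boto3
from airflow import settings
from airflow.models import Connection

def create_cluster_and_save_connection(config: dict) -> None:
    """Create a Redshift cluster (if needed) and store its endpoint as an Airflow connection."""
    redshift = boto3.client("redshift", region_name=config["region"])

    try:
        redshift.create_cluster(
            ClusterIdentifier=config["cluster_id"],
            NodeType=config["node_type"],           # e.g. "dc2.large"
            NumberOfNodes=int(config["num_nodes"]),
            DBName=config["db_name"],
            MasterUsername=config["db_user"],
            MasterUserPassword=config["db_password"],
            IamRoles=[config["iam_role_arn"]],
        )
    except redshift.exceptions.ClusterAlreadyExistsFault:
        pass  # idempotent: rerunning the DAG is a no-op if the cluster already exists

    # Wait until the cluster is available, then read its endpoint
    redshift.get_waiter("cluster_available").wait(ClusterIdentifier=config["cluster_id"])
    endpoint = redshift.describe_clusters(
        ClusterIdentifier=config["cluster_id"]
    )["Clusters"][0]["Endpoint"]

    # Save the endpoint as an Airflow connection for downstream tasks
    conn = Connection(
        conn_id="redshift",
        conn_type="postgres",
        host=endpoint["Address"],
        port=endpoint["Port"],
        schema=config["db_name"],
        login=config["db_user"],
        password=config["db_password"],
    )
    session = settings.Session()
    session.add(conn)
    session.commit()
```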
- I created a separate DAG for staging, transforming and loading the data when the Redshift cluster is ready.
- The pipeline in its current form can be run on the first day of each month, which suits the monthly nature of the trip data files.
- Data from the input sources is staged in Redshift tables under the `stage` schema:
  - Tables are freshly created for each DAG execution date in the form of `staging.tablename_YYYY-MM-DD` using Airflow templating, making sure that data from separate DAG runs doesn't spill into each other (a condensed sketch of this follows a few bullets below).
- The weather dataset is filtered for the selected weather station (NYC Central Park) right after the data is inserted into the staging table.
- The taxi trip dataset is cleaned by filtering on multiple criteria:
  - Zero-distance trips are removed from the staging tables right after data is imported from S3, because these are usually either duplicates or seem to be some sort of technical entries.
  - Trips with `NULL` as vendorID are also removed from staging (these are considered erroneous entries for this project; further research might validate them).
  - Invalid trips are filtered out when inserting data into the final tables. Trips are considered invalid if there is another record with the same pickup and dropoff times and locations, but with negative dollar amounts (these are most likely trips that were charged back or canceled for some reason).
- Zone data is inserted as is, but updated on every run, assuming NYC TLC maintains the zone definitions.
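To illustrate the templated staging step, here is a condensed sketch in the spirit of `stage_trip_data.py` and `create_stage_tables.sql`. The `staging.trip_template` table, the direct `COPY` of a single CSV (the project builds manifest files), the S3 key pattern and the Airflow variable name are all assumptions made for brevity:

```python
from airflow.operators.postgres_operator import PostgresOperator  # Airflow 1.10-style import

# Templated SQL: {{ ds }} renders to the run's execution date (YYYY-MM-DD), so every
# DAG run writes to its own staging table, e.g. staging."trip_2020-06-01".
CREATE_AND_LOAD_TRIP_STAGE = """
CREATE TABLE IF NOT EXISTS staging."trip_{{ ds }}" (LIKE staging.trip_template);

COPY staging."trip_{{ ds }}"
FROM 's3://nyc-tlc/trip data/yellow_tripdata_{{ execution_date.strftime("%Y-%m") }}.csv'
IAM_ROLE '{{ var.value.redshift_iam_role }}'
CSV IGNOREHEADER 1;

-- clean-up right after import: zero-distance trips and NULL vendor ids
DELETE FROM staging."trip_{{ ds }}" WHERE trip_distance = 0;
DELETE FROM staging."trip_{{ ds }}" WHERE vendorid IS NULL;
"""

stage_trips = PostgresOperator(
    task_id="stage_trip_data",
    postgres_conn_id="redshift",
    sql=CREATE_AND_LOAD_TRIP_STAGE,
    dag=dag,  # the monthly ETL DAG defined elsewhere
)
```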
- Data quality is checked against two criteria for all three data tables after staging has finished (a simplified sketch of the null check follows this bullet):
  - Unwanted `NULL` values in given columns, as set in the custom `CheckNullValuesOperator`
  - Uniqueness of rows, either by the primary key (`analytics.zone` and `analytics.weather`) or by a combination of columns (`analytics.trip`), as set in the custom `CheckUniqueValuesOperator`
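A stripped-down version of the null check illustrates the pattern used by both custom operators (the real implementations live in `check_null_operator.py` and `check_unique_operator.py`; this sketch is not a copy of them):

```python
from airflow.hooks.postgres_hook import PostgresHook  # Airflow 1.10-style imports
from airflow.models import BaseOperator

class CheckNullValuesOperator(BaseOperator):
    """Fail the task if any of the given columns contains NULL values (simplified sketch)."""

    template_fields = ("table",)

    def __init__(self, redshift_conn_id, table, columns, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redshift_conn_id = redshift_conn_id
        self.table = table
        self.columns = columns

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        for column in self.columns:
            records = hook.get_records(
                f"SELECT COUNT(*) FROM {self.table} WHERE {column} IS NULL"
            )
            null_count = records[0][0]
            if null_count > 0:
                raise ValueError(
                    f"Data quality check failed: {self.table}.{column} "
                    f"contains {null_count} NULL values"
                )
            self.log.info("NULL check passed for %s.%s", self.table, column)
```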
- Final tables live under the `analytics` schema
- Insertion is done via `PostgresOperator` tasks with the SQL statements in separate files
- Insert tasks only rely on data in the staging tables, therefore they work in both batch and incremental (scheduled) DAG runs
- `analytics.zone` (dimension):
  - primary key is `location_id`, which maps to `pickup_location_id` and `dropoff_location_id` on the fact table
  - it is truncated and reloaded with each import (this is a very small table)
  - being such a small table, its distribution style is `ALL`
- `analytics.weather` (dimension):
  - primary key is `date`, which maps to `pickup_date` on the fact table
  - updated incrementally: new data is inserted and existing rows are updated with the incoming values
  - `date` is the distribution key and sort key (this will require periodic repartitioning as new data comes in)
- `analytics.trip` (the fact table):
  - primary key is a hash value constructed from the pickup/dropoff timestamps, location ids and passenger count (this is a makeshift solution, as the data source does not contain unique identifiers)
  - also updated incrementally, with new rows inserted and any existing rows completely updated with the incoming data (the upsert pattern is sketched right after this list)
  - `pickup_date` is the distribution key and sort key (this will require periodic repartitioning as new data comes in)
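The incremental load of the fact table follows the usual Redshift delete-then-insert pattern, keyed on the surrogate hash id. The templated SQL below sketches the idea behind `insert_trip_data.sql`; the staging column names, the shortened column list and the exact hash inputs are assumptions:

```python
# Simplified upsert for the fact table (column list shortened for the sketch).
UPSERT_TRIP_SQL = """
BEGIN;

-- Drop any existing rows that the incoming batch will replace
DELETE FROM analytics.trip
USING staging."trip_{{ ds }}" s
WHERE analytics.trip.trip_id = MD5(
    s.tpep_pickup_datetime::varchar || s.tpep_dropoff_datetime::varchar ||
    s.pulocationid::varchar || s.dolocationid::varchar ||
    COALESCE(s.passenger_count::varchar, 'NA')
);

-- Insert the new batch, constructing the surrogate key the same way
INSERT INTO analytics.trip (trip_id, pickup_date, pickup_location_id,
                            dropoff_location_id, passenger_count,
                            trip_distance, total_amount)
SELECT
    MD5(
        s.tpep_pickup_datetime::varchar || s.tpep_dropoff_datetime::varchar ||
        s.pulocationid::varchar || s.dolocationid::varchar ||
        COALESCE(s.passenger_count::varchar, 'NA')
    ) AS trip_id,
    s.tpep_pickup_datetime::date AS pickup_date,
    s.pulocationid,
    s.dolocationid,
    s.passenger_count,
    s.trip_distance,
    s.total_amount
FROM staging."trip_{{ ds }}" s;

COMMIT;
"""
```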
- The project can be run in Airflow by reading DAGs, plugins and configuration from `./airflow` and `./config`.
```
.
├── README.md
├── airflow
│   ├── dags
│   │   ├── create_redshift_dag.py
│   │   ├── etl_dag.py
│   │   └── sql
│   │       ├── create_analytics_tables.sql
│   │       ├── create_stage_tables.sql
│   │       ├── insert_trip_data.sql
│   │       └── insert_weather_data.sql
│   └── plugins
│       ├── __init__.py
│       └── operators
│           ├── __init__.py
│           ├── check_null_operator.py
│           ├── check_unique_operator.py
│           ├── create_redshift_operator.py
│           ├── save_redshift_endpoint.py
│           ├── stage_trip_data.py
│           ├── stage_weather_data.py
│           └── stage_zone.py
├── config
│   └── redshift.cfg
├── preprocess_weather_data.py
└── requirements.txt
```
- Redshift Setup: In a real project, creating a Redshift cluster straight from Airflow wouldn't really make sense, but here it presented a great opportunity to practice Airflow and AWS infrastructure-as-code in the same exercise.
- Security: When setting up the infrastructure I mostly used default values and settings. In a real life project it would be crucial to ensure the security of the setup via proper configuration of VPCs, security groups, roles and users (both on AWS and on Redshift database/schema level).
- Data cleaning: Even though I did some preprocessing and quality checks on the data, I could do more on cleaning and validating the input data. In a real life project this would be crucial. Here my focus was to set up the infrastructure and pipelines, accepting some implied assumptions about the data quality.
- Custom operators: Custom operators for staging the data look very similar at first sight, even though they differ in some key elements (query structure, S3 key construction, etc.). In a larger setup it might be worth merging their functionality into a single staging operator and handle the logical differences inside this merged operator. I considered this as premature optimization for this project.
- Deploying pipelines: I ran Airflow on my laptop which is obviously not a viable solution in a real setup. Properly deploying Airflow in the cloud with the appropriate resources and architecture and making sure that pipelines can run smoothly could be a project in itself.
- OLAP Cube: The final data lends itself nicely to creating multidimensional OLAP cubes, e.g. summing up the total fare amount by date periods (weeks or months), pickup and dropoff locations, and probably even weather conditions by categorizing the numeric weather variables (a sketch of such a rollup query follows this list).
- Utilizing Spark: If we were to process the whole NYC TLC dataset, including yellow and green taxis and other trips for the whole available date range, we would need substantial resources. This operation would only be feasible if we used a Spark cluster to read the data from S3 and crunch it.
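As an example of such a rollup, a weekly revenue cube by pickup borough could be computed along these lines (a sketch only; the connection placeholders are hypothetical, while the table and column names follow the schema described above):

```python
import pandas as pd
import psycopg2

# Hypothetical connection details; in practice these come from redshift.cfg / Airflow
conn = psycopg2.connect(host="<redshift-endpoint>", port=5439,
                        dbname="dev", user="<user>", password="<password>")

WEEKLY_REVENUE_BY_BOROUGH = """
SELECT
    DATE_TRUNC('week', t.pickup_date) AS pickup_week,
    z.borough,
    COUNT(*)               AS trip_count,
    SUM(t.passenger_count) AS passengers,
    SUM(t.total_amount)    AS revenue
FROM analytics.trip t
JOIN analytics.zone z    ON z.location_id = t.pickup_location_id
JOIN analytics.weather w ON w.date = t.pickup_date
WHERE w.precip = 0          -- e.g. restrict the cube to dry days
GROUP BY 1, 2
ORDER BY 1, 2;
"""

weekly_cube = pd.read_sql(WEEKLY_REVENUE_BY_BOROUGH, conn)
```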
The ETL pipeline should run fine with 100x the amount of data, and based on the usage stats the current Redshift cluster would probably also be able to hold this much data. The execution would be much slower, but as long as the processing remains monthly, this shouldn't be a showstopper.
For even larger factors we would need:
- bigger storage on the Redshift cluster, with more nodes to handle the load more efficiently
- a larger Airflow setup with more workers for parallel task execution
- if the monthly files got extraordinarily large, we would probably need Spark to crunch the numbers (and maybe write the results back to S3 instead of holding them in Redshift)
The current setup runs monthly, because the NYC TLC trip data is published in monthly files (and with a considerable time lag). Nevertheless, the pipeline could run on a daily basis with only small modifications, but this would be quite inefficient, as the staging tasks would have to read the monthly files during every run and keep only the records for a single day. The rest of the pipeline should function the same as it does currently (zone data is fairly constant, weather data is already in daily format).
If the trip data were published in daily files, switching from a monthly to a daily run would literally mean changing a few lines of code (e.g. the DAG `schedule_interval` and the way manifest files are constructed from `execution_date` in the staging tasks).
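Concretely, the switch would look something like this (a sketch; the DAG id, dates and key pattern are illustrative):

```python
from datetime import datetime
from airflow import DAG

dag = DAG(
    "etl_dag",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",  # currently a monthly schedule
    catchup=True,
)

# Staging tasks would then derive their S3 keys/manifests from the daily execution
# date instead of the month, e.g. (illustrative pattern, templated by Airflow):
daily_weather_key = "weather/{{ execution_date.strftime('%Y-%m') }}/weather-{{ ds }}.json"
```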
In this case, giving all users access to a "bare" Redshift data warehouse wouldn't be enough from a security, operations or user experience standpoint. For serving 100+ users, I would either set up a curated dashboard (Plotly Dash, Redash or similar) with the results of the relevant queries (this needs maintenance) or install a "self-service" BI tool (e.g. Tableau, Power BI) in front of the data warehouse, and let users query data via these channels instead of poking around in the data warehouse.