For the Vietnamese edition, please visit https://lelouvincx.github.io/projects/fde2-goodreads-elt-pipeline/
In this project, I will guide you through building a basic data pipeline following the ELT model (extract - load - transform), ingesting and transforming a Goodreads dataset to serve a personal book recommendation system.
This project builds on the knowledge learned in the Fundamental Data Engineering 02 course by AIDE. Special thanks to Mr. Nguyen Thanh Binh, Mr. Ong Xuan Hong, and Mr. Hung Le.
I love reading books, and I have a Kindle e-reader for my daily reading.
One thing I like about the Kindle is that it has a separate email address provided by Amazon. If I use my own email to send e-book files (in .epub/.mobi format), the Amazon system will automatically send the files to my Kindle as long as there is an internet connection.
So why not build an app that can extract data from Goodreads (a social network for book lovers), process it, and provide recommendations for my next reads? And that's where the project begins :D
The dataset is collected from Kaggle, the OpenLibrary API, the Google Drive API, and the Notion API.
The objective of the project is to recommend books to users based on the collected and processed data. When a user enters a book they have read, the app suggests potential next reads. If the book has an .epub file, the app can also send it to the user's Kindle.
- app: The UI application, written with streamlit
- dagster_home: Dagit and the dagster daemon's configurations
- dataset: Dataset in .csv format, to be loaded into MySQL
- docker-compose: Composes the docker containers
- dockerimages: Self-built docker images, such as dagster (dagit + daemon), spark master, streamlit app, ...
- EDA.ipynb: Exploratory Data Analysis, view directly here
- elt_pipeline: The pipeline
  - dbt_transform: dbt's code location, used for the last transform step
  - Dockerfile + requirements.txt: Docker image
  - elt_pipeline: EL (Extract -> Load) pipeline
- .env + .spark_master.env + .spark_worker.env: Env variables (e.g. POSTGRES_USER, MYSQL_USER, SPARK, ...)
- env.template: Env variables template
- .git + .gitignore: Code versioning
- Makefile: Shortcuts for terminal commands
- load_dataset: .sql scripts to create the schema and load dataset into MySQL and Postgres
- requirements.txt + Pipfile + Pipfile.lock: Python dependencies
In addition, the containers also have their own separate directories, which include:
- minio
- storage
- mysql_data
- postgres_data
- metabase_data
Visit file tree.txt for more details.
- We use docker to containerize the application and dagster to orchestrate assets (as defined in dagster's documentation).
- Goodreads data is downloaded from Kaggle in .csv format, then imported into MySQL to simulate development data.
- After obtaining the book's ISBN (International Standard Book Number), collect additional data from the relevant APIs:
  - Genre, author, number of pages, image, and description from the OpenLibrary API
  - Download link from the Notion API
  - Epub file from the Google Drive API
  - Image from the OpenLibrary API or the Google Drive API
- Extract the table-formatted data above using polars and load it into the datalake, MinIO (a minimal sketch of this step follows this list).
- From MinIO, load the data into spark to transform it from raw into the silver and gold layers.
- Convert the Spark DataFrame to .parquet and load it back into MinIO.
- Load the gold layer into the data warehouse, PostgreSQL, creating the warehouse layer.
- Transform as needed using dbt on Postgres.
- Visualize the data using metabase.
- Create a book recommendation app using streamlit.
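To make the extract-load step concrete, here is a minimal sketch (not the project's actual pipeline code): read a table from MySQL with polars, write it out as .parquet, and push it into the MinIO datalake. The connection values reuse the example .env shown later; the object key bronze/book.parquet and the LIMIT clause are illustrative assumptions, and pl.read_database_uri requires the connectorx package.

```python
import os

import polars as pl
from minio import Minio

# Read a slice of the book table from MySQL (credentials from .env)
mysql_uri = (
    f"mysql://{os.getenv('MYSQL_USER', 'admin')}:{os.getenv('MYSQL_PASSWORD', 'admin123')}"
    f"@{os.getenv('MYSQL_HOST', 'localhost')}:{os.getenv('MYSQL_PORT', '3306')}"
    f"/{os.getenv('MYSQL_DATABASE', 'goodreads')}"
)
df = pl.read_database_uri("SELECT * FROM book LIMIT 1000", mysql_uri)

# Dump to parquet locally, then upload it into the bronze layer of the datalake
df.write_parquet("/tmp/book.parquet")
client = Minio(
    os.getenv("MINIO_ENDPOINT", "localhost:9000"),
    access_key=os.getenv("MINIO_ACCESS_KEY", "minio"),
    secret_key=os.getenv("MINIO_SECRET_KEY", "minio123"),
    secure=False,
)
client.fput_object(
    os.getenv("DATALAKE_BUCKET", "lakehouse"), "bronze/book.parquet", "/tmp/book.parquet"
)
```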
- book: OLTP table containing books' information (e.g. ISBN, Authors, Rating, Description, ...)
- genre: table containing genres
- book_genre: n-n relationship between book and genre (a query sketch follows this list)
- book_download_link: table containing the Google Drive links
- files: object storage containing the books' download files (.epub/.pdf/.mobi)
- images: object storage containing the books' images
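As a quick illustration of the n-n relationship, the genres of one book can be resolved by joining book_genre against genre. This is only a sketch: the column names GenreID and Name are hypothetical (the real schema lives in the load_dataset .sql scripts), and the connection string uses the example .env values with MySQL's port exposed on localhost.

```python
import polars as pl

# Example .env values, assuming MySQL's port 3306 is exposed on localhost
mysql_uri = "mysql://admin:admin123@localhost:3306/goodreads"

# Resolve all genres of one book through the book_genre bridge table
# (GenreID and Name are hypothetical column names)
query = """
SELECT b.ISBN, g.Name AS Genre
FROM book AS b
JOIN book_genre AS bg ON bg.ISBN = b.ISBN
JOIN genre AS g ON g.GenreID = bg.GenreID
WHERE b.ISBN = '0140328726'
"""
book_genres = pl.read_database_uri(query, mysql_uri)
print(book_genres)
```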
- The datalake is divided into three layers: bronze, silver, and gold.
- All files are under the .parquet format for better reading performance than .csv.
- There is also a files area that stores .epub files named abc.epub, where abc is the book's ISBN (see the sketch below).
- Similarly, abc.jpeg stores the book's cover image.
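A tiny sketch of that ISBN-based naming convention with the MinIO Python client, assuming the files and images buckets already exist and the source files sit in /tmp:

```python
from minio import Minio

client = Minio("localhost:9000", access_key="minio", secret_key="minio123", secure=False)

isbn = "0140328726"  # example ISBN
# Both the e-book file and its cover image are keyed by the book's ISBN
client.fput_object("files", f"{isbn}.epub", f"/tmp/{isbn}.epub")
client.fput_object("images", f"{isbn}.jpeg", f"/tmp/{isbn}.jpeg")
```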
- General
With a dense data lineage, Dagster is a big help when it comes to visualizing it in a clear way:
- Data originates from MySQL and various APIs, and is loaded into the bronze layer.
- From the bronze layer, data is deduped, cleaned, and missing values are filled in the silver layer.
- Advanced computations and splitting are then performed in the gold layer.
- The data is loaded into the data warehouse - Postgres - in the warehouse layer.
- Finally, transformations are made according to needs in the recommendations layer using dbt.
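A minimal sketch of how this lineage is expressed in dagster: each step is an asset, and an upstream asset becomes a function parameter of the downstream one, which is what Dagit draws as the lineage graph. The bodies below are placeholders only; the real assets add partitioning, MinIO handling, and Spark.

```python
import polars as pl
from dagster import Definitions, asset


@asset
def bronze_book() -> pl.DataFrame:
    # Placeholder: the real asset extracts the book table from MySQL
    return pl.DataFrame({"ISBN": ["0140328726"], "Name": ["Matilda"]})


@asset
def silver_cleaned_book(bronze_book: pl.DataFrame) -> pl.DataFrame:
    # Placeholder: dedupe and clean the upstream bronze data
    return bronze_book.unique(subset=["ISBN"])


@asset
def gold_with_info(silver_cleaned_book: pl.DataFrame) -> pl.DataFrame:
    # Placeholder: keep only the basic book columns
    return silver_cleaned_book.select(["ISBN", "Name"])


defs = Definitions(assets=[bronze_book, silver_cleaned_book, gold_with_info])
```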
- Bronze layer
Includes these assets:
- bronze_book: Table book from MySQL; because it is very large (over 1.2 million rows), it is partitioned by year from 1975 to 2022 (see the sketch after this list).
- bronze_genre: Table genre from MySQL.
- bronze_book_genre: Table book_genre from MySQL.
- bronze_book_download_link: Table book_download_link from MySQL.
- bronze_images_and_files_download: Connects to the Google Drive API, pulls the .epub files and images, and stores them in the datalake.
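The yearly partitioning of bronze_book might look roughly like the sketch below. It is an assumption-heavy illustration: PublishYear is a hypothetical filter column, and the connection string uses the example .env values with MySQL exposed on localhost.

```python
import polars as pl
from dagster import StaticPartitionsDefinition, asset

# One static partition per publication year, 1975..2022
YEARLY_PARTITIONS = StaticPartitionsDefinition([str(year) for year in range(1975, 2023)])


@asset(partitions_def=YEARLY_PARTITIONS)
def bronze_book(context) -> pl.DataFrame:
    year = context.partition_key
    # Pull only one year's worth of rows per run to keep memory in check
    # (PublishYear is a hypothetical column name)
    query = f"SELECT * FROM book WHERE PublishYear = {year}"
    mysql_uri = "mysql://admin:admin123@localhost:3306/goodreads"  # example .env values
    return pl.read_database_uri(query, mysql_uri)
```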
- Silver layer
Includes these assets:
- silver_cleaned_book: Cleans the data from the upstream bronze_book; it stays partitioned so that spark standalone mode can run efficiently.
- silver_collected_book: Collects data missing upstream, such as authors, number of pages, and description, from the OpenLibrary API (see the sketch after this list).
- silver_isbn: Extracts the isbn column from book to serve as a dependency for the genre-related assets.
- silver_cleaned_genre: Similar to silver_cleaned_book, but does not need partitioning since it is not very large.
- silver_collected_genre: Based on silver_isbn, collects the missing genres for each book. A book without any genre cannot be used for recommendations in later steps.
- silver_collected_book_genre: Establishes the n-n relationship between book and genre.
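Collecting the missing fields from the OpenLibrary API boils down to one HTTP call per ISBN. A small sketch (error handling and rate limiting omitted; authors come back as keys that need a follow-up request):

```python
import requests


def fetch_openlibrary_info(isbn: str) -> dict:
    """Fetch basic book metadata from OpenLibrary by ISBN."""
    resp = requests.get(f"https://openlibrary.org/isbn/{isbn}.json", timeout=10)
    resp.raise_for_status()
    data = resp.json()
    return {
        "title": data.get("title"),
        "pages_number": data.get("number_of_pages"),
        # Authors are returned as keys like /authors/OL34184A and need another request
        "author_keys": [a.get("key") for a in data.get("authors", [])],
        # Cover images can be pulled from the covers endpoint
        "cover_url": f"https://covers.openlibrary.org/b/isbn/{isbn}-L.jpg",
    }


print(fetch_openlibrary_info("0140328726"))
```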
- Gold layer
Includes these assets:
- gold_genre: Computes and sorts genres to match the upstream silver_collected_genre, and also saves them to MinIO.
- gold_book_genre: The same, but from the upstream silver_collected_book_genre.
- gold_with_info: A split containing only the basic information about a book, such as ISBN, Name, Authors, Language, PagesNumber (see the sketch after this list).
- gold_with_publish: A split containing information about the publisher and publication time.
- gold_with_rating: A split computing the different types of ratings.
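A rough PySpark sketch of the gold_with_info split, assuming the silver data already sits in MinIO under a silver/ prefix and the hadoop-aws package is on the classpath; the object paths and the dedup step are assumptions, not the project's exact code.

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("gold_with_info")
    .master("spark://spark-master:7077")
    # Point s3a at MinIO (credentials from the example .env)
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "minio")
    .config("spark.hadoop.fs.s3a.secret.key", "minio123")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .getOrCreate()
)

silver = spark.read.parquet("s3a://lakehouse/silver/collected_book")  # assumed key layout
gold_with_info = (
    silver.select("ISBN", "Name", "Authors", "Language", "PagesNumber")
    .dropDuplicates(["ISBN"])
)
gold_with_info.write.mode("overwrite").parquet("s3a://lakehouse/gold/with_info")
```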
- Warehouse layer
Load the assets from the gold layer into Postgres, including one asset from the bronze layer which is book_download_link.
In the future, the assets will be updated to add download links automatically from the Notion API, and a schedule will be set up.
- Transform layer
Includes these assets:
- search: Transform information to create an index table, which will be queried when users search for books.
- search_prior: Also an index table, but contains books that are given priority based on factors such as availability of download links, functionality of the OpenLibrary API, high ratings, etc.
- criteria: Criteria used to query related books when searching for a specific book.
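Since dbt materializes these models into the recommendations schema in Postgres (per DBT_SCHEMA in the example .env), the streamlit app can query them directly. A hedged sketch, with a hypothetical name column on the search model:

```python
import os

import psycopg2


def search_books(keyword: str, limit: int = 10):
    """Look up books in the dbt-built search index (column names are hypothetical)."""
    conn = psycopg2.connect(
        host=os.getenv("POSTGRES_HOST", "localhost"),
        port=os.getenv("POSTGRES_PORT", "5432"),
        user=os.getenv("POSTGRES_USER", "admin"),
        password=os.getenv("POSTGRES_PASSWORD", "admin123"),
        dbname=os.getenv("POSTGRES_DB", "goodreads"),
    )
    with conn, conn.cursor() as cur:
        cur.execute(
            "SELECT * FROM recommendations.search WHERE name ILIKE %s LIMIT %s",
            (f"%{keyword}%", limit),
        )
        return cur.fetchall()
```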
To develop this pipeline, download and install the following software:
- Git
- Docker with at least 4GB RAM, 6 core CPU, 2GB swap, 16GB disk
- Make; if you are on a UNIX system (Linux/macOS), check with make --version that it is already installed
- Python 3.x (3.9.16 recommended since the Spark image runs on this version; installing via asdf is recommended) and a virtual environment (pipenv recommended)
- A local machine that has freed the following ports: 3306, 5432, 9000, 9001, 3001, 8501, 4040, 7077, 8080, 3030
- DBeaver or any other DB client (if not available, you can use the command line)
If using Windows, set up WSL2 with a local Ubuntu virtual machine, then install the above software on Ubuntu.
Clone the repository
git clone https://github.com/lelouvincx/goodreads-elt-pipeline.git project
cd project
Download the csv dataset here, then place it in project/dataset
First, we need to create an OAuth 2.0 credential in the Google API Console.
Select Create new project:
Fill in the project's name (goodreads-elt_pipeline) and choose a location (default No organization).
After creating the project, open the Library tab:
Search for Google Drive API and enable it.
Next, open the OAuth consent screen tab,
Fill in the information below:
In Scopes, select Add or remove scopes, look for the Google Drive API readonly scope, tick it, then Save and continue until the end.
Open the Credentials tab -> Create credentials, then OAuth client ID.
Select Desktop app and name it as you like (default: goodreads-elt-pipeline).
Download the json file and place it in project/elt_pipeline/elt_pipeline.
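The pipeline can then use that json to authorize against the Drive API. Below is a minimal sketch of how the token flow might look, using the GDRIVE_* values from the example .env; the real pipeline's wrapper may differ, and token refresh handling is omitted.

```python
import os
import pickle

from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]
CLIENT_SECRET_FILE = os.getenv("GDRIVE_CLIENT_SECRET_FILE", "client_secret.json")
PICKLE_FILE = os.getenv("GDRIVE_PICKLE_FILE", "token_drive_v3.pickle")


def get_drive_service():
    """Return an authorized Drive v3 client, caching the credentials in a pickle file."""
    if os.path.exists(PICKLE_FILE):
        with open(PICKLE_FILE, "rb") as f:
            creds = pickle.load(f)
    else:
        flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET_FILE, SCOPES)
        creds = flow.run_local_server(port=0)
        with open(PICKLE_FILE, "wb") as f:
            pickle.dump(creds, f)
    return build("drive", "v3", credentials=creds)
```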
Create the env files:
# Create env file
touch .env
cp env.template .env
touch .spark_master.env
cp spark_master.env.template .spark_master.env
touch .spark_worker.env
cp spark_worker.env.template .spark_worker.env
Then fill in the information in the env files above, for example:
# MySQL
MYSQL_HOST=de_mysql
MYSQL_PORT=3306
MYSQL_DATABASE=goodreads
MYSQL_USER=admin
MYSQL_PASSWORD=admin123
MYSQL_ROOT_PASSWORD=root123
# PostgreSQL
POSTGRES_HOST=de_psql
POSTGRES_PORT=5432
POSTGRES_USER=admin
POSTGRES_PASSWORD=admin123
POSTGRES_DB=goodreads
POSTGRES_HOST_AUTH_METHOD=trust
# Google Drive
GDRIVE_CLIENT_SECRET_FILE=client_secret.json
GDRIVE_PICKLE_FILE=token_drive_v3.pickle
GDRIVE_API_NAME=drive
GDRIVE_API_VERSION=v3
GDRIVE_SCOPES=https://www.googleapis.com/auth/drive.readonly
# Dagster
DAGSTER_PG_HOSTNAME=de_psql
DAGSTER_PG_USERNAME=admin
DAGSTER_PG_PASSWORD=admin123
DAGSTER_PG_DB=postgres
DAGSTER_OVERALL_CONCURRENCY_LIMIT=1
DAGSTER_HOME=/opt/dagster/dagster_home
# dbt
DBT_HOST=de_psql
DBT_USER=admin
DBT_PASSWORD=admin123
DBT_DATABASE=goodreads
DBT_SCHEMA=recommendations
# MinIO
MINIO_ENDPOINT=minio:9000
MINIO_ROOT_USER=minio
MINIO_ROOT_PASSWORD=minio123
MINIO_ACCESS_KEY=minio
MINIO_SECRET_KEY=minio123
DATALAKE_BUCKET=lakehouse
AWS_ACCESS_KEY_ID=minio
AWS_SECRET_ACCESS_KEY=minio123
AWS_REGION=us-east-1
# MinIO client (mc)
AWS_ACCESS_KEY_ID=minio
AWS_SECRET_ACCESS_KEY=minio123
AWS_REGION=us-east-1
# Spark
SPARK_MASTER_URL=spark://spark-master:7077
SPARK_VERSION=3.3.2
HADOOP_VERSION=3
# Metabase
MB_DB_TYPE=postgres
MB_DB_DBNAME=goodreads
MB_DB_PORT=5432
MB_DB_USER=admin
MB_DB_PASS=admin123
MB_DB_HOST=de_psql
MB_DB_FILE=/metabase_data/metabase.db
You can change the user, password, and other values. These are for development only; do not use them in production.
# DO NOT RUN BOTH BELOW COMMANDS, ONLY CHOOSE ONE
# Setup python environment
pipenv install
# Or create virtualenv and install manually by requirements.txt
make install
# Build docker images
make build-dagster
make build-spark
make build-pipeline
make build-streamlit
# Run containers detached
make up-bg
# Check running containers
docker compose ps -a
# Check code quality
make check
make lint
# Format pipelines
black ./elt_pipeline
# Test coverage
make test
Check that there are 11 running services:
Ports:
- MySQL: 3306
- PostgreSQL: 5432
- Dagit: 3001
- MinIO
- UI: 9001
- API: 9000
- Spark master:
- UI: 8080
- API: 7077
- Pipeline:
- Spark jobs running: 4040
- Metabase: 3030
- Streamlit: 8501
Now we import the Goodreads dataset (in csv format) into MySQL:
make to_mysql_root
SET GLOBAL local_infile=TRUE;
-- Check if local_infile was turned on
SHOW VARIABLES LIKE "local_infile";
exit
# Create tables with schema
make mysql_create
# Load csv into created tables
make mysql_load
make psql_create
- http://localhost:3001 - Dagit
- http://localhost:4040 - Spark jobs
- http://localhost:8080 - Spark master
- http://localhost:9001 - MinIO
- http://localhost:3030 - Metabase
- http://localhost:8501 - Streamlit
Evaluation of the project:
- Speed: spark is installed in standalone mode, so it does not achieve high performance and sometimes crashes in the middle of shuffle/read/write tasks.
- Development environment: Currently there is only a development environment; testing, staging, and production environments will be considered in the future.
- dbt: Currently a small project; in the future, if more transformations are needed, it should be split into separate services with different permissions.
- Deployment: Use one of the cloud computing services such as AWS, Azure, or GCP.
- Complete the recommender system
- Integrate Jupyter Notebook for DS tasks - dagstermill
- Testing environment
- Continuous Integration with GitHub Actions