Project summary

1. Introduction

The International Bank for Reconstruction and Development (IBRD) wants to move its processes and data pipeline onto the cloud. Its data resides in S3, in a directory of CSV files containing the transaction history of its loans. In this project we build an ETL pipeline that extracts the data from S3, processes and cleans it with Spark, and loads it back into S3 as a set of dimensional tables in parquet format. We then create a Redshift cluster and load the data into Redshift, so that our analytics team can continue finding insights about our loan users.

The project follows these steps:

* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Clean and Save the Data
* Step 4: Define the Data Model
* Step 5: Run ETL to Model the Data
* Step 6: Complete the Project Write Up

The data comes from the World Bank and can be downloaded from:

https://finances.worldbank.org/Loans-and-Credits/IBRD-Statement-Of-Loans-Historical-Data/zucq-nrc3

The file is around 300 MB; its layout is documented in the Data Layout.txt file (see the data dictionary in section 3).

2. File Description

This repository contains Test.ipynb, dwh.cfg, etl.py, and README.md.

* Test.ipynb: a notebook you can use to run this project step by step.
* dwh.cfg: a basic config file that holds all of the configuration.
* etl.py: the script that loads data from S3 via Spark and saves it back to S3.
* README.md: this description of the project.

3. Project Description

The data lake we create for this project has 6 tables: 1 fact table and 5 dimension tables.

Fact table:

* Transaction - the transaction records. Columns: Loan_Number, Time_Id, Country_Id, Guarantor_Country_Id, Loan_Type, Loan_Status_Id, Amount_Id, End_of_Period, Interest_Rate, Project_ID, Exchange_Adjustment, Borrowers_Obligation, Cancelled_Amount, Undisbursed_Amount, Disbursed_Amount, Repaid_to_IBRD, Due_to_IBRD, Loans_Held

Dimension tables:

* country - countries that hold a loan or act as a guarantor in the transaction history. Columns: Country_Id, Country_Code, Country, Region
* loan_status - the loan statuses in the transaction history. Columns: Loan_Status_Id, Loan_Status
* loan_type - the loan types in the transaction history. Columns: Loan_Type_Id, Loan_Type
* amount - the different amounts in the transaction history. Columns: Amount_Id, Original_Principal_Amount, Sold_3rd_Party, Repaid_3rd_Party, Due_3rd_Party
* time - the timestamps of the records in the transaction history. Columns: Time_Id, First_Repayment_Date, Last_Repayment_Date, Agreement_Signing_Date

Data dictionary: every column comes from the raw data except the generated unique IDs. For the data layout, please see the Data Layout.txt file in the workspace.

4. ETL Pipeline

The ETL pipeline has 6 phases:

A. Loading data from S3 into our data lake.
B. Creating and cleaning data with Spark.
C. Saving parquet files back to S3.
D. Creating Redshift on AWS with a script.
E. Loading data into Redshift from S3.
F. Data quality checks.

To review this project, please open and run Test.ipynb step by step.

A. Loading data from S3 into our data lake

We load the data from S3 and explore it with the .show(), printSchema(), and describe() functions. Exploring the data shows that the column names contain spaces and that the string columns mix upper and lower case, which can cause problems when comparing string values. To prevent these potential issues, we remove the spaces from the column names and uppercase all string data, as shown in the sketch below.
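The following is a minimal sketch of this step, assuming a hypothetical input path and hadoop-aws package version; the actual bucket and helper code in Test.ipynb and etl.py may differ:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

def create_spark_session():
    """Create (or reuse) a Spark session able to read from S3."""
    return (SparkSession.builder
            # The hadoop-aws version must match your Spark/Hadoop distribution.
            .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0")
            .getOrCreate())

spark = create_spark_session()

# Hypothetical input path; replace with the bucket configured for this project.
raw_path = "s3a://my-bucket/ibrd-loans/*.csv"
df = spark.read.csv(raw_path, header=True, inferSchema=True)

# Explore the data.
df.printSchema()
df.show(5, truncate=False)
df.describe().show()

# Replace spaces in column names with underscores.
for old_name in df.columns:
    df = df.withColumnRenamed(old_name, old_name.replace(" ", "_"))

# Uppercase every string column so later comparisons are not case sensitive.
for field in df.schema.fields:
    if isinstance(field.dataType, StringType):
        df = df.withColumn(field.name, F.upper(F.col(field.name)))
```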
B. Creating and cleaning data with Spark

1. First, we load all the libraries we need and create a Spark session with the create_spark_session() function. Then we load the log file into the session.

2. With Spark, we create dataframes with select statements and apply some cleaning and manipulation:

a. Dropping columns with too many null values. Running `df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).toPandas()` shows how many null values each column has. Last_Disbursement_Date and Currency_of_Commitment both have more than 40% null values, so we drop them. Looking into Borrower and Project_Name, we also find a lot of bad data such as "?@": the same borrower appears under different names containing "?" marks, which likely come from Spanish characters that could not be recognized when the files were loaded.

b. Fixing the loan number. According to the data dictionary, the loan number should be 9 characters long, in the format XXXX12345. The data, however, contains bad values such as XXXX12, XXXX123, and XXXX1234 that are missing "0"s in the middle, so we fix them with the regexp_replace function (a sketch follows section C below).

c. Dropping bad data. Since each loan number has only one borrower, the borrower should belong to only one country and region. We find 3 records with the wrong country and drop them.

d. Replacing null values with correct data. Because the loan number is a unique identifier, each loan number should have its own specific values. So for each column that has null values, we build a lookup keyed by loan number, filter out the records with nulls, and fill them from the lookup. Finally, we drop the records that still have null values: these are true missing values, meaning we only have one record for that loan number in the data, and there is nothing we can do except drop it. For details, please review the Capstone Project Template.

e. Generating the dataframes for the dimension and fact tables. For the dimension tables, we keep the loan number at first so we can build the transaction table, then drop it. The fact table transaction is created by joining the log table with the other tables on the loan number.

C. Saving parquet files back to S3

After all of the above, we save the data to S3, which completes the job. To display the files saved on S3, run the display_file_s3() function. To look at the data saved locally, go to the Output folder. You can run the Delete_All() function to delete all files created locally.
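To make steps b and d and the parquet save concrete, here is a minimal sketch that assumes the `df` dataframe from the previous sketch. etl.py performs the loan-number fix with regexp_replace; this sketch pads the numeric suffix with regexp_extract and lpad instead for readability, and the column list and output path are illustrative only:

```python
from pyspark.sql import functions as F

# Step b: normalize Loan_Number to the 9-character XXXX12345 format by
# zero-padding the numeric suffix (etl.py does this with regexp_replace).
df = df.withColumn(
    "Loan_Number",
    F.concat(
        F.substring("Loan_Number", 1, 4),
        F.lpad(F.regexp_extract("Loan_Number", r"(\d+)$", 1), 5, "0"),
    ),
)

# Step d: fill nulls in a column from a non-null value seen for the same
# Loan_Number, then drop the rows that are still null (true missing values).
def backfill_from_loan_number(frame, column):
    lookup = (frame.filter(F.col(column).isNotNull())
                   .groupBy("Loan_Number")
                   .agg(F.first(column).alias(column + "_fill")))
    filled = (frame.join(lookup, on="Loan_Number", how="left")
                   .withColumn(column, F.coalesce(F.col(column), F.col(column + "_fill")))
                   .drop(column + "_fill"))
    return filled.filter(F.col(column).isNotNull())

for c in ["Country", "Region", "Interest_Rate"]:  # illustrative subset of columns
    df = backfill_from_loan_number(df, c)

# Section C: save the cleaned data back to S3 as parquet (hypothetical output path).
df.write.mode("overwrite").parquet("s3a://my-bucket/output/transaction/")
```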
D. Creating Redshift on AWS with a script

Running Redshift.sh creates a Redshift cluster with the basic configuration taken from dwh.cfg. By default, the configuration is:

DWH_CLUSTER_TYPE=multi-node
DWH_NUM_NODES=4
DWH_NODE_TYPE=dc2.large
DWH_IAM_ROLE_NAME=dwhRole
DWH_CLUSTER_IDENTIFIER=redshift
DWH_DB=dwh
DWH_DB_USER=dwhuser
DWH_DB_PASSWORD=Passw0rd
DWH_PORT=5439

If you want to change it, edit dwh.cfg and put in your own configuration.

E. Loading data into Redshift from S3

Inside etl.py, we first connect to Redshift, then call the drop_tables(), create_tables(), change_timestamp(), and load_tables() functions. Each of these functions loops through a list of SQL statements to drop, create, and load the tables. The SQL queries are stored in sql_queries.py as five lists:

* copy_table_queries: the queries that contain the COPY commands.
* drop_table_queries: the queries that drop a table if it exists.
* create_table_queries: the queries that create the tables with the expected data types.
* insert_table_queries: the queries that convert the date-related columns to the timestamp data type.
* drop_staging_table_queries: the queries that drop the staging tables.

F. Data Quality Check

We first check the integrity constraints on the relational database (e.g., unique keys, data types, etc.). Then we count the rows in each table to make sure no table is empty, which confirms the scripts are doing the right things. Last, we compare the counts between the target tables and the Spark dataframes to ensure the data was loaded correctly.

Finally, everything is inside Redshift and we can start querying it to pull more insights from the data! Please remember to drop the cluster and the IAM role when you are finished.

5. Project Write Up

Rationale for the choice of tools and technologies:

In this project I chose AWS S3, Redshift, and Spark, since they are well suited to handling big data in a cloud environment.

* AWS S3: a well-known storage service that supports many file types and provides a rich enough API to easily meet our file-transfer requirements.
* AWS Redshift: a convenient, massively parallel processing data warehouse. It offers different node types, handles large-scale data sets, and, most importantly, scales well.
* PySpark: easy to learn and implement, with a simple and comprehensive API. It integrates well with the wider Python ecosystem (numpy, pandas, scikit-learn, seaborn, matplotlib, etc.) and is backed by a large, active community.

Below are some scenarios we might encounter in the future.

1. The data is increased by 100x. There are two ways to deal with this:
a. Upgrade Redshift and EC2 so we have more compute power. This is the easier way, but it costs more.
b. Partition the data by loan number and process each chunk in parallel.

2. The data populates a dashboard that must be updated by 7am every day. We can use Airflow to create a DAG that takes care of the scheduling; this is also a further step I want to work on for this project (a sketch follows below).

3. The database needs to be accessed by 100+ people. We can create different users and roles in AWS IAM and assign different roles to different users.
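For scenario 2, here is a minimal Airflow 2.x sketch of what such a DAG could look like; the DAG id, schedule, script path, and tasks are illustrative assumptions rather than part of the current project:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Illustrative DAG: rebuild the data and refresh the dashboard daily before 7am.
with DAG(
    dag_id="ibrd_loans_daily_refresh",
    start_date=datetime(2023, 1, 1),
    schedule_interval="0 6 * * *",  # run at 6am so the dashboard is ready by 7am
    catchup=False,
) as dag:
    spark_etl = BashOperator(
        task_id="spark_etl",
        bash_command="python /opt/project/etl.py",  # hypothetical path to this repo's etl.py
    )
    refresh_dashboard = BashOperator(
        task_id="refresh_dashboard",
        bash_command="echo 'dashboard refresh placeholder'",  # placeholder for the real refresh step
    )

    spark_etl >> refresh_dashboard
```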