Goal • Scenario • Tech_Stack • Set-up • Process • License
This project is an ETL data pipeline. It collects data with the Kaggle API, loads the raw data into S3, builds a summary table for analysis with Spark SQL, and finally loads the processed data into Redshift. The summary data can then be analyzed with a BI tool (preset.io) connected to Redshift.
SparkSQL
- Processes the data during the transform task
Preset.io
- Displays the data as charts by connecting to a database, in this case Redshift
Architecture
Data
- contents : 120 years of Olympic history
- format : zip
- size : 42 MB
Extract
- Collect the Olympics data locally using the Kaggle API
- Load the Kaggle data into AWS S3
Transform
- Create the schema and tables for the summary table
- Run the processing job with Spark
Load
- Load the processed data into AWS Redshift
- Check the data quality
Analysis
- View the summary table using BI tools
- Docker
- Spark
- AWS S3
- AWS RedShift
- Airflow
- Python
- Git
Kaggle
- file : cred/kaggle/kaggle.json
- An API key is required to collect the data
{"username":"[username]","key":"[key]"}
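A malformed credentials file usually only surfaces when the download task fails. The following is a minimal sketch, not part of the project, of a check that could be run beforehand. The function name `load_kaggle_credentials` is hypothetical; only the two field names come from the file format shown above.

```python
import json
from pathlib import Path


def load_kaggle_credentials(path):
    """Read kaggle.json and return (username, key).

    Raises ValueError if either field is missing or empty.
    """
    creds = json.loads(Path(path).read_text(encoding="utf-8"))
    missing = [field for field in ("username", "key") if not creds.get(field)]
    if missing:
        raise ValueError(f"kaggle.json is missing: {', '.join(missing)}")
    return creds["username"], creds["key"]
```

Calling `load_kaggle_credentials("cred/kaggle/kaggle.json")` before triggering the DAG would catch an empty or incomplete file early.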
AWS
- file : creds/aws/credentials
- Enter the access_key and secret_key used to connect to AWS from Airflow
[default]
aws_access_key_id = [access_key_id]
aws_secret_access_key = [secret_key]
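The credentials file uses INI syntax with one section per profile, so it can be inspected with the standard library. This is a sketch for verifying the file locally and assumes nothing about how the project itself reads it. `read_aws_credentials` is a hypothetical helper name.

```python
import configparser


def read_aws_credentials(path, profile="default"):
    """Return (access_key_id, secret_access_key) for one profile."""
    # interpolation=None keeps any '%' characters in the values literal
    parser = configparser.ConfigParser(interpolation=None)
    if not parser.read(path):
        raise FileNotFoundError(path)
    section = parser[profile]  # KeyError if the profile is absent
    return section["aws_access_key_id"], section["aws_secret_access_key"]
```

Each setting has to sit on its own line under the `[default]` header for the file to parse this way.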
AWS
- components : aws_key, aws_secret_key
- in_code :
aws_config = Variable.get("aws_config", deserialize_json=True)
aws_key = aws_config["aws_key"]
aws_secret_key = aws_config["aws_secret_key"]
S3
- components : s3_bucket, s3_key
- in_code :
s3_config = Variable.get("s3_config", deserialize_json=True)
s3_bucket = s3_config["s3_bucket"]
s3_key = s3_config["s3_key"]
Redshift
- components : db_user, db_pass, conn_string
- in_code :
redshift_config = Variable.get("redshift_config", deserialize_json=True)
db_user = redshift_config.get("db_user")
db_pass = redshift_config.get("db_pass")
redshift_conn_string = redshift_config.get("conn_string")
Slack
- components : channel, token_key
- in_code :
config_slack = Variable.get("slack_config", deserialize_json=True)
channel = config_slack['channel']
token = config_slack['token_key']
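With `deserialize_json=True`, Airflow parses the stored Variable value as JSON, so each of the four Variables above must hold a JSON object with the listed components. The sketch below reproduces that parsing with the standard library and checks for the expected keys. The placeholder values and the names `REQUIRED_KEYS`, `EXAMPLE_VARIABLES` and `parse_variable` are illustrative assumptions; only the Variable names and key names come from this README.

```python
import json

# Key names taken from the "components" lists above
REQUIRED_KEYS = {
    "aws_config": {"aws_key", "aws_secret_key"},
    "s3_config": {"s3_bucket", "s3_key"},
    "redshift_config": {"db_user", "db_pass", "conn_string"},
    "slack_config": {"channel", "token_key"},
}

# Placeholder JSON strings, e.g. '{"s3_bucket": "[s3_bucket]", "s3_key": "[s3_key]"}'
EXAMPLE_VARIABLES = {
    name: json.dumps({key: f"[{key}]" for key in sorted(keys)})
    for name, keys in REQUIRED_KEYS.items()
}


def parse_variable(name, raw_json):
    """Parse a Variable value as JSON and verify its required keys."""
    config = json.loads(raw_json)
    missing = REQUIRED_KEYS[name] - set(config)
    if missing:
        raise KeyError(f"{name} is missing keys: {sorted(missing)}")
    return config
```

The strings in `EXAMPLE_VARIABLES` show the shape to paste into the Airflow Variables UI, with the bracketed placeholders replaced by real values.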
Redshift
- purpose : Query Redshift using PostgresHook
hook = PostgresHook(postgres_conn_id='redshift_dev_db')
Task start_operator
- Operator : DummyOperator
- Contents : Starting the DAG
Task download_data
- Operator : BashOperator
- Contents : Collecting the data from Kaggle for processing
bash_command='''mkdir {path}/olympics;
cd {path}/olympics;
kaggle datasets download -d heesoo37/120-years-of-olympic-history-athletes-and-results;
unzip 120-years-of-olympic-history-athletes-and-results.zip;
'''.format(path=input_path)
Task load_raw_data_into_s3
- Operator : PythonOperator
- Contents :
- Load the collected data into AWS S3 for processing
- Remove the local data
hook = S3Hook()
bucket = s3_config['s3_bucket']
key = s3_config['s3_key']
# upload every CSV file under the <key>/raw/ prefix
for file_path in Path(input_path).glob("*.csv"):
    file_name = file_path.name
    hook.load_file(str(file_path), key+'/raw/'+file_name,
                   bucket_name=bucket, replace=True)
# delete data directory
shutil.rmtree(input_path+"/olympics")
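The object key layout used by the upload is `<s3_key>/raw/<file name>`. As a small sketch, that rule can be isolated in a helper so it is easy to test without S3. `build_raw_key` is a hypothetical name, and stripping a trailing slash from the prefix is an addition that the task code above does not perform.

```python
from pathlib import PurePosixPath


def build_raw_key(key_prefix, file_path):
    """Return the S3 object key '<key_prefix>/raw/<file name>'."""
    file_name = PurePosixPath(file_path).name
    # rstrip avoids a double slash when the prefix already ends with '/'
    return f"{key_prefix.rstrip('/')}/raw/{file_name}"
```

For example, a prefix of `olympics` and a local file `/tmp/olympics/athlete_events.csv` (an assumed file name) map to the key `olympics/raw/athlete_events.csv`.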
Task create_schema
- Operator : PostgresOperator
- Contents : Create the schema for the summary table if it does not exist
BEGIN;
CREATE SCHEMA IF NOT EXISTS olympics;
END;
Task create_table
- Operator : PostgresOperator
- Contents : Drop and recreate the summary table
BEGIN;
DROP TABLE IF EXISTS olympics.korea_medal;
CREATE TABLE IF NOT EXISTS olympics.korea_medal (
    sport VARCHAR(255),
    gold BIGINT NOT NULL,
    silver BIGINT NOT NULL,
    bronze BIGINT NOT NULL,
    total BIGINT NOT NULL,
    PRIMARY KEY (sport)
) DISTSTYLE KEY DISTKEY(sport);
END;
Task process_korea_medal
- Operator : BashOperator
- Contents : Processing the data to build the summary table using PySpark SQL
- process :
- Run spark-submit from bash
- Create the Spark session and the config for connecting to AWS
- Import the data from S3
- Process the data using PySpark SQL
- Load the result into Redshift
params['python_script'] = 'process_korea_medal.py'
process_korea_medal = BashOperator(
task_id='process_korea_medal',
bash_command='./bash_scripts/load_staging_table.sh',
params=params
)
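The contents of `process_korea_medal.py` are not shown here, so the following is only a sketch of the kind of aggregation that would fill `olympics.korea_medal`. It runs the query on an in-memory SQLite database as a stand-in for Spark SQL so that it works without a cluster. The input column names (`noc`, `sport`, `medal`), the `'KOR'` code and the medal labels are assumptions about the Kaggle dataset, not taken from the project code.

```python
import sqlite3

# Assumed input columns: noc, sport, medal. Output columns match korea_medal.
KOREA_MEDAL_SQL = """
SELECT sport,
       SUM(CASE WHEN medal = 'Gold' THEN 1 ELSE 0 END)   AS gold,
       SUM(CASE WHEN medal = 'Silver' THEN 1 ELSE 0 END) AS silver,
       SUM(CASE WHEN medal = 'Bronze' THEN 1 ELSE 0 END) AS bronze,
       COUNT(*) AS total
FROM athlete_events
WHERE noc = 'KOR' AND medal IN ('Gold', 'Silver', 'Bronze')
GROUP BY sport
ORDER BY total DESC, sport
"""


def summarize_korea_medals(rows):
    """Aggregate (noc, sport, medal) rows into per-sport medal counts."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE athlete_events (noc TEXT, sport TEXT, medal TEXT)"
        )
        conn.executemany("INSERT INTO athlete_events VALUES (?, ?, ?)", rows)
        return conn.execute(KOREA_MEDAL_SQL).fetchall()
    finally:
        conn.close()
```

The query counts one medal per athlete row, so a team event contributes one medal for each team member. Whether the project deduplicates team medals is not stated in this README.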
Task check_data_quality
- Operator : DataQualityOperator
- Contents :
- Check that each table contains rows (row count)
- Check the data for NULL values
# check the data quality
tables = ["olympics.korea_medal"]
check_data_quality = DataQualityOperator(task_id='check_data_quality',
redshift_conn_id="redshift_dev_db",
table_names=tables)
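`DataQualityOperator` is a custom operator whose source is not included in this README. The two checks it is described as performing could look roughly like the sketch below, which works with any DB-API connection. The function name `check_table_quality` and its `not_null_columns` parameter are hypothetical.

```python
def check_table_quality(conn, table, not_null_columns=()):
    """Raise ValueError if `table` is empty or a listed column has NULLs.

    `table` and the column names are interpolated into the SQL text, so
    they must come from trusted configuration, never from user input.
    Returns the row count when all checks pass.
    """
    cur = conn.cursor()
    cur.execute(f"SELECT COUNT(*) FROM {table}")
    row_count = cur.fetchone()[0]
    if row_count == 0:
        raise ValueError(f"{table} has no rows")
    for column in not_null_columns:
        cur.execute(f"SELECT COUNT(*) FROM {table} WHERE {column} IS NULL")
        null_count = cur.fetchone()[0]
        if null_count:
            raise ValueError(f"{table}.{column} has {null_count} NULL values")
    return row_count
```

Inside an operator, a failed check would typically be raised as an exception so that Airflow marks the task as failed and downstream tasks do not run.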
Task endRun
- Operator : DummyOperator
- Contents : Ending the DAG
preset.io
- purpose : Displaying charts for analysis
- data : The number of medals won by Korea in the 1896 ~ 2016 Olympics
You can check out the full license here
This project is licensed under the terms of the MIT license.