
Extract data from Kaggle, transform it with SparkSQL, and load it into Redshift.


KagglETL

Goal · Scenario · Tech_Stack · Set-up · Process · License


🚩Goal

This project is an ETL data pipeline: collect data via the Kaggle API, load the raw data into S3, build a summary table for analytical work using SparkSQL, and finally load the processed data into Redshift. The summary data can then be analyzed with a BI tool (Preset.io) connected to Redshift.

SparkSQL

  • Handles the processing in the transform task

Preset.io

  • Visualizes the data by connecting to the database, in this case Redshift!

📒Scenario

Architecture

Data

ETL

Extract

  • Collect the Olympics data locally using the Kaggle API
  • Load the Kaggle data into AWS S3

Transform

  • Create the schema and table for the summary table
  • Run the processing with Spark

Load

  • Load the processed data into AWS Redshift
  • Check the data quality

Analysis

  • View the summary table using BI tools (the full task chain is sketched right after this list)
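
Taken together, these steps form a linear Airflow task chain. A minimal sketch of that wiring with placeholder DummyOperators (the DAG id and scheduling settings here are assumptions, not the repository's actual values):

    # Sketch: the task chain described above, with DummyOperator stand-ins.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator

    with DAG(
        dag_id="kagglETL",  # hypothetical id
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        tasks = [DummyOperator(task_id=name) for name in [
            "start_operator", "download_data", "load_raw_data_into_s3",
            "create_schema", "create_table", "process_korea_medal",
            "check_data_quality", "endRun",
        ]]
        # Chain linearly: extract >> load raw >> transform >> load >> check.
        for upstream, downstream in zip(tasks, tasks[1:]):
            upstream >> downstream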


📚Tech_Stack

  • Docker
  • Spark
  • AWS S3
  • AWS Redshift
  • Airflow
  • Python
  • Git

Set-up

Prerequisites


Kaggle

  • file : cred/kaggle/kaggle.json

  • An API key is required to collect the data

    {"username":"[username]","key":"[key]"}
    

AWS

  • file : creds/aws/credentials

  • Provide the access key and secret key so Airflow can connect to AWS

    [default]
    aws_access_key_id = [access_key_id]
    aws_secret_access_key = [secret_key]
    

Config


Variables

AWS

  • components : aws_key, aws_secret_key

  • in_code :

    aws_config = Variable.get("aws_config", deserialize_json=True)
    
    aws_key : aws_config["aws_key"]
    aws_secret_key : aws_config["aws_secret_key"]
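
These variables must be registered in Airflow beforehand. A minimal sketch of setting `aws_config` (values are placeholders; the same pattern applies to the other variables below):

    # Sketch: register the JSON variable that the DAG reads back with
    # Variable.get("aws_config", deserialize_json=True).
    from airflow.models import Variable

    Variable.set(
        "aws_config",
        {"aws_key": "[access_key_id]", "aws_secret_key": "[secret_key]"},
        serialize_json=True,
    )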

S3

  • components : s3_bucket, s3_key

  • in_code :

    s3_config = Variable.get("s3_config", deserialize_json=True)
    
    s3_bucket : s3_config["s3_bucket"]
    s3_key : s3_config["s3_key"]

Redshift

  • components : db_user, db_pass, conn_string

  • in_code :

    redshift_config = Variable.get("redshift_config", deserialize_json=True)
    
    db_user : redshift_config.get("db_user")
    db_pass : redshift_config.get("db_pass")
    redshift_conn_string : redshift_config.get("conn_string")

Slack

  • components : channel, token_key

  • in_code :

    config_slack = Variable.get("slack_config", deserialize_json=True)
    
    channel = config_slack['channel']
    token = config_slack['token_key']
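
The Slack settings point to task notifications. A minimal sketch of posting a message with these values, assuming the `slack_sdk` package (the repository may use a different client or an Airflow provider instead):

    # Sketch: send a pipeline notification using the channel/token above.
    from airflow.models import Variable
    from slack_sdk import WebClient

    config_slack = Variable.get("slack_config", deserialize_json=True)
    client = WebClient(token=config_slack['token_key'])
    client.chat_postMessage(
        channel=config_slack['channel'],
        text="kagglETL: task finished",  # placeholder message
    )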

Connection

Redshift

  • purpose : Query to Redshift using PostgresHook

    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
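
Once the `redshift_dev_db` connection is defined in Airflow, the hook can run queries directly; a small sketch:

    # Sketch: query Redshift through the connection defined above.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    rows = hook.get_records("SELECT COUNT(*) FROM olympics.korea_medal")
    print(rows[0][0])  # row count of the summary table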
    

Process

Task

Task start_operator

  • Operator : DummyOperator
  • Contents : Starting the DAG

Task download_data

  • Operator : BashOperator
  • Contents : Download the dataset from Kaggle for processing
    bash_command='''mkdir {path}/olympics;
        cd {path}/olympics;
        kaggle datasets download -d heesoo37/120-years-of-olympic-history-athletes-and-results;
        unzip 120-years-of-olympic-history-athletes-and-results.zip;
        '''.format(path=input_path)

Task load_raw_data_into_s3

  • Operator : PythonOperator
  • Contents :
    1. Upload the collected data to AWS S3 for processing
    2. Remove the local data
    import shutil
    from pathlib import Path

    from airflow.models import Variable
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_config = Variable.get("s3_config", deserialize_json=True)

    hook = S3Hook()
    bucket = s3_config['s3_bucket']
    key = s3_config['s3_key']

    # upload every collected CSV under the raw/ prefix
    for file_path in Path(input_path).glob("*.csv"):
        file_name = str(file_path).split('/')[-1]
        hook.load_file(file_path, key+'/raw/'+file_name,
                       bucket_name=bucket, replace=True)

    # delete the local data directory
    shutil.rmtree(input_path+"/olympics")
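
Wired into the DAG, this body would sit behind a PythonOperator roughly as follows (`_load_raw_data_into_s3` is a hypothetical name for the wrapper function):

    # Sketch: the upload logic above wrapped as the task's callable.
    load_raw_data_into_s3 = PythonOperator(
        task_id='load_raw_data_into_s3',
        python_callable=_load_raw_data_into_s3,
    )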

Task create_schema

  • Operator : PostgresOperator
  • Contents : Create the schema for the summary table if it does not exist
    BEGIN;

    CREATE SCHEMA IF NOT EXISTS olympics;

    END;
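
A sketch of the corresponding operator call, assuming the DDL above is passed inline (the repository may load it from a file instead):

    # Sketch: run the DDL on Redshift via the Airflow connection.
    create_schema = PostgresOperator(
        task_id='create_schema',
        postgres_conn_id='redshift_dev_db',
        sql='CREATE SCHEMA IF NOT EXISTS olympics;',
    )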

Task create_table

  • Operator : PostgresOperator
  • Contents : Create the summary table if it does not exist
    BEGIN;

    DROP TABLE IF EXISTS olympics.korea_medal;

    CREATE TABLE IF NOT EXISTS olympics.korea_medal (
        sport VARCHAR(255),
        gold bigint NOT NULL,
        silver bigint NOT NULL,
        bronze bigint NOT NULL,
        total bigint NOT NULL,
        primary key (sport)
    ) diststyle key distkey(sport);

    END;

Task process_korea_medal

  • Operator : BashOperator
  • Contents : Process the data into the summary table using PySpark SQL (a condensed sketch of the script follows the snippet below)
  • process :
    1. Run spark-submit via bash
    2. Create the Spark session and configure the AWS connection
    3. Import the data from S3
    4. Process the data using PySpark SQL
    5. Load the result into Redshift
    params['python_script'] = 'process_korea_medal.py'
    process_korea_medal = BashOperator(
        task_id='process_korea_medal',
        bash_command='./bash_scripts/load_staging_table.sh',
        params=params
    )
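
A condensed sketch of what `process_korea_medal.py` could look like, following steps 2–5 above. Column names follow the Kaggle dataset's `athlete_events.csv`; the S3 path, JDBC URL, and credentials are placeholders, and reading `s3a://` paths assumes the hadoop-aws package plus the Redshift JDBC driver are on the Spark classpath:

    # Sketch of process_korea_medal.py (placeholders, not the repo's values).
    from pyspark.sql import SparkSession

    # Step 2: Spark session configured for S3 access.
    spark = (SparkSession.builder
             .appName("process_korea_medal")
             .config("spark.hadoop.fs.s3a.access.key", "[aws_key]")
             .config("spark.hadoop.fs.s3a.secret.key", "[aws_secret_key]")
             .getOrCreate())

    # Step 3: import the raw data from S3.
    athletes = spark.read.csv("s3a://[s3_bucket]/[s3_key]/raw/athlete_events.csv",
                              header=True, inferSchema=True)
    athletes.createOrReplaceTempView("athlete_events")

    # Step 4: aggregate Korea's medals per sport with SparkSQL, matching the
    # olympics.korea_medal schema (the NOC filter is an assumption).
    korea_medal = spark.sql("""
        SELECT Sport AS sport,
               SUM(CASE WHEN Medal = 'Gold'   THEN 1 ELSE 0 END) AS gold,
               SUM(CASE WHEN Medal = 'Silver' THEN 1 ELSE 0 END) AS silver,
               SUM(CASE WHEN Medal = 'Bronze' THEN 1 ELSE 0 END) AS bronze,
               SUM(CASE WHEN Medal IN ('Gold', 'Silver', 'Bronze')
                        THEN 1 ELSE 0 END) AS total
        FROM athlete_events
        WHERE NOC = 'KOR'
        GROUP BY Sport
    """)

    # Step 5: load into Redshift over JDBC.
    (korea_medal.write
     .format("jdbc")
     .option("driver", "com.amazon.redshift.jdbc42.Driver")
     .option("url", "jdbc:redshift://[host]:5439/[db]")
     .option("dbtable", "olympics.korea_medal")
     .option("user", "[db_user]")
     .option("password", "[db_pass]")
     .mode("append")
     .save())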

Task check_data_quality

  • Operator : DataQualityOperator
  • Contents :
    1. Check the data by counting rows
    2. Check the data by checking for nulls
    # check the data quality
    tables = ["olympics.korea_medal"]
    check_data_quality = DataQualityOperator(task_id='check_data_quality',
                                             redshift_conn_id="redshift_dev_db",
                                             table_names=tables)
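
`DataQualityOperator` is a custom operator; a minimal sketch of the two checks it performs (row count and null check), not the repository's exact implementation:

    # Sketch of the custom operator's checks (the null-checked column is an
    # assumption based on the korea_medal schema above).
    from airflow.models import BaseOperator
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    class DataQualityOperator(BaseOperator):
        def __init__(self, redshift_conn_id, table_names, **kwargs):
            super().__init__(**kwargs)
            self.redshift_conn_id = redshift_conn_id
            self.table_names = table_names

        def execute(self, context):
            hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
            for table in self.table_names:
                # Check 1: the table must contain rows.
                count = hook.get_records(f"SELECT COUNT(*) FROM {table}")[0][0]
                if count < 1:
                    raise ValueError(f"Data quality check failed: {table} is empty")
                # Check 2: the key column must not contain nulls.
                nulls = hook.get_records(
                    f"SELECT COUNT(*) FROM {table} WHERE sport IS NULL")[0][0]
                if nulls > 0:
                    raise ValueError(f"Data quality check failed: {table} has nulls")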

Task endRun

  • Operator : DummyOperator
  • Contents : Ending the DAG

Analysis

preset.io

  • purpose : Visualize graphs for analysis

  • data : Korea's medal counts across the Olympics, 1896–2016
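
For reference, a sketch of the kind of aggregation such a chart implies, run here through the same PostgresHook (the actual chart is configured in Preset's UI):

    # Sketch: top sports by Korea's total medal count.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id='redshift_dev_db')
    top_sports = hook.get_records(
        "SELECT sport, total FROM olympics.korea_medal ORDER BY total DESC LIMIT 10"
    )
    for sport, total in top_sports:
        print(sport, total)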

License

You can check out the full license here

This project is licensed under the terms of the MIT license.