Covid-19 Data Warehousing Project
This project demonstrates the complete process of building a Redshift data warehouse for COVID-19 data, which can be further extended with BI tools for data visualization and dashboarding.
The Process Architecture 👇
Create an S3 bucket for the project
Upload the Data to the bucket
Create IAM Role for Glue crawler
The image below 👇 shows the policies attached to the role
Create Glue Crawlers for each dataset
1. Set the crawler properties
2. Choose the data source. The image below 👇 shows the data source selection for the enigma-jhud dataset
Note: choose the dataset's folder, not the dataset file itself
3. Choose the option "Crawl all sub-folders" for subsequent crawler runs
4. Configure the security settings and select the IAM role you created for the Glue crawler
The image below 👇 shows the IAM role attached
5. Choose "Add database" to create a database, give it a name, and choose "Create database"
6. Choose Next and create the crawler
7. After successful creation, run the crawler
Similarly, create crawlers for all the datasets and run them; a boto3 sketch of the same steps is shown below.
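The same crawler setup can also be scripted with boto3 (introduced in the next section). A minimal sketch for one dataset, where the crawler name, role name, and S3 path are placeholders:

import boto3

glue_client = boto3.client("glue", region_name="ap-south-1")
glue_client.create_crawler(
    Name="enigma-jhud-crawler",        # placeholder crawler name
    Role="glue-crawler-role",          # the IAM role created for the Glue crawler above
    DatabaseName="covid-19",           # the Glue database created above
    Targets={"S3Targets": [{"Path": "s3://your-bucket/enigma-jhud/"}]},  # folder, not file
)
glue_client.start_crawler(Name="enigma-jhud-crawler")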
• Go to Manage settings in the Athena query editor and add the location of the S3 bucket where you want to store the query results
Note: You have to create a bucket to store the query result data as shown in the image. In it, create a folder named "output" for the query responses and a folder named "packages" for storing the redshift-connector .whl library file later
Using boto3 to perform tasks through an IDE
Boto3 is a Python package that lets you interact with Amazon Web Services (AWS). AWS is a cloud platform that provides services such as storage, compute, and monitoring. With Boto3 you can access and manage these services from Python code. You can install Boto3 with pip and then configure your credentials and region. Boto3 supports many AWS services and has comprehensive documentation.
import boto3
import pandas as pd
from io import StringIO

AWS_ACCESS_KEY = "Put your IAM user access key here"
AWS_SECRET_KEY = "Put your IAM user secret access key here"
# If no IAM user exists yet, create one with administrative access
# and get its access key ID & secret access key
AWS_REGION = "ap-south-1"
SCHEMA_NAME = "covid-19"
S3_STAGING_DIR = "s3://durgeshp-test-bucket/output"
S3_BUCKET_NAME = "durgeshp-test-bucket"
S3_OUTPUT_DIRECTORY = "output"

athena_client = boto3.client(
    "athena",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
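Hardcoding keys is fine for a demo, but boto3 can also resolve credentials from the standard AWS credential chain (environment variables or the AWS CLI's aws configure). A sketch of creating the same client without embedding keys:

# Assumes credentials were configured via `aws configure` or environment variables;
# boto3 picks them up from the default credential chain.
session = boto3.Session(region_name=AWS_REGION)
athena_client = session.client("athena")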
def athena_query(SqlQuery: str):
    query_response = athena_client.start_query_execution(
        QueryString=SqlQuery,
        ResultConfiguration={
            "OutputLocation": S3_STAGING_DIR,
            "EncryptionConfiguration": {"EncryptionOption": "SSE_S3"},
        },
    )
    return query_response
query_response = athena_query('SELECT * FROM "covid-19"."enigma_jhud";')
• After running the query we get a response in dictionary form
{'QueryExecutionId': '55a5db55-1265-42b1-b0d0-f1982b456571',
'ResponseMetadata': {'RequestId': '2318ec13-0206-40a2-9e3b-d2da901a6c9e',
'HTTPStatusCode': 200,
'HTTPHeaders': {'date': 'Mon, 08 May 2023 11:52:43 GMT',
'content-type': 'application/x-amz-json-1.1',
'content-length': '59',
'connection': 'keep-alive',
'x-amzn-requestid': '2318ec13-0206-40a2-9e3b-d2da901a6c9e'},
'RetryAttempts': 0}}
• The QueryExecutionId is used to locate the result file in S3
query_response['QueryExecutionId']
'55a5db55-1265-42b1-b0d0-f1982b456571'
• Sample of how the query response file's location and name look in the S3 bucket
f"{ S3_OUTPUT_DIRECTORY } /{ query_response ['QueryExecutionId' ]} .csv"
'output/55a5db55-1265-42b1-b0d0-f1982b456571.csv'
The code below 👇 fetches the Athena query result (dataset) from the S3 bucket and loads it into a local file
def download_and_load_query_results(query_response: dict, file_name: str):
    # Forward slashes (or a raw string) avoid backslash-escape issues in Windows paths
    temp_file_location: str = f"D:/AWS/de_project/{file_name}.csv"
    s3_client = boto3.client(
        "s3",
        aws_access_key_id=AWS_ACCESS_KEY,
        aws_secret_access_key=AWS_SECRET_KEY,
        region_name=AWS_REGION,
    )
    s3_client.download_file(
        S3_BUCKET_NAME,
        f"{S3_OUTPUT_DIRECTORY}/{query_response['QueryExecutionId']}.csv",
        temp_file_location,
    )
    return pd.read_csv(temp_file_location)
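Note that start_query_execution is asynchronous, so the result file may not exist in S3 yet when download_file runs. A small polling helper (a sketch built on Athena's get_query_execution API) can wait for the query to finish first:

import time

def wait_for_query(query_response: dict, poll_seconds: int = 2):
    # Poll Athena until the query succeeds; raise if it fails or is cancelled
    query_id = query_response['QueryExecutionId']
    while True:
        status = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = status['QueryExecution']['Status']['State']
        if state == 'SUCCEEDED':
            return
        if state in ('FAILED', 'CANCELLED'):
            raise RuntimeError(f"Athena query {query_id} ended in state {state}")
        time.sleep(poll_seconds)

# Usage: call wait_for_query(query_response) before download_and_load_query_results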
Getting the datasets from the S3 bucket into local files
enigma_jhud = download_and_load_query_results(query_response, "enigma_jhud")
enigma_jhud.head()
| | fips | admin2 | province_state | country_region | last_update | latitude | longitude | confirmed | deaths | recovered | active | combined_key | partition_0 |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | NaN | NaN | Anhui | China | 2020-01-22T17:00:00 | 31.826 | 117.226 | 1.0 | NaN | NaN | NaN | "Anhui | csv |
| 1 | NaN | NaN | Beijing | China | 2020-01-22T17:00:00 | 40.182 | 116.414 | 14.0 | NaN | NaN | NaN | "Beijing | csv |
| 2 | NaN | NaN | Chongqing | China | 2020-01-22T17:00:00 | 30.057 | 107.874 | 6.0 | NaN | NaN | NaN | "Chongqing | csv |
| 3 | NaN | NaN | Fujian | China | 2020-01-22T17:00:00 | 26.079 | 117.987 | 1.0 | NaN | NaN | NaN | "Fujian | csv |
| 4 | NaN | NaN | Gansu | China | 2020-01-22T17:00:00 | 36.061 | 103.834 | NaN | NaN | NaN | NaN | "Gansu | csv |
Dataset: nytimes-data-in-usa-country
query_response = athena_query('SELECT * FROM "covid-19"."nytimes-data-in-usa-countryus_county";')
nytimes_data_in_usa_country = download_and_load_query_results(query_response, "nytimes_data_in_usa_country")
nytimes_data_in_usa_country.head()
| | date | county | state | fips | cases | deaths |
|---|---|---|---|---|---|---|
| 0 | 2020-01-21 | Snohomish | Washington | 53061.0 | 1.0 | 0.0 |
| 1 | 2020-01-22 | Snohomish | Washington | 53061.0 | 1.0 | 0.0 |
| 2 | 2020-01-23 | Snohomish | Washington | 53061.0 | 1.0 | 0.0 |
| 3 | 2020-01-24 | Cook | Illinois | 17031.0 | 1.0 | 0.0 |
| 4 | 2020-01-24 | Snohomish | Washington | 53061.0 | 1.0 | 0.0 |
Dataset: nytimes_data_in_usa_state
query_response = athena_query('SELECT * FROM "covid-19"."nytimes-data-in-usa-statesus_states";')
nytimes_data_in_usa_statesus_states = download_and_load_query_results(query_response, "nytimes_data_in_usa_states")
nytimes_data_in_usa_statesus_states.head()
| | date | state | fips | cases | deaths |
|---|---|---|---|---|---|
| 0 | 2020-01-21 | Washington | 53 | 1 | 0 |
| 1 | 2020-01-22 | Washington | 53 | 1 | 0 |
| 2 | 2020-01-23 | Washington | 53 | 1 | 0 |
| 3 | 2020-01-24 | Illinois | 17 | 1 | 0 |
| 4 | 2020-01-24 | Washington | 53 | 1 | 0 |
Dataset: rearc_covid_19_testing_data_states_daily
query_response = athena_query('SELECT * FROM "covid-19"."rearc-covid-19-testing-data-states-dailystates_daily";')
rearc_covid_19_testing_data_states_daily = download_and_load_query_results(query_response, "rearc_covid_19_testing_data_states_daily")
rearc_covid_19_testing_data_states_daily.head()
| | date | state | positive | probablecases | negative | pending | totaltestresultssource | totaltestresults | hospitalizedcurrently | hospitalizedcumulative | ... | dataqualitygrade | deathincrease | hospitalizedincrease | hash | commercialscore | negativeregularscore | negativescore | positivescore | score | grade |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 20210307 | AK | 56886 | NaN | NaN | NaN | totalTestsViral | 1731628 | 33.0 | 1293.0 | ... | NaN | 0.0 | 0.0 | dc4bccd4bb885349d7e94d6fed058e285d4be164 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN |
| 1 | 20210307 | AL | 499819 | 107742.0 | 1931711.0 | NaN | totalTestsPeopleViral | 2323788 | 494.0 | 45976.0 | ... | NaN | -1.0 | 0.0 | 997207b430824ea40b8eb8506c19a93e07bc972e | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN |
| 2 | 20210307 | AR | 324818 | 69092.0 | 2480716.0 | NaN | totalTestsViral | 2736442 | 335.0 | 14926.0 | ... | NaN | 22.0 | 11.0 | 50921aeefba3e30d31623aa495b47fb2ecc72fae | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN |
| 3 | 20210307 | AS | 0 | NaN | 2140.0 | NaN | totalTestsViral | 2140 | NaN | NaN | ... | NaN | 0.0 | 0.0 | 96d23f888c995b9a7f3b4b864de6414f45c728ff | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN |
| 4 | 20210307 | AZ | 826454 | 56519.0 | 3073010.0 | NaN | totalTestsViral | 7908105 | 963.0 | 57907.0 | ... | NaN | 5.0 | 44.0 | 0437a7a96f4471666f775e63e86923eb5cbd8cdf | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN |

5 rows × 56 columns
Dataset: rearc-covid-19-testing-data-us-daily
query_response = athena_query('SELECT * FROM "covid-19"."rearc-covid-19-testing-data-us-dailyus_daily";')
rearc_covid_19_testing_data_us_daily = download_and_load_query_results(query_response, "rearc_covid_19_testing_data_us_daily")
rearc_covid_19_testing_data_us_daily.head()
| | date | states | positive | negative | pending | hospitalizedcurrently | hospitalizedcumulative | inicucurrently | inicucumulative | onventilatorcurrently | ... | lastmodified | recovered | total | posneg | deathincrease | hospitalizedincrease | negativeincrease | positiveincrease | totaltestresultsincrease | hash |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 20210307 | 56 | 28755524.0 | 74579770.0 | 11808.0 | 40212.0 | 878613.0 | 8137.0 | 45475.0 | 2801.0 | ... | 2021-03-07T24:00:00Z | NaN | 0 | 0 | 839 | 726 | 130414 | 41265 | 1156241 | 8b26839690cd05c0cef69cb9ed85641a76b5e78e |
| 1 | 20210306 | 56 | 28714259.0 | 74449356.0 | 11783.0 | 41401.0 | 877887.0 | 8409.0 | 45453.0 | 2811.0 | ... | 2021-03-06T24:00:00Z | NaN | 0 | 0 | 1674 | 503 | 142201 | 59620 | 1409138 | d0c0482ea549c9d5c04a7c86acb6fc6a8095a592 |
| 2 | 20210305 | 56 | 28654639.0 | 74307155.0 | 12213.0 | 42541.0 | 877384.0 | 8634.0 | 45373.0 | 2889.0 | ... | 2021-03-05T24:00:00Z | NaN | 0 | 0 | 2221 | 2781 | 271917 | 68787 | 1744417 | a35ea4289cec4bb55c9f29ae04ec0fd5ac4e0222 |
| 3 | 20210304 | 56 | 28585852.0 | 74035238.0 | 12405.0 | 44172.0 | 874603.0 | 8970.0 | 45293.0 | 2973.0 | ... | 2021-03-04T24:00:00Z | NaN | 0 | 0 | 1743 | 1530 | 177957 | 65487 | 1590984 | a19ad6379a653834cbda3093791ad2c3b9fab5ff |
| 4 | 20210303 | 56 | 28520365.0 | 73857281.0 | 11778.0 | 45462.0 | 873073.0 | 9359.0 | 45214.0 | 3094.0 | ... | 2021-03-03T24:00:00Z | NaN | 0 | 0 | 2449 | 2172 | 267001 | 66836 | 1406795 | 9e1d2afda1b0ec243060d6f68a7134d011c0cb2a |

5 rows × 25 columns
Dataset: rearc-covid-19-testing-data-us-total-latest
query_response = athena_query('SELECT * FROM "covid-19"."rearc-covid-19-testing-data-us-total-latestus_total_latest";')
rearc_covid_19_testing_data_us_totallatest = download_and_load_query_results(query_response, "rearc_covid_19_testing_data_us_totallatest")
rearc_covid_19_testing_data_us_totallatest.head()
| | positive | negative | pending | hospitalizedcurrently | hospitalizedcumulative | inicucurrently | inicucumulative | onventilatorcurrently | onventilatorcumulative | recovered | hash | lastmodified | death | hospitalized | total | totaltestresults | posneg | notes |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1061101 | 5170081 | 2775 | 53793 | 111955 | 9486 | 4192 | 4712 | 373 | 153947 | 95064ba29ccbc20dbec397033dfe4b1f45137c99 | 2020-05-01T09:12:31.891Z | 57266 | 111955 | 6233957 | 6231182 | 6231182 | "NOTE: ""total"" |
Dataset: rearc_usa_hospital_beds
query_response = athena_query('SELECT * FROM "covid-19"."rearc_usa_hospital_bedsrearc_usa_hospital_beds";')
rearc_usa_hospital_beds = download_and_load_query_results(query_response, "rearc_usa_hospital_beds")
rearc_usa_hospital_beds.head()
| | objectid | hospital_name | hospital_type | hq_address | hq_address1 | hq_city | hq_state | hq_zip_code | county_name | state_name | ... | num_licensed_beds | num_staffed_beds | num_icu_beds | adult_icu_beds | pedi_icu_beds | bed_utilization | avg_ventilator_usage | potential_increase_in_bed_capac | latitude | longtitude |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | Phoenix VA Health Care System (AKA Carl T Hayd... | VA Hospital | 650 E Indian School Rd | NaN | Phoenix | AZ | 85012 | Maricopa | Arizona | ... | 129.0 | 129.0 | 0 | 0 | NaN | NaN | 0.0 | 0 | 33.495498 | -112.066157 |
| 1 | 2 | Southern Arizona VA Health Care System | VA Hospital | 3601 S 6th Ave | NaN | Tucson | AZ | 85723 | Pima | Arizona | ... | 295.0 | 295.0 | 2 | 2 | NaN | NaN | 2.0 | 0 | 32.181263 | -110.965885 |
| 2 | 3 | VA Central California Health Care System | VA Hospital | 2615 E Clinton Ave | NaN | Fresno | CA | 93703 | Fresno | California | ... | 57.0 | 57.0 | 2 | 2 | NaN | NaN | 2.0 | 0 | 36.773324 | -119.779742 |
| 3 | 4 | VA Connecticut Healthcare System - West Haven ... | VA Hospital | 950 Campbell Ave | NaN | West Haven | CT | 6516 | New Haven | Connecticut | ... | 216.0 | 216.0 | 1 | 1 | NaN | NaN | 2.0 | 0 | 41.284400 | -72.957610 |
| 4 | 5 | Wilmington VA Medical Center | VA Hospital | 1601 Kirkwood Hwy | NaN | Wilmington | DE | 19805 | New Castle | Delaware | ... | 60.0 | 60.0 | 0 | 0 | NaN | NaN | 1.0 | 0 | 39.740206 | -75.606532 |

5 rows × 23 columns
Dataset: static-datasets-countrycode
query_response = athena_query('SELECT * FROM "covid-19"."static-datasets-countrycodecountrycode";')
static_datasets_countrycode = download_and_load_query_results(query_response, "static_datasets_countrycode")
static_datasets_countrycode.head()
| | country | alpha-2 code | alpha-3 code | numeric code | latitude | longitude |
|---|---|---|---|---|---|---|
| 0 | Afghanistan | AF | AFG | 4.0 | 33.0000 | 65.0 |
| 1 | Albania | AL | ALB | 8.0 | 41.0000 | 20.0 |
| 2 | Algeria | DZ | DZA | 12.0 | 28.0000 | 3.0 |
| 3 | American Samoa | AS | ASM | 16.0 | -14.3333 | -170.0 |
| 4 | Andorra | AD | AND | 20.0 | 42.5000 | 1.6 |
Dataset: static_datasets_countypopulation
query_response = athena_query('SELECT * FROM "covid-19"."static_datasets_countypopulationcountypopulation";')
static_datasets_countypopulation = download_and_load_query_results(query_response, "static_datasets_countypopulation")
static_datasets_countypopulation.head()
| | id | id2 | county | state | population estimate 2018 |
|---|---|---|---|---|---|
| 0 | 0500000US01001 | 1001 | Autauga | Alabama | 55601 |
| 1 | 0500000US01003 | 1003 | Baldwin | Alabama | 218022 |
| 2 | 0500000US01005 | 1005 | Barbour | Alabama | 24881 |
| 3 | 0500000US01007 | 1007 | Bibb | Alabama | 22400 |
| 4 | 0500000US01009 | 1009 | Blount | Alabama | 57840 |
Dataset: static-datasets-state-abvstate_abv
query_response = athena_query('SELECT * FROM "covid-19"."static-datasets-state-abvstate_abv";')
state_abvstate_abv = download_and_load_query_results(query_response, "state_abvstate_abv")
state_abvstate_abv.head()
| | State | Abbreviation |
|---|---|---|
| 1 | Alabama | AL |
| 2 | Alaska | AK |
| 3 | Arizona | AZ |
| 4 | Arkansas | AR |
| 5 | California | CA |
new_header = state_abvstate_abv.iloc[0]
state_abvstate_abv = state_abvstate_abv[1:]
state_abvstate_abv.columns = new_header
state_abvstate_abv.head()
state_abvstate_abv.to_csv("state_abvstate_abv", index=False)
| | State | Abbreviation |
|---|---|---|
| 1 | Alabama | AL |
| 2 | Alaska | AK |
| 3 | Arizona | AZ |
| 4 | Arkansas | AR |
| 5 | California | CA |
Create a data model from the datasets
A data model is an abstract model that organizes elements of data and standardizes how they relate to one another and to the properties of real-world entities. It describes the structure, manipulation, and integrity aspects of the data stored in data management systems such as relational databases.
Create a dimensional model from the datasets
A dimensional model is a data structure technique optimized for data storage in a data warehouse; its purpose is to optimize the database for faster retrieval of data. A dimensional model consists of one or more fact tables, each joined with its corresponding dimension tables.
Create datasets according to the dimensional model
factcovid_1 = enigma_jhud[['fips', 'province_state', 'country_region',
                           'confirmed', 'deaths', 'recovered', 'active']]
factcovid_2 = rearc_covid_19_testing_data_states_daily[['fips', 'date', 'positive', 'negative',
                                                        'hospitalizedcurrently', 'hospitalized',
                                                        'hospitalizeddischarged']]
factCovid = pd.merge(factcovid_1, factcovid_2, on='fips', how='inner')
factCovid.head()
| | fips | province_state | country_region | confirmed | deaths | recovered | active | date | positive | negative | hospitalizedcurrently | hospitalized | hospitalizeddischarged |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | NaN | Anhui | China | 1.0 | NaN | NaN | NaN | 20210119 | 289939 | NaN | 1066.0 | NaN | NaN |
| 1 | NaN | Beijing | China | 14.0 | NaN | NaN | NaN | 20210119 | 289939 | NaN | 1066.0 | NaN | NaN |
| 2 | NaN | Chongqing | China | 6.0 | NaN | NaN | NaN | 20210119 | 289939 | NaN | 1066.0 | NaN | NaN |
| 3 | NaN | Fujian | China | 1.0 | NaN | NaN | NaN | 20210119 | 289939 | NaN | 1066.0 | NaN | NaN |
| 4 | NaN | Gansu | China | NaN | NaN | NaN | NaN | 20210119 | 289939 | NaN | 1066.0 | NaN | NaN |
• Schema for table factCovid
factCovidsql = pd.io.sql.get_schema(factCovid.reset_index(), 'factCovid')
print(''.join(factCovidsql))
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" INTEGER,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
dimRegion_1 = enigma_jhud[['fips', 'province_state', 'country_region', 'latitude', 'longitude']]
dimRegion_2 = nytimes_data_in_usa_country[['fips', 'county', 'state']]
dimRegion = pd.merge(dimRegion_1, dimRegion_2, on='fips', how='inner')
dimRegion.head()
| | fips | province_state | country_region | latitude | longitude | county | state |
|---|---|---|---|---|---|---|---|
| 0 | NaN | Anhui | China | 31.826 | 117.226 | New York City | New York |
| 1 | NaN | Anhui | China | 31.826 | 117.226 | Unknown | Rhode Island |
| 2 | NaN | Anhui | China | 31.826 | 117.226 | New York City | New York |
| 3 | NaN | Anhui | China | 31.826 | 117.226 | Unknown | Rhode Island |
| 4 | NaN | Anhui | China | 31.826 | 117.226 | New York City | New York |
• Schema for table dimRegion
dimRegionsql = pd.io.sql.get_schema(dimRegion.reset_index(), 'dimRegion')
print(''.join(dimRegionsql))
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longitude" REAL,
"county" TEXT,
"state" TEXT
)
# 'longtitude' is the column name as it appears in the source dataset
dimHospital = rearc_usa_hospital_beds[['fips', 'state_name', 'latitude', 'longtitude', 'hq_address',
                                       'hospital_name', 'hospital_type', 'hq_city', 'hq_state']]
dimHospital.head()
| | fips | state_name | latitude | longtitude | hq_address | hospital_name | hospital_type | hq_city | hq_state |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 4013.0 | Arizona | 33.495498 | -112.066157 | 650 E Indian School Rd | Phoenix VA Health Care System (AKA Carl T Hayd... | VA Hospital | Phoenix | AZ |
| 1 | 4019.0 | Arizona | 32.181263 | -110.965885 | 3601 S 6th Ave | Southern Arizona VA Health Care System | VA Hospital | Tucson | AZ |
| 2 | 6019.0 | California | 36.773324 | -119.779742 | 2615 E Clinton Ave | VA Central California Health Care System | VA Hospital | Fresno | CA |
| 3 | 9009.0 | Connecticut | 41.284400 | -72.957610 | 950 Campbell Ave | VA Connecticut Healthcare System - West Haven ... | VA Hospital | West Haven | CT |
| 4 | 10003.0 | Delaware | 39.740206 | -75.606532 | 1601 Kirkwood Hwy | Wilmington VA Medical Center | VA Hospital | Wilmington | DE |
• Schema for table dimHospital
dimHospitalsql = pd.io.sql.get_schema(dimHospital.reset_index(), 'dimHospital')
print(''.join(dimHospitalsql))
CREATE TABLE "dimHospital" (
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"latitude" REAL,
"longtitude" REAL,
"hq_address" TEXT,
"hospital_name" TEXT,
"hospital_type" TEXT,
"hq_city" TEXT,
"hq_state" TEXT
)
# .copy() avoids pandas' SettingWithCopyWarning on the column assignments below
dimDate = rearc_covid_19_testing_data_states_daily[['fips', 'date']].copy()
dimDate.head()
| | fips | date |
|---|---|---|
| 0 | 2.0 | 20210307 |
| 1 | 1.0 | 20210307 |
| 2 | 5.0 | 20210307 |
| 3 | 60.0 | 20210307 |
| 4 | 4.0 | 20210307 |
dimDate['date'] = pd.to_datetime(dimDate['date'], format='%Y%m%d')
dimDate.head()
| | fips | date |
|---|---|---|
| 0 | 2.0 | 2021-03-07 |
| 1 | 1.0 | 2021-03-07 |
| 2 | 5.0 | 2021-03-07 |
| 3 | 60.0 | 2021-03-07 |
| 4 | 4.0 | 2021-03-07 |
dimDate['year'] = dimDate['date'].dt.year
dimDate['month'] = dimDate['date'].dt.month
dimDate['day_of_week'] = dimDate['date'].dt.dayofweek
dimDate.head()
| | fips | date | year | month | day_of_week |
|---|---|---|---|---|---|
| 0 | 2.0 | 2021-03-07 | 2021 | 3 | 6 |
| 1 | 1.0 | 2021-03-07 | 2021 | 3 | 6 |
| 2 | 5.0 | 2021-03-07 | 2021 | 3 | 6 |
| 3 | 60.0 | 2021-03-07 | 2021 | 3 | 6 |
| 4 | 4.0 | 2021-03-07 | 2021 | 3 | 6 |
• Schema for the dimDate table
dimDatesql = pd.io.sql.get_schema(dimDate.reset_index(), 'dimDate')
print(''.join(dimDatesql))
CREATE TABLE "dimDate" (
"index" INTEGER,
"fips" REAL,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)
bucket = 'durgesh-covid-project'  # already created in S3

s3_client = boto3.client(
    "s3",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

csv_buffer = StringIO()
factCovid.to_csv(csv_buffer)
s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key='output/factCovid.csv')
{'ResponseMetadata': {'RequestId': '5PBDVZ2GV9DMAF4Q',
'HostId': 'b8WWNJ67ig9IyfMSXfz/QGovMeDPxBNIIo87PyWdB1MyXkNRb6XLJQY0URjVC4D5eOPckbxHE4DV1bCwdMJ2rQ==',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amz-id-2': 'b8WWNJ67ig9IyfMSXfz/QGovMeDPxBNIIo87PyWdB1MyXkNRb6XLJQY0URjVC4D5eOPckbxHE4DV1bCwdMJ2rQ==',
'x-amz-request-id': '5PBDVZ2GV9DMAF4Q',
'date': 'Mon, 08 May 2023 18:46:17 GMT',
'x-amz-version-id': '17oSOSl722ZV_RAFHm9LG5JpqoPhj1Cx',
'x-amz-server-side-encryption': 'AES256',
'etag': '"fc9be4f5fc50864df1f4ed534cf29988"',
'server': 'AmazonS3',
'content-length': '0'},
'RetryAttempts': 0},
'ETag': '"fc9be4f5fc50864df1f4ed534cf29988"',
'ServerSideEncryption': 'AES256',
'VersionId': '17oSOSl722ZV_RAFHm9LG5JpqoPhj1Cx'}
csv_buffer = StringIO()
dimRegion.to_csv(csv_buffer)
s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key='output/dimRegion.csv')
{'ResponseMetadata': {'RequestId': 'FYR6NHKNZT82067Q',
'HostId': 'cYQOcBt7GOXDNcywH5MsCUxkGj7/cAGqYEEguY/hUDPTFCoBYvnfJgeKpYua7mhBei24tUHs9Wg=',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amz-id-2': 'cYQOcBt7GOXDNcywH5MsCUxkGj7/cAGqYEEguY/hUDPTFCoBYvnfJgeKpYua7mhBei24tUHs9Wg=',
'x-amz-request-id': 'FYR6NHKNZT82067Q',
'date': 'Mon, 08 May 2023 18:57:32 GMT',
'x-amz-version-id': 'jEEwpk7n174NSeUN_.gK8P9p0hRSvqhJ',
'x-amz-server-side-encryption': 'AES256',
'etag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
'server': 'AmazonS3',
'content-length': '0'},
'RetryAttempts': 0},
'ETag': '"2fdf65265c31f0f6e8c02b4bd066d670"',
'ServerSideEncryption': 'AES256',
'VersionId': 'jEEwpk7n174NSeUN_.gK8P9p0hRSvqhJ'}
csv_buffer = StringIO()
dimHospital.to_csv(csv_buffer)
s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key='output/dimHospital.csv')
{'ResponseMetadata': {'RequestId': 'G19Q0FMH1T83GVM9',
'HostId': '8gjGOKxCxNcbE54ausule1oBvlSJANlu4HzpetQcbRWcj3RCn32b45dvwjWmS7JCW5lL2CXBDFY=',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amz-id-2': '8gjGOKxCxNcbE54ausule1oBvlSJANlu4HzpetQcbRWcj3RCn32b45dvwjWmS7JCW5lL2CXBDFY=',
'x-amz-request-id': 'G19Q0FMH1T83GVM9',
'date': 'Mon, 08 May 2023 18:56:00 GMT',
'x-amz-version-id': '4MgQplRT6wYyQ4.OhOR4tyCh_pidkds.',
'x-amz-server-side-encryption': 'AES256',
'etag': '"a26c4e35d128fe6f64955ba9aac1d221"',
'server': 'AmazonS3',
'content-length': '0'},
'RetryAttempts': 0},
'ETag': '"a26c4e35d128fe6f64955ba9aac1d221"',
'ServerSideEncryption': 'AES256',
'VersionId': '4MgQplRT6wYyQ4.OhOR4tyCh_pidkds.'}
csv_buffer = StringIO()
dimDate.to_csv(csv_buffer)
s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key='output/dimDate.csv')
{'ResponseMetadata': {'RequestId': 'TFBZJ33FBSZ8C4MZ',
'HostId': 'Zzr1eWbEk7MxfS8z16zpvckVav83Q+m4oKaeXHP1skLupv2I6i/LhJKh61MVcUKeXsMgMbLgrkM=',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amz-id-2': 'Zzr1eWbEk7MxfS8z16zpvckVav83Q+m4oKaeXHP1skLupv2I6i/LhJKh61MVcUKeXsMgMbLgrkM=',
'x-amz-request-id': 'TFBZJ33FBSZ8C4MZ',
'date': 'Mon, 08 May 2023 18:56:43 GMT',
'x-amz-version-id': 'rimJk6e4kj3WXDeuQ1OJRYPf8cAVsLzz',
'x-amz-server-side-encryption': 'AES256',
'etag': '"19eb0b77e7f7441c686829bc3fd1a906"',
'server': 'AmazonS3',
'content-length': '0'},
'RetryAttempts': 0},
'ETag': '"19eb0b77e7f7441c686829bc3fd1a906"',
'ServerSideEncryption': 'AES256',
'VersionId': 'rimJk6e4kj3WXDeuQ1OJRYPf8cAVsLzz'}
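The four uploads above repeat the same pattern; an equivalent compact loop over the model tables:

# Same behavior as the four blocks above, written as a loop
for name, df in [("factCovid", factCovid), ("dimRegion", dimRegion),
                 ("dimHospital", dimHospital), ("dimDate", dimDate)]:
    csv_buffer = StringIO()
    df.to_csv(csv_buffer)
    s3_client.put_object(Body=csv_buffer.getvalue(), Bucket=bucket, Key=f"output/{name}.csv")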
Configuration of Cluster 👇
DWH_CLUSTER_TYPE = "single-node"
DWH_NUM_NODES = "1"
DWH_NODE_TYPE = "dc2.large"
DWH_CLUSTER_IDENTIFIER = "covidproject"
DWH_DB = "flight"
DWH_DB_USER = "durgesh"
DWH_DB_PASSWORD = "Password123"
DWH_PORT = "5439"
DWH_IAM_ROLE_NAME = "redshift_s3_access"
DWH_DB_NAME = "covid-db"
• Creating boto3 clients for Redshift & IAM
redshift_client = boto3.client(
    "redshift",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)

iam_client = boto3.client(
    "iam",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
• Creating a boto3 resource for EC2
ec2_resource = boto3.resource(
    "ec2",
    aws_access_key_id=AWS_ACCESS_KEY,
    aws_secret_access_key=AWS_SECRET_KEY,
    region_name=AWS_REGION,
)
roleArn = iam_client.get_role(RoleName=DWH_IAM_ROLE_NAME)['Role']['Arn']
'arn:aws:iam::436117849909:role/redshift_s3_access'
redshift_response = redshift_client.create_cluster(
    ClusterIdentifier=DWH_CLUSTER_IDENTIFIER,
    NodeType=DWH_NODE_TYPE,
    MasterUsername=DWH_DB_USER,
    MasterUserPassword=DWH_DB_PASSWORD,
    ClusterType=DWH_CLUSTER_TYPE,
    DBName=DWH_DB_NAME,
    IamRoles=[roleArn]  # role for S3 access
)
• The response below shows the configuration of the created cluster
{'Cluster': {'ClusterIdentifier': 'covidproject',
'NodeType': 'dc2.large',
'ClusterStatus': 'creating',
'ClusterAvailabilityStatus': 'Modifying',
'MasterUsername': 'durgesh',
'DBName': 'covid-db',
'AutomatedSnapshotRetentionPeriod': 1,
'ManualSnapshotRetentionPeriod': -1,
'ClusterSecurityGroups': [],
'VpcSecurityGroups': [{'VpcSecurityGroupId': 'sg-0a1c0b3f4a736fcba',
'Status': 'active'}],
'ClusterParameterGroups': [{'ParameterGroupName': 'default.redshift-1.0',
'ParameterApplyStatus': 'in-sync'}],
'ClusterSubnetGroupName': 'default',
'VpcId': 'vpc-09eadb7f91dfac446',
'PreferredMaintenanceWindow': 'sat:07:30-sat:08:00',
'PendingModifiedValues': {'MasterUserPassword': '****'},
'ClusterVersion': '1.0',
'AllowVersionUpgrade': True,
'NumberOfNodes': 1,
'PubliclyAccessible': True,
'Encrypted': False,
'Tags': [],
'EnhancedVpcRouting': False,
'IamRoles': [{'IamRoleArn': 'arn:aws:iam::436117849909:role/redshift_s3_access',
'ApplyStatus': 'adding'}],
'MaintenanceTrackName': 'current',
'DeferredMaintenanceWindows': [],
'NextMaintenanceWindowStartTime': datetime.datetime(2023, 5, 13, 7, 30, tzinfo=tzutc()),
'AquaConfiguration': {'AquaStatus': 'disabled',
'AquaConfigurationStatus': 'auto'}},
'ResponseMetadata': {'RequestId': '91d285ee-bf62-4629-a120-f991b68b8b7b',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '91d285ee-bf62-4629-a120-f991b68b8b7b',
'content-type': 'text/xml',
'content-length': '2463',
'date': 'Tue, 09 May 2023 07:11:30 GMT'},
'RetryAttempts': 0}}
• Go to the Redshift console to verify that the cluster was created
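Cluster creation takes a few minutes. Instead of repeatedly refreshing the console, boto3's built-in waiter can block until the cluster becomes available; a minimal sketch using the clients defined above:

# Block until the cluster reaches the 'available' state
waiter = redshift_client.get_waiter('cluster_available')
waiter.wait(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)

# Fetch the cluster endpoint to use as the host when connecting later
cluster = redshift_client.describe_clusters(ClusterIdentifier=DWH_CLUSTER_IDENTIFIER)['Clusters'][0]
DWH_ENDPOINT = cluster['Endpoint']['Address']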
• Changing Security Group Configuration
VpcId = redshift_response['Cluster']['VpcId']
vpc = ec2_resource.Vpc(id=VpcId)
defaultSg = list(vpc.security_groups.all())[0]
defaultSg.authorize_ingress(
    GroupName=defaultSg.group_name,
    CidrIp='0.0.0.0/0',  # note: this opens the port to the whole internet; restrict it in production
    IpProtocol='TCP',
    FromPort=int(DWH_PORT),
    ToPort=int(DWH_PORT)
)
{'Return': True,
'SecurityGroupRules': [{'SecurityGroupRuleId': 'sgr-01a04994cce442209',
'GroupId': 'sg-0a1c0b3f4a736fcba',
'GroupOwnerId': '436117849909',
'IsEgress': False,
'IpProtocol': 'tcp',
'FromPort': 5439,
'ToPort': 5439,
'CidrIpv4': '0.0.0.0/0'}],
'ResponseMetadata': {'RequestId': '5633f270-1f90-4888-8094-75bcd4fd8e5c',
'HTTPStatusCode': 200,
'HTTPHeaders': {'x-amzn-requestid': '5633f270-1f90-4888-8094-75bcd4fd8e5c',
'cache-control': 'no-cache, no-store',
'strict-transport-security': 'max-age=31536000; includeSubDomains',
'content-type': 'text/xml;charset=UTF-8',
'content-length': '723',
'date': 'Tue, 09 May 2023 07:47:38 GMT',
'server': 'AmazonEC2'},
'RetryAttempts': 0}}
• Go to the Redshift console and check the Properties section; there you will find the Security group option. Verify that it has been updated.
Script for creating the tables in Redshift 👇
1. Use the redshift_connector library to connect to the Redshift cluster
import redshift_connector

conn = redshift_connector.connect(
    host='covidproject.cyic4klygty9.ap-south-1.redshift.amazonaws.com',
    database='covid-db',
    user='durgesh',
    password='Password123'
)
conn.autocommit = True
cursor: redshift_connector.Cursor = conn.cursor()
2. Create the tables in the Redshift data warehouse
cursor.execute("""
CREATE TABLE "factCovid" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"confirmed" REAL ,
"deaths" REAL,
"recovered" REAL,
"active" REAL,
"date" INTEGER,
"positive" REAL,
"negative" REAL,
"hospitalizedcurrently" REAL,
"hospitalized" REAL,
"hospitalizeddischarged" REAL
)
""" )
cursor .execute ("""
CREATE TABLE "dimHospital" (
"index" INTEGER,
"fips" REAL,
"state_name" TEXT,
"latitude" REAL,
"longtitude" REAL,
"hq_address" TEXT,
"hospital_type" TEXT,
"hospital_name" TEXT,
"hq_city" TEXT,
"hq_state" TEXT
)
""" )
cursor .execute ("""
CREATE TABLE "dimRegion" (
"index" INTEGER,
"fips" REAL,
"province_state" TEXT,
"country_region" TEXT,
"latitude" REAL,
"longitude" REAL,
"county" TEXT,
"state" TEXT
)
""" )
cursor .execute ("""
CREATE TABLE "dimDate" (
"index" INTEGER,
"fips" REAL,
"date" TIMESTAMP,
"year" INTEGER,
"month" INTEGER,
"day_of_week" INTEGER
)
""" )
3. Copy data from S3 to the tables
cursor.execute("""
copy factCovid from 's3://durgesh-covid-project/output/factCovid.csv'
credentials 'aws_iam_role=arn:aws:iam::436117849909:role/redshift_s3_access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""" )
cursor .execute ("""
copy dimHospital from 's3://durgesh-covid-project/output/dimHospital.csv'
credentials 'aws_iam_role=arn:aws:iam::436117849909:role/redshift_s3_access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""" )
cursor .execute ("""
copy dimRegion from 's3://durgesh-covid-project/output/dimRegion.csv'
credentials 'aws_iam_role=arn:aws:iam::436117849909:role/redshift_s3_access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""" )
cursor .execute ("""
copy dimDate from 's3://durgesh-covid-project/output/dimDate.csv'
credentials 'aws_iam_role=arn:aws:iam::436117849909:role/redshift_s3_access'
delimiter ','
region 'ap-south-1'
IGNOREHEADER 1
""" )
Download the .whl file for redshift-connector from PyPI
• Upload this file to the "packages" folder of the S3 bucket where you store query responses
Create Glue Jobs to run the Script
• Create a job with the Python Shell script editor
• Enter the location/URI of the library file; the Glue job will use this library to perform the code operations
• Save and Run the Script
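The same job can also be defined with boto3; a minimal sketch where the job name, role, script path, and .whl file name are placeholders:

glue_client = boto3.client("glue", region_name=AWS_REGION)
glue_client.create_job(
    Name="redshift-load-job",      # placeholder job name
    Role="glue-job-role",          # placeholder IAM role with S3 + Redshift access
    Command={
        "Name": "pythonshell",
        "PythonVersion": "3",
        "ScriptLocation": "s3://your-bucket/scripts/create_tables.py",  # placeholder script path
    },
    DefaultArguments={
        # Glue installs extra Python libraries supplied as .whl files via this argument
        "--extra-py-files": "s3://durgeshp-test-bucket/packages/redshift_connector.whl",  # placeholder file name
    },
)
glue_client.start_job_run(JobName="redshift-load-job")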
After a successful run you will see the result below 👇
The image below shows that the query successfully created the tables inside the Redshift cluster database
• You can now perform analysis on this data and expand further by connecting it with BI tools to create dashboards.
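For example, a first analytical query over the star schema might join the fact table with two dimensions on fips (a hypothetical sketch; run it through the same cursor or the Redshift query editor):

cursor.execute("""
    SELECT d.year, d.month, r.province_state, SUM(f.confirmed) AS total_confirmed
    FROM "factCovid" f
    JOIN "dimDate" d ON f.fips = d.fips
    JOIN "dimRegion" r ON f.fips = r.fips
    GROUP BY d.year, d.month, r.province_state
    ORDER BY total_confirmed DESC
""")
print(cursor.fetchall())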
Author: Durgesh Patil
amolsp1999@gmail.com