Docker is containerization software that allows us to isolate software in a way similar to virtual machines, but in a much leaner way.
A Docker image is a snapshot of a container that we can define to run our software, or in this case our data pipelines. By exporting our Docker images to cloud providers such as Amazon Web Services or Google Cloud Platform, we can run our containers there.
Docker provides the following advantages:
- Reproducibility
- Local experimentation
- Integration tests (CI/CD)
- Running pipelines on the cloud (AWS Batch, Kubernetes jobs)
- Spark (analytics engine for large-scale data processing)
- Serverless (AWS Lambda, Google Cloud Functions)
- Docker containers are stateless: any changes made inside a container will NOT be saved when the container is killed and started again.
This is an advantage because it allows us to restore any container to its initial state in a reproducible manner, but you will have to store data elsewhere if you need to persist it; a common way to do this is with volumes.
Installing Docker
Watch any video online and install Docker for Ubuntu.
Checking if Docker is Installed Correctly
docker run hello-world
Running Linux Commands in Docker
docker run -it ubuntu bash
Here we can run any Linux command, such as ls.
Running Python in Docker
docker run -it python:3.9
Installing Pandas
- Defining the entry point as bash
docker run -it --entrypoint=bash python:3.9
Run the following
pip install pandas
Pandas is installed only in this specific Docker container. Note: press Ctrl+D to exit the interactive shell.
Problem: when we exit and start a new container from python:3.9 and run import pandas,
we get an error because pandas is gone; the python:3.9 Docker container does not save its state.
To solve this problem:
- Create a Dockerfile (it's in the week-1 folder)
FROM python:3.9
RUN pip install pandas
ENTRYPOINT bash
- FROM selects python:3.9 as the base image
- RUN runs the command that installs pandas
- ENTRYPOINT sets bash as the entry point
Now use docker build -t test:pandas .
- docker build builds the image
- -t is used to specify a tag
- test is the name of the image and pandas is its tag
- . tells Docker to use the current directory (the one containing the Dockerfile) as the build context
Now if we do docker run -it test:pandas, we can run import pandas inside the container.
Add these two lines to the Dockerfile:
WORKDIR /app
COPY pipeline.py pipeline.py
- WORKDIR sets the working directory inside the container (here /app), which is where pipeline.py will be put
- COPY specifies the source path (on the host) and the destination path (inside the /app directory of the container)
Now run docker build -t test:pandas .
and docker run -it test:pandas
The working directory will now be /app.
From here we can run python pipeline.py
and job finished is printed on the screen.
pipeline.py
import sys
import pandas as pd

# fancy pandas stuff
print(sys.argv)
day = sys.argv[1]
print(f"job finished successfully for the day = {day}")
Dockerfile
FROM python:3.9
RUN pip install pandas
WORKDIR /app
COPY pipeline.py pipeline.py
ENTRYPOINT ["python","pipeline.py"]
Now run docker build -t test:pandas .
Then finally execute it like this (the argument after the image name is appended to the ENTRYPOINT and becomes sys.argv[1] inside the container):
docker run -it test:pandas 2021-02-15
Output would be:
job finished successfully for the day = 2021-02-15
1.2.2 Ingesting NY Taxi Data to Postgres
To download the data that we are going to use (the NYC Taxi Cab dataset), use the command below:
wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz
Running Postgres from Docker
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB='ny_taxi' \
--volume //DRIVELETTER/INSERTPATHHERE/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
postgres:13
Meaning of the code:
-e -> sets an environment variable inside the container (here the Postgres user, password and database name)
--volume -> maps a folder on the host machine to a folder inside the container, so the Postgres data persists after the container stops
-p -> maps a port on the host to a port inside the container (host:container), so we can reach Postgres on localhost:5432
postgres:13 -> the image to run (Postgres version 13)
At this point a directory named ny_taxi_postgres_data should have been created. Do not worry if it looks empty; as long as the command runs successfully, everything is fine.
Now we have Postgres running successfully inside Docker!
There must be a way to query the database in Postgres, right? Here's where pgcli comes into the picture, giving us the ability to write SQL commands in the CLI.
Working with pgcli
- Installing pgcli
Use the following command:
pip install pgcli
- Use pgcli to connect to Postgres
pgcli -h localhost -p 5432 -u root -d ny_taxi
- Some commands you can run to check that the CLI works:
\dt
-> should show the list of tables
SELECT COUNT(1) FROM nyc_taxi_data;
-> should return the number of rows in the table
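If you prefer checking from Python instead of pgcli, here is a minimal sketch using pandas and SQLAlchemy (requires pandas, sqlalchemy and psycopg2 installed; the connection details are assumed to match the docker run command above):
import pandas as pd
from sqlalchemy import create_engine

# connection string uses the user/password/port/db from the docker run command
engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# equivalent of \dt: list the tables in the public schema
print(pd.read_sql(
    "SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = 'public'",
    con=engine,
))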
Problem with pgcli
The main limitation of pgcli is that it is just a command line interface for executing queries in text; it would be great if we could have a clean GUI, right? Here is where pgAdmin
comes into play.
pgAdmin provides a clear GUI for querying Postgres data.
Installing pgAdmin
Let's run pgAdmin from its Docker image.
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
dpage/pgadmin4
Here we provide an email and password as the access credentials and map host port 8080 to port 80 inside the container.
Is your pgAdmin dashboard loading slowly?
If this is the case, change dpage/pgadmin4 to dpage/pgadmin3 and it loads faster.
Creating Server on pgAdmin
When we try to create a server as shown in the video, we see an error because pgAdmin runs in one container and Postgres runs in another container. There is no connection between them.
Using Docker networks, we can put two or more containers on the same network so they can talk to each other. Let's do that now.
- Creating a docker network
docker network create pg-network
- Adding postgres to our network
docker run -it \
-e POSTGRES_USER="root" \
-e POSTGRES_PASSWORD="root" \
-e POSTGRES_DB='ny_taxi' \
--volume //DRIVELETTER/INSERTPATHHERE/ny_taxi_postgres_data:/var/lib/postgresql/data \
-p 5432:5432 \
--network=pg-network \
--name pg-database \
postgres:13
- Adding pgAdmin to our network
docker run -it \
-e PGADMIN_DEFAULT_EMAIL="admin@admin.com" \
-e PGADMIN_DEFAULT_PASSWORD="root" \
-p 8080:80 \
--network=pg-network \
--name pgadmin-2 \
dpage/pgadmin4
Now everything should be set and you should be able to create a server in pgAdmin (use pg-database as the host name, port 5432, user root, password root).
Let's now convert our ipynb notebook into a neat Python pipeline.
First go to the pgAdmin dashboard and drop the existing table:
DROP TABLE nyc_cab_data;
ingest-data.py
- Importing Dependencies
import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
argparse is used to parse the arguments from the command line.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')

    parser.add_argument('--user', required=True, help='user name for postgres')
    parser.add_argument('--password', required=True, help='password for postgres')
    parser.add_argument('--host', required=True, help='host for postgres')
    parser.add_argument('--port', required=True, help='port for postgres')
    parser.add_argument('--db', required=True, help='database name for postgres')
    parser.add_argument('--table_name', required=True, help='name of the table where we will write the results to')
    parser.add_argument('--url', required=True, help='url of the csv file')

    args = parser.parse_args()

    main(args)
In the above code we specify the arguments that the user can enter. Then we call parser.parse_args() to collect all the arguments into a single Namespace object and pass it to the main function.
Check the ingest-data.py script to know more about the main function.
Now we must get the dataset back so it shows up in the pgAdmin dashboard.
Step 1: Migrate the update-data.ipynb notebook to the ingest-data.py script.
Step 2: Understand the ingest-data.py script.
import os
import argparse
from time import time
import pandas as pd
from sqlalchemy import create_engine
Basically importing the basic dependencies: os is used for file operations, and argparse is used to define command line arguments.
def main(params):
    user = params.user
    password = params.password
    host = params.host
    port = params.port
    db = params.db
    table_name = params.table_name
    url = params.url
Here we store the command line arguments in variables, which we will use to download the CSV file and connect to the database.
# the backup files are gzipped, and it's important to keep the correct extension
# for pandas to be able to open the file
if url.endswith('.csv.gz'):
    csv_name = 'output.csv.gz'
else:
    csv_name = 'output.csv'

os.system(f"wget {url} -O {csv_name}")
Here we download the CSV data using the wget command.
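If wget is not available on your system, the same download could be done directly in Python, for example with urllib from the standard library (a minimal sketch that reuses the csv_name chosen above):
from urllib.request import urlretrieve

# download the remote file and save it under the local name chosen above;
# urlretrieve streams the file straight to disk
urlretrieve(url, csv_name)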
engine = create_engine(f'postgresql://{user}:{password}@{host}:{port}/{db}')
df_iter = pd.read_csv(csv_name, iterator=True, chunksize=100000)
df = next(df_iter)
df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)
df.head(n=0).to_sql(name=table_name, con=engine, if_exists='replace')
df.to_sql(name=table_name, con=engine, if_exists='append')
Here we create a SQLAlchemy engine with the user name, password, host, port and database name. Then the data is read in chunks of 100,000 rows, the pickup/dropoff columns are converted to datetime, the table schema is created from the empty dataframe header (if_exists='replace'), and the first chunk is appended to the Postgres table.
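If you want to inspect the DDL that pandas will generate for the table before writing anything, pd.io.sql.get_schema is handy (a small sketch, using the df, table_name and engine defined above):
# print the CREATE TABLE statement pandas would use for this dataframe
print(pd.io.sql.get_schema(df, name=table_name, con=engine))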
while True:
    try:
        t_start = time()

        df = next(df_iter)

        df.tpep_pickup_datetime = pd.to_datetime(df.tpep_pickup_datetime)
        df.tpep_dropoff_datetime = pd.to_datetime(df.tpep_dropoff_datetime)

        df.to_sql(name=table_name, con=engine, if_exists='append')

        t_end = time()
        print('inserted another chunk, took %.3f second' % (t_end - t_start))
    except StopIteration:
        print("Finished ingesting data into the postgres database")
        break
The code above inserts the data chunk by chunk, 100,000 rows per iteration, and stops when the iterator is exhausted (StopIteration).
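Once the ingestion finishes, a quick sanity check from Python is to count the rows that landed in Postgres (a minimal sketch, reusing the engine and table_name from above):
# the count should match the number of rows in the downloaded CSV
print(pd.read_sql(f'SELECT COUNT(1) FROM {table_name}', con=engine))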
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Ingest CSV data to Postgres')

    parser.add_argument('--user', required=True, help='user name for postgres')
    parser.add_argument('--password', required=True, help='password for postgres')
    parser.add_argument('--host', required=True, help='host for postgres')
    parser.add_argument('--port', required=True, help='port for postgres')
    parser.add_argument('--db', required=True, help='database name for postgres')
    parser.add_argument('--table_name', required=True, help='name of the table where we will write the results to')
    parser.add_argument('--url', required=True, help='url of the csv file')

    args = parser.parse_args()

    main(args)
Above is the entry point that defines the command line arguments together with their help text. ArgumentParser is used to create the argument parser object; parse_args collects the arguments into a Namespace object, which is then passed to the main function as params.
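A quick toy illustration of what parse_args returns (not part of the script; the argument values are made up):
import argparse

parser = argparse.ArgumentParser(description='toy example')
parser.add_argument('--user', required=True)
parser.add_argument('--port', required=True)

# parse_args returns a Namespace object, not an array;
# each argument becomes an attribute accessed by name
args = parser.parse_args(['--user', 'root', '--port', '5432'])
print(args.user)  # root
print(args.port)  # 5432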
Modifying the Dockerfile
FROM python:3.9
RUN apt-get install wget
RUN pip install pandas sqlalchemy psycopg2
WORKDIR /app
COPY ingest-data.py ingest-data.py
ENTRYPOINT [ "python", "ingest-data.py" ]
Step 1: we install wget (to download the data) and the Python dependencies pandas, sqlalchemy and psycopg2 (the Postgres driver)
Step 2: we copy ingest-data.py into the image's /app working directory
Step 3: we specify the entrypoint as python ingest-data.py
Running the ingest data flow
- Specifying the params and running the container in interactive mode (the arguments placed after the image name are passed to our script, not to Docker)
URL="https://github.com/DataTalksClub/nyc-tlc-data/releases/download/yellow/yellow_tripdata_2021-01.csv.gz"
docker run -it \
--network=pg-network \
taxi_ingest:v001 \
--user=root \
--password=root \
--host=pg-database \
--port=5432 \
--db=ny_taxi \
--table_name=yellow_taxi_trips \
--url=${URL}
- Running the build script (run this before the docker run command above)
docker build -t taxi_ingest:v001 .
Docker Compose Networking
With Docker Compose we can define both services in a single docker-compose.yaml file; services defined in the same compose file are placed on the same network automatically, so they can reach each other by service name.
services:
  pgdatabase:
    image: postgres:13
    environment:
      - POSTGRES_USER=root
      - POSTGRES_PASSWORD=root
      - POSTGRES_DB=ny_taxi
    volumes:
      - "./data/ny_taxi_postgres_data:/var/lib/postgresql/data:rw"
    ports:
      - "5432:5432"
  pgadmin:
    image: dpage/pgadmin4
    environment:
      - PGADMIN_DEFAULT_EMAIL=admin@admin.com
      - PGADMIN_DEFAULT_PASSWORD=root
    ports:
      - "8080:80"
Running the docker-compose file
docker compose up
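The queries below also use a zones table (the taxi zone lookup data). If you have not loaded it yet, here is a minimal sketch for ingesting it with pandas; the URL and connection string are assumptions, adjust them to your setup:
import pandas as pd
from sqlalchemy import create_engine

# hypothetical location of the taxi zone lookup CSV; use whichever copy you downloaded
zones_url = 'https://github.com/DataTalksClub/nyc-tlc-data/releases/download/misc/taxi_zone_lookup.csv'

engine = create_engine('postgresql://root:root@localhost:5432/ny_taxi')

# read the lookup table and write it to Postgres as "zones"
df_zones = pd.read_csv(zones_url)
df_zones.to_sql(name='zones', con=engine, if_exists='replace')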
-- check if the table is there
SELECT COUNT(1) FROM zones;
-- join the zones and yellow_taxi_data tables (implicit inner join)
SELECT
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
CONCAT(zpu."Borough", ' / ', zpu."Zone") AS "pick_up_loc",
CONCAT(zdo."Borough", ' / ', zdo."Zone") AS "dropoff_loc"
FROM
yellow_taxi_data t,
zones zpu,
zones zdo
WHERE
t."PULocationID" = zpu."LocationID" AND
t."DOLocationID" = zdo."LocationID"
LIMIT 100;
-- inner join
SELECT
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
CONCAT(zpu."Borough", ' / ', zpu."Zone") AS "pick_up_loc",
CONCAT(zdo."Borough", ' / ', zdo."Zone") AS "dropoff_loc"
FROM
yellow_taxi_data t JOIN zones zpu
ON t."PULocationID" = zpu."LocationID"
JOIN zones zdo
ON t."DOLocationID" = zdo."LocationID"
LIMIT 100;
-- check if pickup location id is null
SELECT
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
"PULocationID",
"DOLocationID"
FROM
yellow_taxi_data t
WHERE
"PULocationID" is NULL
-- drop-off location IDs that are in the trips table but not in the zones table
SELECT
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
"PULocationID",
"DOLocationID"
FROM
yellow_taxi_data t
WHERE
"DOLocationID" NOT IN (
SELECT "LocationID" FROM zones)
LIMIT 100;
-- other types of joins
-- left join - keeps all records from the left table, even when there is no matching record in the right table
SELECT
CONCAT(zpu."Borough", ' / ', zpu."Zone") AS "pickup_loc",
CONCAT(zdo."Borough", ' / ', zdo."Zone") AS "dropoff_loc",
tpep_pickup_datetime,
tpep_dropoff_datetime,
total_amount,
"PULocationID",
"DOLocationID"
FROM
yellow_taxi_data t LEFT JOIN zones zpu
ON t."PULocationID" = zpu."LocationID"
LEFT JOIN zones zdo
ON t."DOLocationID" = zdo."LocationID"
LIMIT 100;
-- groupby and aggregates
-- calculate no of trips per day
SELECT
CAST(tpep_dropoff_datetime AS DATE) AS "Day",
COUNT(1)
FROM
yellow_taxi_data t
GROUP BY
CAST(tpep_dropoff_datetime AS DATE)
ORDER BY "Day" ASC;
-- day with largest number of records
SELECT
CAST(tpep_dropoff_datetime AS DATE) AS "Day",
COUNT(1) as "count"
FROM
yellow_taxi_data t
GROUP BY
CAST(tpep_dropoff_datetime AS DATE)
ORDER BY "count" DESC;
-- max amount of money made in a single trip, per day
SELECT
CAST(tpep_dropoff_datetime AS DATE) AS "Day",
COUNT(1) as "count",
MAX(total_amount),
MAX(passenger_count)
FROM
yellow_taxi_data t
GROUP BY
CAST(tpep_dropoff_datetime AS DATE)
ORDER BY "count" DESC;
-- group by multiple fields
SELECT
CAST(tpep_dropoff_datetime AS DATE) AS "Day",
"DOLocationID",
COUNT(1) as "count",
MAX(total_amount),
MAX(passenger_count)
FROM
yellow_taxi_data t
GROUP BY
1,2
ORDER BY
"Day" ASC,
"DOLocationID" ASC;
Point GOOGLE_APPLICATION_CREDENTIALS at your GCP service account key (used in the next sections):
export GOOGLE_APPLICATION_CREDENTIALS="/home/vedanth/dataEngineering/week-1/premium-bloom-368403-091ed03452fd.json"