Python ETL Pipeline that pulls crypto transactions data from a Google BigQuery public dataset and stores it locally on an SQLite database. Then, performs summarizations and aggregations, and then stores the data on a LocalStack S3 Bucket.
- Docker Compose
- Apache Airflow
- Python Operators
- Google Cloud BigQuery API
- SQLite Database
- AWS SDK for Python (Boto3)
- Python Operators
- LocalStack
- S3 Bucket
- Apache Airflow
- Google BigQuery API
- SQL Queries
- E-mail notifications on retry/failure
Make sure to have Docker Compose installed and that Docker Engine is running. Also, a way to open .parquet files is recommended (i.e. Tad).
- Clone the repository
git clone https://github.com/Zekda96/transactions-etl-pipeline.git
-
Create a folder named
credentials
in the project directory and place the credentials file there. -
From the project directory, build the container by running
docker-compose up -d
.
[+] Running 8/7
✔ Container localstack Running
✔ Container transactions-etl-postgres-1 Healthy
✔ Container transactions-etl-redis-1 Healthy
✔ Container transactions-etl-airflow-init-1 Exited
✔ Container transactions-etl-airflow-scheduler-1 Running
✔ Container transactions-etl-airflow-triggerer-1 Running
✔ Container transactions-etl-airflow-webserver-1 Running
✔ Container transactions-etl-airflow-worker-1 Started
-
Open the Airflow GUI on http://localhost:8080/ and login with user
airflow
and passwordairflow
. If DAGzilliqa-transactions-DAG
is paused, please unpause it and it should shortly start executing. -
Once the run has been marked with the
success
tag, you can check the S3 Bucket on http://localhost:4566/zilliqa-transactions. It should have two .parquet files, each specified under a<Contents>
tag:
# ...
<Contents>
<Key>transactions_summary.parquet</Key>
# ...
# ...
<Contents>
<Key>receivers_summary.parquet</Key>
- Parquet files can be downloaded from:
- http://localhost:4566/zilliqa-transactions/transactions_summary.parquet
- http://localhost:4566/zilliqa-transactions/receivers_summary.parquet
- The container can be terminated by running
docker-compose down
.
The pipeline extracts 1.3M rows of data from a crypto transactions table via Google BigQuery's API. The data is transformed using SQL queries that performed summarizations in the following form:
Table 1
- Daily metrics (2022-08-01 to 2022-09-07)
- Number of transactions
- Successful transactions
- Success rate
- Number of unique senders
- Number of unique receivers
- Average cost of gas
- Average amount sent
Table 2
- All addresses ordered by most transactions received (2022-08-01 to 2022-09-07)
- Number of transactions
- Average cost of gas
- Total amount received
The pipeline is capable of handling the load of the entire database from BigQuery, however, for the purpose of this project, it has been reduced to all recorded transactions later than 2022-08-01, which reduces the data pulled from 41M rows to 1.3M rows. This allows the 'extract_data' task to run in around 3 minutes, instead of the approximately 120 minutes it would take to load the entire dataset.
In order to pull the whole dataset, line 134
from extract_data.py
can be
commented out.
WHERE DATE(block_timestamp) >= DATE(2022,08,01)
Actions performed in this pipeline such as storing a file locally or using LocalStack are restricted to development environments and have to be replaced for production. The project could be scaled and used in production by using modular cloud infrastructure services such as Data Lakes (AWS S3, Azure Blob Storage, Google Cloud Storage) Data Warehouses (Redshift, Databricks, Google BigQuery) and serverless FaaS (AWS Lambda, Google Cloud Functions).