In a previous article, Azure Databricks - Data Pipeline, I presented how to build an end-to-end data pipeline using Azure Databricks and Data Factory. Now I'm building a similar implementation with an open-source stack, leveraging some of the best existing solutions:
- Airbyte: data extraction - http://localhost:56174/
- dbt: data transformation
- Airflow: task orchestration - http://localhost:8085/
- PostgreSQL: data storage
- OpenMetadata: data catalog - http://localhost:8585/
- MinIO: object storage - http://localhost:9090/
We will be working with transactional data related to loan transactions and customers from GeekBankPE (a famous bank around the world).
You have two requirements from different areas of the bank:
- The Marketing area needs up-to-date customer data so it can contact customers and make offers.
- The Finance area requires daily loan transactions complemented with customer drivers so it can analyze them and improve revenue.
To fulfill these requests, we are going to perform incremental loads and also use techniques like upsert, as illustrated in the sketch below.
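In this stack the incremental logic and upserts live in the dbt models; purely as an illustration of what an upsert does on PostgreSQL, here is a minimal sketch with psycopg2 against the local warehouse. The `customers_demo` table and its columns are hypothetical, and psycopg2 is not part of the stack (install it with `pip install psycopg2-binary` if you want to try it):

```python
# Illustrative upsert on the local warehouse (hypothetical table, not part of the project).
# Connection details for the local warehouse are listed in the credentials section below.
import psycopg2

conn = psycopg2.connect(host="localhost", port=5455, dbname="dwh", user="dwh", password="dwh")
with conn, conn.cursor() as cur:
    # Hypothetical target table keyed by customer_id.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS public.customers_demo (
            customer_id INT PRIMARY KEY,
            email       TEXT,
            updated_at  TIMESTAMP
        )
    """)
    # Upsert: insert new customers, update existing ones in place.
    cur.execute("""
        INSERT INTO public.customers_demo (customer_id, email, updated_at)
        VALUES (%s, %s, NOW())
        ON CONFLICT (customer_id)
        DO UPDATE SET email = EXCLUDED.email, updated_at = EXCLUDED.updated_at
    """, (1, "customer1@example.com"))
conn.close()
```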
- Docker Desktop
- Docker Compose
- Windows 10 or higher (this project hasn't been tested on macOS or Linux)
- Python 3.6 or higher
- 8 GB of RAM or more recommended for the Docker containers alone
Airflow credentials, used to log in to the web UI:
- username: `airflow`
- password: `airflow`
The MinIO access and secret keys, along with the URL endpoint, are used to log in and for connections (a quick programmatic check is sketched below):
- minio_access_key: `minio_admin`
- minio_secret_key: `minio_password`
- url_endpoint: `http://host.docker.internal:9000`
- port: `9000`
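If you want to verify these credentials outside the MinIO console, a minimal sketch with boto3 (not part of the project; assumes `pip install boto3` on your host) looks like this:

```python
# Quick credential/endpoint check against MinIO's S3 API (sketch).
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://host.docker.internal:9000",  # documented endpoint; from the host, http://localhost:9000 should also work
    aws_access_key_id="minio_admin",
    aws_secret_access_key="minio_password",
)
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])
```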
PostgreSQL data warehouse connection details:
- username: `dwh`
- password: `dwh`
- external_host: `localhost`
- external_port: `5455`
- internal_host: `postgres_dwh`
- internal_port: `5432`
If you connect to the database from a SQL client such as DBeaver, you need to use the external host and port.
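For example, a minimal connectivity check from the host with psycopg2 (assuming `pip install psycopg2-binary`) uses the external host and port:

```python
# Connects from the host machine, so it uses the external host/port (sketch).
import psycopg2

conn = psycopg2.connect(
    host="localhost",  # external host
    port=5455,         # external port; inside Docker you would use postgres_dwh:5432 instead
    dbname="dwh",
    user="dwh",
    password="dwh",
)
with conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()
```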
Airbyte: enter a valid email when trying to log in. Other configuration values:
- internal_host: `host.docker.internal`
- internal_host_and_port: `http://host.docker.internal:8000`
- user: `airbyte`
- password: `password`
OpenMetadata credentials:
- username: `admin`
- password: `admin`
In the YouTube video below, I show you how to set everything up.
- Open your terminal.
- Navigate to the root of the `os-data-stack` repository.
- Run `docker compose up --build` to initialize the containers. If you want to run them in the background, add the `-d` argument.
- Perform the Airflow configurations (section below).
- Run the Airflow DAG called `upload_data_to_s3` to upload the CSV files into the MinIO S3 bucket (an API-based alternative to triggering it from the UI is sketched after this list).
- Perform the Airbyte configurations (section below).
- Run the Airflow DAG called `dbt_model` to create the tables in the bronze, silver, and gold schemas.
- Perform the OpenMetadata configurations (section below).
- Play around with all the technologies.
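As an alternative to triggering DAGs from the Airflow UI, you can usually call Airflow's stable REST API. This is only a sketch and assumes the API's basic-auth backend is enabled in this compose setup (it may not be):

```python
# Trigger the upload_data_to_s3 DAG over the REST API (sketch; the DAG must be unpaused first).
import requests

resp = requests.post(
    "http://localhost:8085/api/v1/dags/upload_data_to_s3/dagRuns",
    auth=("airflow", "airflow"),
    json={"conf": {}},
)
resp.raise_for_status()
print(resp.json()["dag_run_id"])
```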
- Open Airflow.
- Go to Admin > Connections.
- Create the MinIO S3 connection with the configuration below. If you test this connection it will fail; just ignore it. (If you prefer to script the three connections in this list, see the sketch after it.)
  - Connection Type: `Amazon Web Services`
  - Connection Id: `aws_default`
  - Extra: `{"aws_access_key_id": "minio_admin", "aws_secret_access_key": "minio_password", "endpoint_url": "http://host.docker.internal:9000"}`
- Create the Postgres connection:
  - Connection Type: `Postgres`
  - Connection Id: `postgres_default`
  - Host: `postgres_dwh`
  - Schema: `dwh`
  - Login: `dwh`
  - Password: `dwh`
  - Port: `5432`
- Create the Airbyte connection (optional, in case you want to use the stack for your own development):
  - Connection Type: `Airbyte`
  - Connection Id: `airbyte_default`
  - Host: `host.docker.internal`
  - Username: `airbyte`
  - Password: `password`
  - Port: `8000`
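If you would rather script these three connections than fill in the forms, a sketch against Airflow's REST API (same basic-auth assumption as the earlier DAG-trigger snippet) could look like this; `aws`, `postgres`, and `airbyte` are the standard provider connection-type identifiers:

```python
# Create the aws_default, postgres_default and airbyte_default connections via the REST API (sketch).
# A 409 response means the connection already exists.
import json
import requests

API = "http://localhost:8085/api/v1/connections"
AUTH = ("airflow", "airflow")

connections = [
    {
        "connection_id": "aws_default",
        "conn_type": "aws",
        "extra": json.dumps({
            "aws_access_key_id": "minio_admin",
            "aws_secret_access_key": "minio_password",
            "endpoint_url": "http://host.docker.internal:9000",
        }),
    },
    {
        "connection_id": "postgres_default",
        "conn_type": "postgres",
        "host": "postgres_dwh",
        "schema": "dwh",
        "login": "dwh",
        "password": "dwh",
        "port": 5432,
    },
    {
        "connection_id": "airbyte_default",
        "conn_type": "airbyte",
        "host": "host.docker.internal",
        "login": "airbyte",
        "password": "password",
        "port": 8000,
    },
]

for connection in connections:
    resp = requests.post(API, auth=AUTH, json=connection)
    print(connection["connection_id"], resp.status_code)
```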
- Open Airbyte, enter an email, and select `Get started`.
- Select Sources (left sidebar), type `S3` in the search bar, and select it.
- Create the S3 source for the customer data (a quick check that the CSV files are in place is sketched after these source configurations):
  - Source_name: `S3_customer_information_cdc`
  - Output_stream_name: `daily_customer_information_cdc`
  - Pattern_of_files_to_replicate: `customer/*.csv`
  - Bucket: `raw`
  - Aws_access_key_id: `minio_admin`
  - Aws_secret_access_key: `minio_password`
  - Path_prefix: `customer/`
  - Endpoint: `http://host.docker.internal:9000`
- Scroll to the end and select `Set up source`.
- Create the S3 source for the customer driver data:
  - Source_name: `S3_daily_customer_drivers`
  - Output_stream_name: `daily_customer_drivers`
  - Pattern_of_files_to_replicate: `customerDrivers/*.csv`
  - Bucket: `raw`
  - Aws_access_key_id: `minio_admin`
  - Aws_secret_access_key: `minio_password`
  - Path_prefix: `customerDrivers/`
  - Endpoint: `http://host.docker.internal:9000`
- Scroll to the end and select `Set up source`.
- Create the S3 source for the loan transactions data:
  - Source_name: `S3_daily_loan_transactions`
  - Output_stream_name: `daily_loan_transactions`
  - Pattern_of_files_to_replicate: `transactions/*.csv`
  - Bucket: `raw`
  - Aws_access_key_id: `minio_admin`
  - Aws_secret_access_key: `minio_password`
  - Path_prefix: `transactions/`
  - Endpoint: `http://host.docker.internal:9000`
- Scroll to the end and select `Set up source`.
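At this point it is worth confirming that the `upload_data_to_s3` DAG actually placed the CSV files under the prefixes these sources read from. A minimal boto3 sketch (same assumptions as the earlier MinIO snippet):

```python
# List the objects the three Airbyte sources will read from the raw bucket (sketch).
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://host.docker.internal:9000",
    aws_access_key_id="minio_admin",
    aws_secret_access_key="minio_password",
)
for prefix in ("customer/", "customerDrivers/", "transactions/"):
    contents = s3.list_objects_v2(Bucket="raw", Prefix=prefix).get("Contents", [])
    print(prefix, [obj["Key"] for obj in contents])
```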
- Select Destinations (left sidebar), search for Postgres, and select it.
- Create the Postgres connection as the destination:
  - Destination_name: `Postgres_DWH`
  - Host: `localhost`
  - Port: `5455`
  - Db_name: `dwh`
  - Default_Schema: `airbyte`
  - user: `dwh`
  - password: `dwh`
- Scroll to the end and select `Set up destination`.
- Select Connections (left sidebar).
- Select the `S3_customer_information_cdc` source and select `Use existing destination`.
- In the destination tab, select `Postgres_DWH` and select `Use existing destination`.
- In the new screen, change the `Replication frequency` to `Manual`.
- The sync mode should be `Full refresh overwrite` (2023-05-01: the Incremental sync mode isn't working, data gets duplicated when using it, maybe because the Postgres connector is in Alpha).
- Select `Set up connection`.
- Repeat the same process for the `S3_daily_customer_drivers` and `S3_daily_loan_transactions` sources.
- After setting up the connections, select Connections (left sidebar) and click `Sync now` for your three connections.
- Wait until the synchronization finishes (a quick way to check the landed tables is sketched below).
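Once the three syncs have finished, you can check that the streams landed in the `airbyte` schema of the warehouse. A minimal sketch with psycopg2 (same assumptions as the earlier connectivity snippet):

```python
# List the tables Airbyte created in the airbyte schema of the dwh database (sketch).
import psycopg2

conn = psycopg2.connect(host="localhost", port=5455, dbname="dwh", user="dwh", password="dwh")
with conn.cursor() as cur:
    cur.execute("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'airbyte'
        ORDER BY table_name
    """)
    for (table_name,) in cur.fetchall():
        print(table_name)
conn.close()
```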
- Open OpenMetadata and enter your credentials:
  - username: `admin`
  - password: `admin`
- Select `Services` in the left sidebar.
- Select `Add New Service` in the top right corner.
- Create the Postgres database service:
  - Select Database Services.
  - Select the Postgres connector and select `Next`.
  - Enter `postgres_con` as the service name and select `Next`.
  - Fill out the fields for the connection:
    - Username: `dwh`
    - Password: `dwh`
    - Host_and_Port: `postgres_dwh`
    - Database: `dwh`
  - Test your connection and select `Save`.
- Select your connection.
- Select the tab called `Ingestions`.
- Create a metadata ingestion by selecting the purple `Add ingestion` button on the right (mandatory):
  - Enable `Use FQN For Filtering`.
  - Enable `Include Views`.
  - Select `Next`.
  - For the schedule interval, select `None`, which means manual.
  - Finally, select `Add & Deploy` and then `View Service`.
- Run the metadata ingestion.
- Create a dbt ingestion (mandatory for our example):
  - dbt_Configuration_Source: `S3_Config_Source`
  - AWS_Access_Key_ID: `minio_admin`
  - AWS_Secret_Access_Key: `minio_password`
  - AWS_Region: `us-east-1`
  - Endpoint_URL: `http://host.docker.internal:9000`
  - dbt_Bucket_Name: `dbt`
  - dbt_Object_Prefix: `dwh`
  - Select `Next` and choose a `Manual` schedule interval.
  - Run the dbt ingestion (a quick check that the dbt artifacts are in the bucket is sketched after this list of ingestions).
- Create a lineage ingestion (optional).
- Create a profiler ingestion (optional):
  - It is recommended to filter the schemas and target only `bronze,silver,gold`.
  - It is recommended to play with the value of `Profile Sample Type`.
- Create a usage ingestion (optional).
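The dbt ingestion expects the dbt artifacts (such as `manifest.json`) to be available in the `dbt` bucket under the `dwh` prefix. If the ingestion comes back empty, a quick way to inspect that location is a boto3 sketch like this (same assumptions as before):

```python
# Check that the dbt artifacts the dbt ingestion points to are present in MinIO (sketch).
import boto3

s3 = boto3.client(
    "s3",
    endpoint_url="http://host.docker.internal:9000",
    aws_access_key_id="minio_admin",
    aws_secret_access_key="minio_password",
)
contents = s3.list_objects_v2(Bucket="dbt", Prefix="dwh").get("Contents", [])
print([obj["Key"] for obj in contents])  # should include files such as manifest.json
```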
- Select `Services` in the left sidebar.
- Select `Add New Service` in the top right corner.
- Select `Pipeline Services` from the drop-down list.
- Select `Airflow` and then `Next`.
- Enter a name for your service connection and select `Next`.
- Enter the configuration below:
  - Host_and_Port: `http://localhost:8085/`
  - Metadata_Database_Connection: `PostgresConnection`
    - Username: `airflow`
    - Password: `airflow`
    - Host_and_Port: `host.docker.internal:5432`
    - Database: `airflow`
- Test your connection and save.
- Navigate to your Airflow service connection and create a metadata ingestion.
- Run the metadata ingestion.
- Select `Services` in the left sidebar.
- Select `Add New Service` in the top right corner.
- Select `Pipeline Services` from the drop-down list.
- Select `Airbyte` and then `Next`.
- Enter a name for your service connection and select `Next`.
- Enter the configuration below:
  - Host_and_Port: `http://host.docker.internal:8000`
  - Metadata_Database_Connection: `PostgresConnection`
  - Username: `airbyte`
  - Password: `password`
- Test your connection and save.
- Navigate to your Airbyte service connection and create a metadata ingestion.
- Run the metadata ingestion.
In case you want to run some dbt actions locally:
- Navigate to the root of the `dbt` directory.
- Install a Python virtual environment by running:
  - Windows: `python -m venv env`
- Activate your environment by running:
  - Windows: `env/scripts/activate`
- Run the command below to install dbt:
  - Windows: `pip install dbt-postgres==1.4.6`
- Navigate to the `dwh` project by running:
  - `cd dwh`
- Now you are able to run dbt commands, for example:
dbt build --profiles-dir "../" --target prod_localhost --vars '{ target_date: 2022-09-12 }' --select gold
dbt test --profiles-dir "../" --target prod_localhost --vars '{ target_date: 2022-09-12 }' --select gold
Note: some of the tasks will be marked as `ERROR` when running a dbt command because the data is already loaded.