This repository is for the Data Product II course at Partnatech. The course uses the following tech stack:
- Data Ingestion : Airbyte
- Data Transformation : dbt
- Workflow Orchestration : Apache Airflow
- Data Platform : OLTP(PostgreSQL) & OLAP(BigQuery)
Prerequisites:
- Docker and Docker Compose installed
- A service account key JSON file for Google BigQuery
- PostgreSQL installed locally
- VS Code or another IDE installed
In this course, you will:
- Design an ELT data architecture and data model
- Set up Airbyte in Docker
- Define a source connection in Airbyte
- Define a destination connection in Airbyte
- Configure a connection in Airbyte
- Create an Airbyte connection in the Airflow web server
- Create an Airflow DAG to trigger Airbyte jobs
- Create dbt models
- Create a DAG that integrates Airbyte, dbt, and Apache Airflow
For data modelling, this course uses Kimball's method (star schema): one central fact table joined to surrounding dimension tables.
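As a rough illustration of what the target star schema looks like, the sketch below lists one fact table and its dimensions. The table and column names are hypothetical placeholders, not the course's actual models.

```python
# A minimal, hypothetical star-schema layout (illustrative only; the actual
# course models and column names may differ).
star_schema = {
    "fact_sales": [
        "sale_id", "customer_key", "date_key", "product_key",
        "quantity", "total_amount",          # additive measures
    ],
    "dim_customer": ["customer_key", "customer_name", "city", "segment"],
    "dim_date": ["date_key", "date", "month", "quarter", "year"],
    "dim_product": ["product_key", "product_name", "category"],
}

if __name__ == "__main__":
    for table, columns in star_schema.items():
        print(f"{table}: {', '.join(columns)}")
```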
To get started:
- Clone this repository to your local machine
- Run Docker Compose:

  docker compose up -d

- If successful, open http://localhost:8000 to access the Airbyte UI
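To confirm the Airbyte containers are up before continuing, a quick check like the sketch below can be used. It only assumes the UI is exposed on http://localhost:8000, as in the step above.

```python
# Quick reachability check for the Airbyte UI started by docker compose.
import urllib.error
import urllib.request

URL = "http://localhost:8000"  # Airbyte UI from the docker compose setup above

try:
    with urllib.request.urlopen(URL, timeout=10) as resp:
        print("Airbyte UI is reachable, HTTP status:", resp.status)
except urllib.error.HTTPError as exc:
    # The server answered (e.g. with a login prompt), so the container is up.
    print("Airbyte UI responded with HTTP status:", exc.code)
except Exception as exc:
    print("Airbyte UI is not reachable yet:", exc)
```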
- Click New connection
- Under Define source, choose Set up a new source
- Type csv in the search box, then select File
- Enter a dataset name and choose CSV as the file format
- For Storage Provider, choose HTTPS: Public Web and enter the URL: https://storage.googleapis.com/raw_data_alterra_batch2_2024/customer.csv
- Click Set up source
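Optionally, the public CSV used as the source can be previewed locally before configuring Airbyte, for example with pandas (this assumes pandas is installed in your local Python environment):

```python
# Optional sanity check: preview the public CSV that Airbyte will ingest.
# Assumes pandas is installed (pip install pandas).
import pandas as pd

URL = "https://storage.googleapis.com/raw_data_alterra_batch2_2024/customer.csv"

df = pd.read_csv(URL)
print(df.head())    # first rows of the customer data
print(df.dtypes)    # column names and inferred types
```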
- Under Define destination, choose Set up a new destination
- Type postgres in the search box, then select Postgres
- Enter a destination name
- Enter host: host.docker.internal (use this when the Postgres database runs on your local machine rather than in Docker)
- Enter port: 5432
- Enter the database name
- Enter the user and password
- Click Set up destination
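Before saving the destination, you can verify the Postgres credentials from your local machine with a short script like the one below. The psycopg2 dependency and the connection values are assumptions; replace them with the values you entered in Airbyte.

```python
# Optional: verify the Postgres credentials used for the Airbyte destination.
# Assumes psycopg2-binary is installed; the connection values below are
# placeholders -- replace them with your own.
import psycopg2

conn = psycopg2.connect(
    host="localhost",  # from your machine; Airbyte itself uses host.docker.internal
    port=5432,
    dbname="your_db_name",
    user="your_user",
    password="your_password",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT version();")
    print(cur.fetchone()[0])
conn.close()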
- Under Connection, enter a connection name
- Under Configuration, choose the Manual schedule type (the Airbyte job will be triggered by Airflow)
- Click Set up connection
- Click Sync now
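After the sync finishes, you can check that the data landed in Postgres. The table name below is hypothetical; Airbyte names the destination table after the stream/dataset you configured, and the connection values are placeholders.

```python
# Optional: count the rows Airbyte wrote to Postgres after "Sync now".
# The table name "customer" is hypothetical -- use the stream/dataset name
# you configured in the source step.
import psycopg2

conn = psycopg2.connect(
    host="localhost", port=5432,
    dbname="your_db_name", user="your_user", password="your_password",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM public.customer;")
    print("rows ingested:", cur.fetchone()[0])
conn.close()
```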
- Open the Airflow web UI, then click Admin --> Connections and add a new connection
- Enter a connection id (e.g. airbyte_conn, as used in the DAG below)
- Choose connection type: Airbyte
- Enter host: airbyte-server
- Enter port: 8001
- Click Test
- Click Save
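The host and port above refer to the airbyte-server container's internal API in the Docker Compose setup. If the Test button fails, you can check from inside the Airflow container whether that API is reachable; the health endpoint path below follows the standard Airbyte OSS API and may differ between versions.

```python
# Run inside the Airflow container (e.g. via docker exec) to confirm that the
# Airbyte internal API is reachable at the host/port entered above.
# The /api/v1/health path is an assumption based on the Airbyte OSS API;
# adjust it if your Airbyte version exposes a different path.
import urllib.request

with urllib.request.urlopen("http://airbyte-server:8001/api/v1/health", timeout=10) as resp:
    print("Airbyte API health:", resp.status, resp.read().decode())
```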
- Go to the Airbyte UI and copy the connection id of each connection, e.g. b1016cab-07de-499c-84f2-abfc1abdf819
- Copy and paste the code below into a DAG file in your dags folder:
```python
from datetime import datetime

from airflow.decorators import dag
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator


@dag(
    start_date=datetime(2024, 5, 14),
    schedule='@daily',
    catchup=False,
    tags=['airbyte', 'airflow'],
)
def dataIngestion():
    # Trigger the Airbyte connection that ingests the public CSV into Postgres.
    csv_to_postgres = AirbyteTriggerSyncOperator(
        task_id='ingest_csv_to_postgres',
        airbyte_conn_id='airbyte_conn',
        connection_id='bb315ec0-86c6-4db8-acde-6b655b80be75',
        asynchronous=False,
        timeout=3600,
        wait_seconds=3,
    )

    # Trigger the Airbyte connection that loads Postgres into BigQuery.
    postgres_to_bigquery = AirbyteTriggerSyncOperator(
        task_id='ingest_postgres_to_bigquery',
        airbyte_conn_id='airbyte_conn',
        connection_id='b1016cab-07de-499c-84f2-abfc1abdf819',
        asynchronous=False,
        timeout=3600,
        wait_seconds=3,
    )

    # Run the BigQuery load only after the CSV has landed in Postgres.
    csv_to_postgres >> postgres_to_bigquery


dataIngestion()
```
- Change the connection_id in each AirbyteTriggerSyncOperator to the connection ids you copied in the previous step
- Enable the Airflow DAG, then check the data in Postgres and BigQuery
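To check the BigQuery side programmatically, a query like the sketch below can be used. It assumes the google-cloud-bigquery client is installed and GOOGLE_APPLICATION_CREDENTIALS points at the service account key from the prerequisites; the project, dataset, and table names are placeholders.

```python
# Optional: verify that the Airbyte Postgres-to-BigQuery sync landed data.
# Assumes google-cloud-bigquery is installed and GOOGLE_APPLICATION_CREDENTIALS
# points at your service account key. Dataset/table names are placeholders.
from google.cloud import bigquery

client = bigquery.Client()
query = "SELECT COUNT(*) AS row_count FROM `your_project.your_dataset.customer`"
for row in client.query(query).result():
    print("rows in BigQuery:", row.row_count)
```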
- Copy all files from the model folder into your dbt project
- Go to the Airbyte UI and copy the connection id of each connection, e.g. b1016cab-07de-499c-84f2-abfc1abdf819
- Copy the DAG code elt_datapipelines.py from the dags folder into your local dags folder (a sketch of what such a DAG typically contains is shown after these steps)
- Enable the Airflow DAG, then check the data in Postgres and BigQuery
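For reference, below is a minimal sketch of what an integration DAG like elt_datapipelines.py typically looks like. The connection ids, the dbt project path, and the BashOperator-based dbt invocation are assumptions for illustration; the elt_datapipelines.py provided in this repository is the actual source of truth.

```python
# Illustrative sketch only -- the real DAG is dags/elt_datapipelines.py in this
# repository. Connection ids, the dbt project path, and the BashOperator-based
# dbt invocation below are assumptions, not the course's exact implementation.
from datetime import datetime

from airflow.decorators import dag
from airflow.operators.bash import BashOperator
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator


@dag(
    start_date=datetime(2024, 5, 14),
    schedule='@daily',
    catchup=False,
    tags=['airbyte', 'dbt', 'airflow'],
)
def elt_datapipelines():
    # 1. Ingest the CSV into Postgres via Airbyte (replace the connection id).
    ingest = AirbyteTriggerSyncOperator(
        task_id='ingest_csv_to_postgres',
        airbyte_conn_id='airbyte_conn',
        connection_id='<your-airbyte-connection-id>',
        asynchronous=False,
        timeout=3600,
        wait_seconds=3,
    )

    # 2. Load Postgres into BigQuery via the second Airbyte connection.
    load = AirbyteTriggerSyncOperator(
        task_id='ingest_postgres_to_bigquery',
        airbyte_conn_id='airbyte_conn',
        connection_id='<your-airbyte-connection-id>',
        asynchronous=False,
        timeout=3600,
        wait_seconds=3,
    )

    # 3. Transform the raw tables into the star-schema models with dbt.
    #    Assumes dbt is available in the Airflow image and the project lives
    #    at /opt/airflow/dbt (a placeholder path).
    transform = BashOperator(
        task_id='dbt_run',
        bash_command='cd /opt/airflow/dbt && dbt run --profiles-dir .',
    )

    ingest >> load >> transform


elt_datapipelines()
```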