**DataflowToCloudSQL** is a Python-based project that integrates Apache Beam pipelines with Google Cloud SQL. The project is structured to read data from a GCS bucket and write the results to a GCP CloudSQL Postgres database. The primary aim of this repo is to demonstrate connecting to CloudSQL from Dataflow:

- Using a private IP
- Using IAM authentication
- Without a CloudSQL Auth Proxy

The solution uses SQLAlchemy's bulk loading capabilities in conjunction with Beam's distributed framework.
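One common way to satisfy all three constraints at once is to mint a short-lived OAuth access token with the `sqlservice.login` scope and present it as the Postgres password while connecting directly to the instance's private IP. The sketch below illustrates that pattern; it is not necessarily the exact code in `common/get_connection.py`, and the username, host, and `make_engine` name are placeholders.

```python
import google.auth
import google.auth.transport.requests
import sqlalchemy


def make_engine() -> sqlalchemy.engine.Engine:
    """Engine for CloudSQL Postgres over private IP with IAM auth, no proxy."""
    # Fetch an access token scoped for Cloud SQL IAM database login.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/sqlservice.login"]
    )
    credentials.refresh(google.auth.transport.requests.Request())

    url = sqlalchemy.engine.url.URL.create(
        drivername="postgresql+pg8000",
        username="dataflow-sa@project_name.iam",  # IAM database user
        password=credentials.token,               # the token doubles as the password
        host="10.0.0.3",                          # instance's private IP (placeholder)
        port=5432,
        database="postgres",
    )
    return sqlalchemy.create_engine(url, pool_pre_ping=True)
```

Note that the token expires after roughly an hour, so long-lived workers would need to refresh it, for example inside a `creator` callable passed to `create_engine`.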
- `common/`: Contains common utility modules used across the project.
  - `config.ini`: Configuration file for storing database connection details and other Dataflow configurations.
  - `get_connection.py`: Module for establishing a connection to Google Cloud SQL.
  - `Logger.py`: Module for logging throughout the application.
  - `parse_configs.py`: Handles the parsing of the configuration file (`config.ini`).
  - `utils.py`: Additional utility functions used across the project.
- `pipelines/`: Directory intended for storing Apache Beam pipeline definitions.
- `sinks/`: Directory intended for modules related to data sinks, such as CloudSQL (see the sketch after this list).
- `sources/`: Directory intended for modules related to data sources.
  - `read_from_source.py`: Module for reading data from the specified source. This is a sample record generator to demonstrate the CloudSQL write capability.
- `Dockerfile`: Docker configuration for containerizing the application.
- `main.py`: The main entry point of the application that initializes and runs the Apache Beam pipeline.
- `requirements.txt`: Python dependencies required for the project.
- `setup.py`: Script for installing the project and its dependencies.
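The sink pattern this repo demonstrates (buffering records inside a Beam `DoFn` and bulk-inserting each batch with SQLAlchemy) might look like the following minimal sketch. It is illustrative rather than a copy of the actual `sinks/` module; `WriteToCloudSQL`, `make_engine`, and the target table name are assumed names.

```python
import apache_beam as beam
import sqlalchemy


class WriteToCloudSQL(beam.DoFn):
    """Buffers dict rows and bulk-inserts them into Postgres per batch."""

    def __init__(self, make_engine, table_name, batch_size=10000):
        self._make_engine = make_engine  # callable returning a SQLAlchemy engine
        self._table_name = table_name
        self._batch_size = batch_size

    def setup(self):
        # One engine per worker; SQLAlchemy pools the underlying connections.
        self._engine = self._make_engine()
        self._sql_table = sqlalchemy.Table(
            self._table_name, sqlalchemy.MetaData(), autoload_with=self._engine
        )

    def start_bundle(self):
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)  # element: dict of column name -> value
        if len(self._buffer) >= self._batch_size:
            self._flush()

    def finish_bundle(self):
        if self._buffer:
            self._flush()

    def teardown(self):
        self._engine.dispose()

    def _flush(self):
        # executemany-style bulk insert of all buffered rows in one transaction.
        with self._engine.begin() as conn:
            conn.execute(sqlalchemy.insert(self._sql_table), self._buffer)
        self._buffer = []
```

Batching keeps the number of round trips to Postgres proportional to the `batch` setting in `config.ini` rather than to the record count.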
The application uses a `config.ini` file located in the `common/` directory for configuration. This file should contain the necessary details such as database credentials, connection strings, and other configurable parameters.

Example `config.ini`:
```ini
[cloudsql]
instance=project:region:cloudsql-instance
database=postgres
schema=public
user=dataflow-sa@project_name.iam
password=''
batch=10000

[dataflow]
project=project_name
temp_location=""
staging_location=""
region=us-central1
service_account=dataflow-sa@project_name.iam.gserviceaccount.com

[source]
src_path=gs://bucket_name/df_big/output*
```
Copy the `config.ini` to a GCS path:

```sh
gcloud storage cp common/config.ini gs://bucket_name/dataflowtocsql/config/config.ini
```
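Because the job receives the config as a `gs://` path, it has to be read through Beam's filesystem layer before the pipeline is built. A minimal sketch of how `parse_configs.py` might do this (the `load_config` function name is an assumption):

```python
import configparser

from apache_beam.io.filesystems import FileSystems


def load_config(config_path: str) -> configparser.ConfigParser:
    """Read an INI file from a local or gs:// path into a ConfigParser."""
    config = configparser.ConfigParser()
    with FileSystems.open(config_path) as f:
        config.read_string(f.read().decode("utf-8"))
    return config


# Usage:
# cfg = load_config("gs://bucket_name/dataflowtocsql/config/config.ini")
# batch_size = cfg.getint("cloudsql", "batch")
```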
To build the image using `gcloud builds`:

```sh
gcloud builds submit --tag us-central1-docker.pkg.dev/project_name/repo_name/dataflow/dataflow2csql:1.0
```
Run the job with the `DataflowRunner`:

```sh
python main.py \
  --config_path="gs://bucket_name/dataflowtocsql/config/config.ini" \
  --runner=DataflowRunner \
  --sdk_container_image="us-central1-docker.pkg.dev/project_name/repo_name/dataflow/dataflow2csql:1.0" \
  --setup_file=./setup.py
```
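The `--config_path` flag is a custom argument rather than a standard Dataflow option, so `main.py` presumably separates it from the Beam pipeline options along these lines. This is a sketch that wires together the earlier hypothetical pieces; the module paths, the `records` table name, and the assumption of one JSON record per input line are all illustrative.

```python
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Hypothetical imports tying the earlier sketches together; the repo's actual
# module layout may differ.
from common.get_connection import make_engine
from common.parse_configs import load_config
from sinks.write_to_cloudsql import WriteToCloudSQL


def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument("--config_path", required=True,
                        help="Local or gs:// path to config.ini")
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    # Ship the main session so module-level names are available on workers.
    options.view_as(SetupOptions).save_main_session = True

    cfg = load_config(known_args.config_path)

    with beam.Pipeline(options=options) as p:
        (p
         | "ReadSource" >> beam.io.ReadFromText(cfg.get("source", "src_path"))
         | "ParseRecord" >> beam.Map(json.loads)  # assumes one JSON record per line
         | "WriteToCloudSQL" >> beam.ParDo(
             WriteToCloudSQL(make_engine, "records",
                             batch_size=cfg.getint("cloudsql", "batch"))))


if __name__ == "__main__":
    run()
```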