This repository contains Apache Beam code examples for running on Google Cloud Dataflow. It includes the following examples:
- Streaming pipeline
  - Reading CSVs from a Cloud Storage bucket and streaming the data into BigQuery
- Batch pipeline
  - Reading from AWS S3 and writing to Google BigQuery
  - Reading from Google Cloud Storage and writing to Google BigQuery
The goal of this example is to overcome the limitations of micro-batching with BigQuery. The example covers the following steps (see the sketch below the list):
- Reads a number of CSV files from Cloud Storage
- Converts the CSV files into Java objects
- Writes the rows into BigQuery
For more details on the limitations of micro-batching within BigQuery, check out my blog.
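In Beam terms, the shape of that pipeline, using the standard TextIO and BigQueryIO connectors, looks roughly like the sketch below. This is a minimal illustration rather than the repository's actual code; the class name CsvToTableRowFn and the user_id/amount columns are assumptions made for the example.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class GcsToBigQuerySketch {

  /** Parses one CSV line such as "user_1,9.99" into a BigQuery TableRow. */
  static class CsvToTableRowFn extends DoFn<String, TableRow> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      String[] fields = c.element().split(",");
      c.output(new TableRow()
          .set("user_id", fields[0])
          .set("amount", Double.parseDouble(fields[1])));
    }
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline p = Pipeline.create(options);

    p.apply("ReadCsvFiles", TextIO.read().from("gs://<GCS BUCKET NAME>/*.csv"))
     .apply("ParseCsv", ParDo.of(new CsvToTableRowFn()))
     // Assumes the destination table already exists; otherwise supply a TableSchema
     // and use CREATE_IF_NEEDED instead.
     .apply("WriteToBigQuery", BigQueryIO.writeTableRows()
         .to("<BIGQUERY TABLE e.g. project:finance.transactions>")
         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```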
- Ensure that you have billing enabled for your project
- Enable the following Google Cloud Platform APIs:
- Cloud Dataflow, Compute Engine, Stackdriver Logging, Google Cloud Storage, Google Cloud Storage JSON, BigQuery, Google Cloud Pub/Sub, Google Cloud Datastore, and Google Cloud Resource Manager APIs.
- Create a Google Cloud Storage bucket to stage your Cloud Dataflow code. Make sure you note the bucket name as you will need it later.
- Create a BigQuery dataset called finance. Keep a note of the fully qualified dataset name, which is in the format projectName:finance
- Upload sample_1.csv and sample_2.csv to your Google Cloud Storage bucket
- Validate that the data has been loaded into BigQuery
mvn compile exec:java \
-Dexec.mainClass=com.harland.example.batch.BigQueryImportPipeline \
-Dexec.args="--project=<GCP PROJECT ID> \
--bucketUrl=gs://<GCS BUCKET NAME> \
--bqTableName=<BIGQUERY TABLE e.g. project:finance.transactions> \
--runner=DataflowRunner \
--region=europe-west1 \
--stagingLocation=gs://<DATAFLOW BUCKET>/stage/ \
--tempLocation=gs://<DATAFLOW BUCKET>/temp/"
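The --bucketUrl and --bqTableName arguments are custom pipeline options rather than built-in Dataflow flags. In Beam they would typically be declared on an options interface along these lines (a sketch; the actual interface and option names in the repository may differ):

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

/** Hypothetical options interface backing the custom flags passed via -Dexec.args. */
public interface BigQueryImportOptions extends PipelineOptions {

  @Description("Bucket to read CSV files from, e.g. gs://my-bucket or s3://my-bucket")
  @Validation.Required
  String getBucketUrl();
  void setBucketUrl(String value);

  @Description("Destination BigQuery table, e.g. project:finance.transactions")
  @Validation.Required
  String getBqTableName();
  void setBqTableName(String value);
}
```

The pipeline would then pick these up with PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryImportOptions.class). The remaining flags (--project, --runner, --region, --stagingLocation, --tempLocation) are standard Beam/Dataflow options and need no custom declaration.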
The goal of the example code is to calculate the total amount transferred for each user_id in transfers_july.csv. This is a purely fictitious example that covers the following steps (see the aggregation sketch below the list):
- Reads a CSV file from AWS S3
- Converts the CSV file into a Java Object
- Creates key/value pairs where user_id is the key and amount is the value
- Sums the amount for each user_id
- Writes the result to BigQuery
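The grouping and summing steps map naturally onto Beam's KV values and the built-in Sum combiner. A sketch of that core transform is below; the class name TotalPerUser and the assumed user_id,amount CSV layout are illustrative, not the repository's actual code.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Turns CSV lines into (user_id, amount) pairs and sums the amounts per user. */
public class TotalPerUser
    extends PTransform<PCollection<String>, PCollection<KV<String, Double>>> {

  @Override
  public PCollection<KV<String, Double>> expand(PCollection<String> lines) {
    return lines
        .apply("ToUserAmountPairs", ParDo.of(new DoFn<String, KV<String, Double>>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Assumed CSV layout: user_id,amount
            String[] fields = c.element().split(",");
            c.output(KV.of(fields[0], Double.parseDouble(fields[1])));
          }
        }))
        .apply("SumPerUser", Sum.doublesPerKey());
  }
}
```

The summed (user_id, total) pairs can then be converted to TableRow objects and written with BigQueryIO.writeTableRows(), as in the first example.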
- Ensure that you have billing enabled for your project
- Enable the following Google Cloud Platform APIs:
- Cloud Dataflow, Compute Engine, Stackdriver Logging, Google Cloud Storage, Google Cloud Storage JSON, BigQuery, Google Cloud Pub/Sub, Google Cloud Datastore, and Google Cloud Resource Manager APIs.
- Create a Google Cloud Storage bucket to stage your Cloud Dataflow code. Make sure you note the bucket name as you will need it later.
- Create a BigQuery dataset called finance. Keep a note of the fully qualified dataset name, which is in the format projectName:finance
- Upload the transfers_july.csv to your AWS S3/Google Cloud Storage bucket
mvn compile exec:java \
-Dexec.mainClass=com.harland.example.batch.BigQueryImportPipeline \
-Dexec.args="--project=<GCP PROJECT ID> \
--bucketUrl=s3://<S3 BUCKET NAME> \
--awsRegion=eu-west-1 \
--bqTableName=<BIGQUERY TABLE e.g. project:finance.transactions> \
--awsAccessKey=<YOUR ACCESS KEY> \
--awsSecretKey=<YOUR SECRET KEY> \
--runner=DataflowRunner \
--region=europe-west1 \
--stagingLocation=gs://<DATAFLOW BUCKET>/stage/ \
--tempLocation=gs://<DATAFLOW BUCKET>/temp/"
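The --awsAccessKey, --awsSecretKey and --awsRegion flags exist so that Beam's S3 filesystem (the beam-sdks-java-io-amazon-web-services module) can authenticate against the bucket. One way to wire them up, assuming custom awsAccessKey/awsSecretKey options alongside Beam's AwsOptions, is sketched below; the repository may do this differently.

```java
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import org.apache.beam.sdk.io.aws.options.AwsOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

/** Hypothetical options carrying the raw AWS key pair passed on the command line. */
interface AwsKeyOptions extends AwsOptions {
  @Description("AWS access key id")
  String getAwsAccessKey();
  void setAwsAccessKey(String value);

  @Description("AWS secret access key")
  String getAwsSecretKey();
  void setAwsSecretKey(String value);
}

public class S3CredentialsSketch {
  public static void main(String[] args) {
    AwsKeyOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(AwsKeyOptions.class);

    // Hand the key pair to Beam's S3 filesystem. --awsRegion is picked up directly,
    // because AwsOptions already defines that property.
    options.setAwsCredentialsProvider(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey())));

    // With the AWS IO module on the classpath, TextIO can now read s3:// paths,
    // e.g. TextIO.read().from("s3://<S3 BUCKET NAME>/transfers_july.csv").
  }
}
```

The second command below runs the same batch pipeline against a Google Cloud Storage bucket instead of S3, so the AWS flags are dropped and --bucketUrl uses the gs:// scheme.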
mvn compile exec:java \
-Dexec.mainClass=com.harland.example.batch.BigQueryImportPipeline \
-Dexec.args="--project=<GCP PROJECT ID> \
--bucketUrl=gs://<GCS BUCKET NAME> \
--bqTableName=<BIGQUERY TABLE e.g. project:finance.transactions> \
--runner=DataflowRunner \
--region=europe-west1 \
--stagingLocation=gs://<DATAFLOW BUCKET>/stage/ \
--tempLocation=gs://<DATAFLOW BUCKET>/temp/"
- Java 8
- Maven 3
- Apache Beam 2.5.0