Study notes for Data Engineer – Professional Certification Preparation for Google
- Download the gcloud SDK
- Activate your project
gcloud init
or
gcloud config configurations activate [profile name]
- See Managing SDK Configurations and All How-to Guides
- The gcloud config files are located under ~/.config/gcloud/ by default.
gcloud auth list
gcloud config list
gcloud info
gcloud help
gcloud help compute instances create
- https://www.coursera.org/specializations/gcp-data-machine-learning
- https://www.coursera.org/learn/serverless-data-analysis-bigquery-cloud-dataflow-gcp
- https://github.com/GoogleCloudPlatform/training-data-analyst
This file contains text you can copy and paste for the examples in Cloud Academy's Introduction to Google Cloud Dataflow course.
See Apache Beam Java SDK Quickstart for additional tutorials.
Note: A service account is required to run locally against gs:// buckets.
See Before you Begin for steps to create a service account, assign it a role, and download the key.
Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the file path of the JSON file that contains your service account key. This variable only applies to your current shell session, so if you open a new session, set the variable again.
gcloud auth application-default login
export GOOGLE_APPLICATION_CREDENTIALS="$HOME/.config/gcloud/application_default_credentials.json"
See Setting up a Java Development Environment for Apache Beam on Google Cloud Platform.
Taken from slides for Learn Stream processing with Apache Beam
- A pipeline is a ...
- A Directed Acyclic Graph (DAG) of data transformations
- Possibly unbounded collections of data flow on the edges
- May include multiple sources and multiple sinks
- Optimized and executed as a unit
- A pipeline describes ...
- What are you computing?
- Where in event time?
- When in processing time?
- How do refinements relate?
- A pipeline describes ...
- What = Transformations
- Where = Windowing
- When = Watermarks + Triggers
- How = Accumulation (a rough sketch follows)
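The four questions above map directly onto the Beam Java API. Below is a rough, hand-written sketch (the class and method names, window size, and lateness values are placeholders, not taken from the course material) that annotates each question on a per-team score aggregation:

import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class FourQuestions {
  // teamScores: a possibly unbounded collection of (team, points) events, e.g. read from Pub/Sub.
  static PCollection<KV<String, Integer>> hourlyTeamTotals(PCollection<KV<String, Integer>> teamScores) {
    return teamScores
        // Where in event time? -> Windowing: fixed one-hour windows
        .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardHours(1)))
            // When in processing time? -> Watermarks + Triggers: fire when the watermark
            // passes the end of the window, then re-fire for each late element
            .triggering(AfterWatermark.pastEndOfWindow()
                .withLateFirings(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardMinutes(30))
            // How do refinements relate? -> Accumulation: later panes include earlier results
            .accumulatingFiredPanes())
        // What are you computing? -> Transformations: sum the points per team
        .apply(Sum.integersPerKey());
  }
}

The UserScore, HourlyTeamScore, and LeaderBoard examples run later in these notes exercise exactly this progression, from a simple batch aggregation to a windowed, triggered streaming pipeline.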
Installing on your own computer: https://cloud.google.com/dataflow/docs/quickstarts
Transforms: https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/transforms/package-summary.html
git clone https://github.com/cloudacademy/beam.git
cd beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalLineCount
gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt | wc
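For orientation, a minimal line-count pipeline in the Beam Java SDK looks roughly like the sketch below. This is a hand-written illustration, not the actual MinimalLineCount source from the course repo; the class name and output path are placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class LineCountSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(TextIO.read().from("gs://dataflow-samples/shakespeare/kinglear.txt"))
        // Count all elements (lines) in the collection, producing a single Long
        .apply(Count.globally())
        .apply(MapElements.into(TypeDescriptors.strings())
            .via((Long n) -> "Lines: " + n))
        .apply(TextIO.write().to("linecount").withNumShards(1));

    p.run().waitUntilFinish();
  }
}

Comparing its output with the wc result above is a quick sanity check.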
Set up the Project and Bucket environment variables...
export PROJECT=$(gcloud config get-value core/project)
export BUCKET=gs://dataflow-$PROJECT
# or ...
nano ~/.profile
PROJECT=[Your Project ID]
BUCKET=gs://dataflow-$PROJECT
Create a Google Storage bucket...
gsutil mb $BUCKET
cd ~/beam/examples/java8
Run the MinimalLineCountArgs example on Dataflow:
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalLineCountArgs \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp"
Run the LineCount example:
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.LineCount \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp \
--output=$BUCKET/linecount"
cd ~/beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.MinimalWordCount \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp \
--output=$BUCKET/wordcounts"
cd ~/beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.game.UserScore \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp/ \
--output=$BUCKET/scores"
cd ~/beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.game.HourlyTeamScore \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp/ \
--output=$BUCKET/scores \
--startMin=2015-11-16-16-00 \
--stopMin=2015-11-17-16-00"
bq mk game
API Console Credentials: https://console.developers.google.com/projectselector/apis/credentials
export GOOGLE_APPLICATION_CREDENTIALS="[Path]/[Credentials file]"
cd ~/beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
-Dexec.args="$PROJECT game none"
cd ~/beam/examples/java8
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.game.LeaderBoard \
-Dexec.args="--runner=DataflowRunner \
--project=$PROJECT \
--tempLocation=$BUCKET/temp/ \
--output=$BUCKET/leaderboard \
--dataset=game \
--topic=projects/$PROJECT/topics/game"
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=2.9.0 \
-DgroupId=com.russomi \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=com.russomi.beam.examples \
-DinteractiveMode=false
mvn compile exec:java -Dexec.mainClass=com.russomi.beam.examples.WordCount \
-Dexec.args="--inputFile=pom.xml --output=counts" -Pdirect-runner
See the Java Quickstart for more.
Cloud Dataflow templates allow you to stage your pipelines on Cloud Storage and execute them from a variety of environments. You can use one of the Google-provided templates or create your own.
Templates provide you with additional benefits compared to traditional Cloud Dataflow deployment, such as:
* Pipeline execution does not require you to recompile your code every time.
* You can execute your pipelines without the development environment and associated dependencies that are common with traditional deployment. This is useful for scheduling recurring batch jobs.
* Runtime parameters allow you to customize the execution of the pipeline.
* Non-technical users can execute templates with the Google Cloud Platform Console, gcloud command-line tool, or the REST API.
Cloud Dataflow templates use runtime parameters to accept values that are only available during pipeline execution. To customize the execution of a templated pipeline, you can pass these parameters to functions that run within the pipeline (such as a DoFn).
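As a hedged illustration (not code from the templates repo; the Options interface name is a placeholder, while the inputFile/outputFile parameter names mirror the gcloud example below): runtime parameters are declared as ValueProvider<T> pipeline options. Their values are not baked in when the template is staged; they are resolved when the templated job actually runs.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class TemplateSketch {
  // Runtime parameters: declared as ValueProvider<T> so they can be supplied at execution time.
  public interface Options extends PipelineOptions {
    @Description("Path of the file to read from")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);

    @Description("Path prefix for the output files")
    ValueProvider<String> getOutputFile();
    void setOutputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    Pipeline p = Pipeline.create(options);

    // TextIO accepts ValueProviders directly, so the real paths arrive only when the
    // template is executed (e.g. via --parameters on `gcloud dataflow jobs run`).
    p.apply(TextIO.read().from(options.getInputFile()))
     .apply(TextIO.write().to(options.getOutputFile()));

    p.run();
  }
}

A DoFn can likewise hold a ValueProvider<String> field and call .get() on it inside processElement(). Staging the template is done by running the pipeline once with --templateLocation (the next command); executing it later uses gcloud dataflow jobs run with --parameters, as shown further below.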
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
--project=[YOUR_PROJECT_ID] \
--stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
--templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"
This example creates a batch job from a template that reads a text file and writes an output text file.
gcloud dataflow jobs run [JOB_NAME] \
--gcs-location gs://[YOUR_BUCKET_NAME]/templates/MyTemplate \
--parameters inputFile=gs://[YOUR_BUCKET_NAME]/input/my_input.txt,outputFile=gs://[YOUR_BUCKET_NAME]/output/my_output
Google provides a set of open-source Cloud Dataflow templates.
- Setting up a Java Development Environment for Apache Beam on Google Cloud Platform
- Dataflow Templates
- Data Engineer – Professional Certification Preparation for Google
- cloudacademy/beam
- Learning Path: Apache Beam for Stream Processing
- Learn stream processing with Apache Beam - Strata 2017
- Tutorial - Learn stream processing with Apache Beam - Strata 2017
- Advancing Serverless Data Processing in Cloud Dataflow (Cloud Next '18)
- Apache Beam and Google Cloud Dataflow
- Performing ETL from a Relational Database into BigQuery
- GoogleCloudPlatform/bigquery-etl-dataflow-sample
- Quickstart Using Java and Apache Maven
- All Google Cloud Tutorials
- jorwalk/data-engineering-gcp
Google Composer (Apache Airflow) Notes
- Composer Quickstart: https://cloud.google.com/composer/docs/quickstart
- Apache Airflow: http://airflow.apache.org/
- Concepts: https://airflow.apache.org/concepts.html (Good one for overall concepts of airflow)
- Airflow Github: https://github.com/apache/incubator-airflow
- Common Pitfalls: https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
- 🙌 ETL best practices - https://gtoonstra.github.io/etl-with-airflow/
- Astronomer.io Best Practices: https://www.astronomer.io/guides/dag-best-practices/
- airflow-gcp-examples : https://github.com/alexvanboxel/airflow-gcp-examples
- http://michal.karzynski.pl/blog/2017/03/19/developing-workflows-with-apache-airflow/
- https://medium.com/@nehiljain/lessons-learnt-while-airflow-ing-32d3b7fc3fbf
- https://medium.com/snaptravel/airflow-part-2-lessons-learned-793fa3c0841e
- http://soverycode.com/airflow-in-production-a-fictional-example/
- https://www.agari.com/email-security-blog/airflow-agari/
- https://medium.com/bluecore-engineering/were-all-using-airflow-wrong-and-how-to-fix-it-a56f14cb0753
Google Composer is based on Airflow; operators and other additions are contributed back to Airflow.
The only issue is that there is no repo or tag that lets you pull down Composer's exact version of Airflow: it mixes some code from Airflow 1.9, some from 1.10, and some from the master branch. Composer currently ships 1.9, with 1.10 released in beta; we are using 1.9 right now.
Because of this, use their 'base' Docker image for testing DAGs. We build off of that image and run our tests there.
- Validity and integrity of the DAG
- No cycles (the graph must be acyclic)
- The workflow goes where we expect (task to task, etc.)
- We can query properties of the DAG and of each operator (task) to test/verify them
- https://blog.usejournal.com/testing-in-airflow-part-1-dag-validation-tests-dag-definition-tests-and-unit-tests-2aa94970570c
- https://medium.com/wbaa/datas-inferno-7-circles-of-data-testing-hell-with-airflow-cef4adff58d8
These instructions will show you how to get a development environment up and running to start developing Java Dataflow jobs. By the end you’ll be able to run a Dataflow job locally in debug mode, execute code in a REPL to speed your development cycles, and submit your job to Google Cloud Dataflow.
export PROJECT=$(gcloud config get-value core/project)
gsutil mb -c regional -l us-central1 gs://$PROJECT
export BUCKET=$PROJECT
bq mk DataflowJavaSetup
bq mk java_quickstart
git clone https://github.com/GoogleCloudPlatform/DataflowTemplates
mvn clean && mvn compile
In your IDE, add an Application run configuration: in the working directory drop-down, select %MAVEN_REPOSITORY%, choose the main class to run, and add the following arguments:
--project=sandbox
--stagingLocation=gs://sandbox/staging
--tempLocation=gs://sandbox/temp
--templateLocation=gs://sandbox/templates/GcsToBigQueryTemplate.json
--runner=TestDataflowRunner
--javascriptTextTransformGcsPath=gs://sandbox/resources/UDF/sample_UDF.js
--JSONPath=gs://sandbox/resources/schemas/sample_schema.json
--javascriptTextTransformFunctionName=transform
--inputFilePattern=gs://sandbox/data/data.txt
--outputTable=sandbox:java_quickstart.colorful_coffee_people
--bigQueryLoadingTemporaryDirectory=gs://sandbox/bq_load_temp/
Set a breakpoint at line 93 of TextIOToBigQuery.java, then right-click and choose "Debug 'TextIOToBigQuery'".
When execution pauses at the breakpoint, right-click and choose "Evaluate Expression" to run code interactively.
sample_schema.json:
{
  "BigQuery Schema": [
    { "name": "location", "type": "STRING" },
    { "name": "name",     "type": "STRING" },
    { "name": "age",      "type": "STRING" },
    { "name": "color",    "type": "STRING" },
    { "name": "coffee",   "type": "STRING" }
  ]
}
sample_UDF.js:
function transform(line) {
var values = line.split(',');
var obj = new Object();
obj.location = values[0];
obj.name = values[1];
obj.age = values[2];
obj.color = values[3];
obj.coffee = values[4];
var jsonString = JSON.stringify(obj);
return jsonString;
}
data.txt:
US,joe,18,green,late
CAN,jan,33,red,cortado
MEX,jonah,56,yellow,cappuccino
gsutil cp ./sample_UDF.js gs://$BUCKET/resources/UDF/
gsutil cp ./sample_schema.json gs://$BUCKET/resources/schemas/
gsutil cp ./data.txt gs://$BUCKET/data/
mvn compile exec:java -Dexec.mainClass=com.google.cloud.teleport.templates.TextIOToBigQuery -Dexec.cleanupDaemonThreads=false -Dexec.args=" \
--project=$PROJECT \
--stagingLocation=gs://$BUCKET/staging \
--tempLocation=gs://$BUCKET/temp \
--templateLocation=gs://$BUCKET/templates/GcsToBigQueryTemplate.json \
--runner=DataflowRunner"
gcloud dataflow jobs run colorful-coffee-people-gcs-test-to-big-query \
--gcs-location=gs://$BUCKET/templates/GcsToBigQueryTemplate.json \
--zone=us-central1-f \
--parameters=javascriptTextTransformGcsPath=gs://$BUCKET/resources/UDF/sample_UDF.js,JSONPath=gs://$BUCKET/resources/schemas/sample_schema.json,javascriptTextTransformFunctionName=transform,inputFilePattern=gs://$BUCKET/data/data.txt,outputTable=$PROJECT:java_quickstart.colorful_coffee_people,bigQueryLoadingTemporaryDirectory=gs://$BUCKET/bq_load_temp/