This document is designed as a tutorial of copy-pasteable commands showing how to work with streaming data in Google Cloud Platform's data and analytics tools. Here's an overview of the application we'll be showcasing in this tutorial:
As you can see, we'll be using a variety of GCP services:
- BigQuery - some of the data come from here, and are replayed into...
- PubSub - we'll be showing how to consume replayed historical financial data, as well as live cryptocurrency data
- Dataflow - we're using Dataflow to perform a simple aggregation of the PubSub stream into fixed-size time windows so that it can be visualized as candlesticks, a common type of visualization for financial data
- Firestore - we're putting data into Firestore for web visualization with D3.js
Here's a table of contents for the tutorial. Clicking any item will link down to that section later in this doc:
Clone this repository in Cloud Shell:
git clone https://github.com/allenday/streaming-fsi-showcase
cd streaming-fsi-showcase
export REPO=$PWD
Go to the APIs page. Search for PubSub and click "Enable".
Go to the APIs page. Search for Dataflow and click "Enable".
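Alternatively, you can enable both APIs from Cloud Shell; the console route above is equivalent:
# enable the PubSub and Dataflow APIs for the current project
gcloud services enable pubsub.googleapis.com dataflow.googleapis.com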
Go to the Firestore page. Select Firestore in Native mode.
Go to the Firebase console:
- add your project to the Firebase Console
- add a new application named "charts"
- under database > rules, set up permissions on the Firestore database:
rules_version = '2';
service cloud.firestore {
  match /databases/{database}/documents {
    match /polygon_trades/{documentId} {
      allow read;
    }
    match /ethereum_transactions/{documentId} {
      allow read;
    }
    match /{document=**} {
      allow read, write: if false;
    }
  }
}
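If you prefer the command line, these rules can also be deployed with the Firebase CLI (a sketch, assuming you've saved them to firestore.rules and already run firebase init firestore in this project):
# push only the Firestore security rules, leaving other Firebase config untouched
firebase deploy --only firestore:rules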
- Modify charts/csmain.js by putting your Firebase app config inside
- Create a public bucket that will host the application, and upload the files:
export PROJECT=$(gcloud config get-value project 2> /dev/null)
export PUBLIC_BUCKET_NAME=${PROJECT}_public
gsutil mb gs://$PUBLIC_BUCKET_NAME
gsutil iam ch allUsers:objectViewer gs://$PUBLIC_BUCKET_NAME
- Upload files from the charts directory to the newly created public bucket:
gsutil -m cp $REPO/charts/* gs://$PUBLIC_BUCKET_NAME
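To confirm the upload, list the bucket; each file is served at https://storage.googleapis.com/$PUBLIC_BUCKET_NAME/<filename>:
# verify the chart assets landed in the public bucket
gsutil ls gs://$PUBLIC_BUCKET_NAME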
There won't be anything to see... yet. We'll come back to view these webpages later, after the data begins to flow and is made available to them for visualization.
Copy some historical stock trade data into a temp table for use in this tutorial:
export PROJECT=$(gcloud config get-value project 2> /dev/null)
bq mk polygon
bq cp ethereum-streaming-dev:polygon.trades $PROJECT:polygon.trades
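As a quick sanity check that the copy succeeded, count the rows in the new table:
# row count should be non-zero if the copy worked
bq query --use_legacy_sql=false "SELECT COUNT(*) AS n FROM \`$PROJECT.polygon.trades\`"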
- Clone this repository (streaming-fsi-showcase) into the Cloud AI notebooks environment. Use the https URL, i.e. https://github.com/allenday/streaming-fsi-showcase.git
- Run the jupyter/_jupyter-extensions.ipynb notebook to install extensions (takes ~10 minutes)
- Reboot the jupyter notebook GCE instance
- Run the jupyter/bq.ipynb notebook
Create a PubSub topic. We'll be publishing data to this topic from a GCE instance that retrieves the historical data from BigQuery and replays it as if it were live data.
gcloud pubsub topics create polygon.trades --project=$PROJECT
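You can confirm the topic exists before wiring the replayer to it:
# describe the newly created topic
gcloud pubsub topics describe polygon.trades --project=$PROJECT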
Create temp resources and start a GCE instance running a Docker container based on the blockchain-etl/bigquery-to-pubsub repo.
Note that in this step we define a variable $REPLAY_RATE to replay data at 10x speed (a value of 0.1 means each second of historical data is replayed in 0.1 seconds of wall-clock time). It makes this demo more dynamic and more interesting than watching paint dry.
# we'll replay the trade data faster than real-time to make for a more dynamic demo
export REPLAY_RATE=0.1
cd $REPO/replay
# we'll create a temporary GCS bucket with this name:
export TEMP_RESOURCE_NAME=$(./get_temp_resource_name.sh)
bash ./create_temp_resources.sh
gcloud compute instances create-with-container replay-tool \
--zone=us-central1-a \
--machine-type=n1-standard-1 \
--scopes=https://www.googleapis.com/auth/bigquery,https://www.googleapis.com/auth/pubsub,https://www.googleapis.com/auth/servicecontrol,https://www.googleapis.com/auth/service.management.readonly,https://www.googleapis.com/auth/logging.write,https://www.googleapis.com/auth/monitoring.write,https://www.googleapis.com/auth/trace.append,https://www.googleapis.com/auth/devstorage.read_write \
--container-image=blockchainetl/bigquery-to-pubsub:0.0.1 \
--container-restart-policy=always \
--container-arg=--bigquery-table \
--container-arg=$PROJECT.polygon.trades \
--container-arg=--timestamp-field \
--container-arg=ts \
--container-arg=--start-timestamp \
--container-arg=2018-12-31T18:00:00 \
--container-arg=--end-timestamp \
--container-arg=2019-01-01T00:00:00 \
--container-arg=--batch-size-in-seconds \
--container-arg=1800 \
--container-arg=--replay-rate \
--container-arg=$REPLAY_RATE \
--container-arg=--pubsub-topic \
--container-arg=projects/$PROJECT/topics/polygon.trades \
--container-arg=--temp-bigquery-dataset \
--container-arg=$TEMP_RESOURCE_NAME \
--container-arg=--temp-bucket \
--container-arg=$TEMP_RESOURCE_NAME
Before moving on, go to the PubSub page of Cloud Console and perform a sanity check. Create a test subscription to make sure data are being published to PubSub from the replay tool.
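One way to do this check entirely from the CLI (the subscription name here is just a throwaway):
# create a temporary subscription, pull a few messages, then clean up
gcloud pubsub subscriptions create polygon.trades.test --topic=polygon.trades
gcloud pubsub subscriptions pull polygon.trades.test --limit=5 --auto-ack
gcloud pubsub subscriptions delete polygon.trades.test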
Create a subscription:
gcloud pubsub subscriptions create polygon.trades --topic=polygon.trades --ack-deadline=60
and start a Dataflow pipeline:
cd $REPO/dataflow
mvn clean package
java -cp target/ethereum-streaming-analytics-bundled-1.0-SNAPSHOT.jar com.google.allenday.TransactionMetricsPipeline \
--runner=org.apache.beam.runners.dataflow.DataflowRunner \
--project=$PROJECT \
--inputDataTopicOrSubscription=projects/$PROJECT/subscriptions/polygon.trades \
--firestoreCollection=polygon_trades \
--streaming=true \
--jobName=polygon-candlestick-demo \
--inputType=polygon
As a sanity check, go to the Dataflow page in Cloud Console to confirm that the dataflow job was created and that it is successfully retrieving data from PubSub.
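The same check works from Cloud Shell, assuming the job is running in the default us-central1 region:
# the polygon-candlestick-demo job should be listed as Running
gcloud dataflow jobs list --status=active --region=us-central1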
Check that the real-time chart is receiving data. It's here:
echo https://storage.googleapis.com/$PUBLIC_BUCKET_NAME/trade.html
Now subscribe to the live Ethereum transaction stream published from the public crypto-public-data project:
gcloud pubsub subscriptions create crypto_ethereum.transactions \
--topic=crypto_ethereum.transactions \
--topic-project=crypto-public-data \
--ack-deadline=60
Run the jupyter/pub-sub.ipynb notebook to inspect data in PubSub.
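For a quicker look without the notebook, you can also pull a few messages directly:
# pull and acknowledge a handful of live Ethereum transactions
gcloud pubsub subscriptions pull crypto_ethereum.transactions --limit=3 --auto-ack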
Then start a second Dataflow pipeline for the Ethereum stream:
cd $REPO/dataflow
java -cp target/ethereum-streaming-analytics-bundled-1.0-SNAPSHOT.jar com.google.allenday.TransactionMetricsPipeline \
--runner=org.apache.beam.runners.dataflow.DataflowRunner \
--project=$PROJECT \
--inputDataTopicOrSubscription=projects/$PROJECT/subscriptions/crypto_ethereum.transactions \
--firestoreCollection=ethereum_transactions \
--streaming=true \
--jobName=ethereum-candlestick-demo \
--inputType=ethereum
Check that the real-time chart is receiving live data. It's here:
echo https://storage.googleapis.com/$PUBLIC_BUCKET_NAME/ethereum.html