/hedera-etl

ETL scripts for Hedera Hashgraph

Primary LanguageJavaApache License 2.0Apache-2.0

Hedera ETL

Build Status Discord

Hedera ETL populates BigQuery dataset with transactions and records generated by the Hedera Mainnet (or Testnet, if so configured).

  • Extract: Stream of transactions (and records) are ingested from a GCP PubSub topic
  • Transform: Filters for important fields, formats data types, etc
  • Load: Streaming insert into BigQuery dataset

Overview

Ingestion

  • PubSub topic contains JSON serialized hedera transactions published by Hedera Mirror Node. More details can be found here.

  • Apache Beam pipeline pulls transactions from PubSub and inserts them into BigQuery. GCP Dataflow is used as runner for the pipeline.

  • Deduplication: The above ingestion pipeline gives at-least-once guarantee for persisting transaction into BigQuery. Duplicates, if inserted, are removed using a deduplication task.

Setup

BigQuery

Schema for BigQuery table to store Hedera transactions is in transactions-schema.json file. Please refer corresponding fields' documentation in hedera-protobuf for more info about columns.

Creating tables

bq CLI is needed to create the tables. /create-tables.sh can be used to create all the tables together. Alternatively, tables can be created individually using the commands below.

Transactions table
bq mk \
  --table \
  --description "Hedera network transactions" \
  --time_partitioning_field consensusTimestampTruncated \
  --time_partitioning_type DAY \
  --clustering_fields transactionType \
  project_id:dataset.transactions \
  hedera-etl-bigquery/src/main/resources/transactions-schema.json
Errors table

If an error is encountered when inserting a transaction into BigQuery, then the insert is retried. However, errors for which retry would not help (for example, table row violating the schema), are not tried again and instead logged into errors table.

bq mk \
  --table \
  --description "Hedera ETL Errors" \
  project_id:dataset.errors \
  hedera-etl-bigquery/src/main/resources/errors-schema.json
Deduplication state table

Deduplication task's state is stored in BigQuery table for persistence. That's because the task already relies on BigQuery to be available, and adding dependency on a persistent volume or another database would be not as good.

bq mk \
  --table \
  --description "BigQuery deduplication task state" \
  --description "Hedera Dedupe " \
  project_id:dataset.dedupe_state \
  hedera-etl-bigquery/src/main/resources/state-schema.json

ETL to BigQuery

Requirements
  1. BigQuery tables for transactions and errors should exist
  2. PubSub topic should exist

For requirements to deploy on GCP Dataflow, refer deployment.

Common parameters

Configure GCP project id, PubSub subscription/topic, and BigQuery tables.

PROJECT_ID=... # Set your project id
SUBSCRIPTION=projects/${PROJECT_ID}/subscriptions/subscriptionName
TRANSACTIONS_TABLE=${PROJECT_ID}:dataset.transactions
ERRORS_TABLE=${PROJECT_ID}:dataset.errors

Running locally

cd hedera-etl-bigquery

mvn compile exec:java -PdirectRunner -Dexec.args=" \
  --inputSubscription=${SUBSCRIPTION}, \
  --outputTransactionsTable=${TRANSACTIONS_TABLE}, \
  --outputErrorsTable=${ERRORS_TABLE}"

Running on GCP Dataflow

  1. Setup GCS bucket which is used for staging, templates, and temp location.
BUCKET_NAME=... # Set your bucket name
PIPELINE_FOLDER=gs://${BUCKET_NAME}/etl-bigquery
  1. Build and upload template to GCS bucket
cd hedera-etl-bigquery

mvn compile exec:java \
 -Dexec.args=" \
 --project=${PROJECT_ID} \
 --stagingLocation=${PIPELINE_FOLDER}/staging \
 --tempLocation=${PIPELINE_FOLDER}/temp \
 --templateLocation=${PIPELINE_FOLDER}/template \
 --runner=DataflowRunner"
  1. Start Dataflow job using the template
gcloud dataflow jobs run etl-bigquery-`date +"%Y%m%d-%H%M%S%z"` \
 --gcs-location=${PIPELINE_FOLDER}/template \
 --parameters "inputSubscription=${SUBSCRIPTION},outputTransactionsTable=${TRANSACTIONS_TABLE},outputErrorsTable=${ERRORS_TABLE}"

Controller service account can be configured by adding --service-account-email=my-service-account-name@<project-id>.iam.gserviceaccount.com. See Controller service account for more details.

Deduplication

Deduplication task trails the transactions table to ensure that two rows never have same consensusTimestamp. Due to at-least once guarantee of PubSub and Hedera Mirror Node (publishing to pubsub), it's possible that in rare cases, single transaction gets inserted more than once. Deduplication task removes these duplicates to ensure exactly-once guarantee. See class comments on DedupeRunner for more details.

Development

Build: mvn clean compile -DskipTests

Running tests: mvn test

Running deduplication tests

Due to lack of an emulator for BigQuery, deduplication tests requires GCP BigQuery. To run the tests, you'll need a GCP project with BigQuery API enabled and json key of a service account with BigQuery Editor role.

Setup application.yml as follows:

hedera:
  dedupe:
    projectId: projectName
    credentialsLocation: file:/path/to/key.json
    transactionsSchemaLocation: /path/to/hedera-etl/hedera-etl-bigquery/src/main/resources/transactions-schema.json
    stateSchemaLocation: /path/to/hedera-etl/hedera-etl-bigquery/src/main/resources/state-schema.json

Use following command to run deduplication tests

mvn test -PgcpBigquery -Dspring.config.additional-location=file:/path/to/dir/with/yaml/file/

Note that it assumes current directory to be project's root. If that is not the case, change the schema location values appropriately.

More documentation

Deployment
Configurations

Code of Conduct

This project is governed by the Contributor Covenant Code of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior to oss@hedera.com.

License

Apache License 2.0