Astra-cdc-to-BigQuery-sink

This is a step-by-step guide to enabling change data capture (CDC) on a table in Astra DB and then creating a BigQuery sink in Astra Streaming to feed data from the table into Google BigQuery. This lets you send data changes made in Astra DB to BigQuery in near real time for downstream analytical use cases.

Prerequisites

  • Create a free Astra account
  • Create an Astra database following these instructions
    • Choose a region that supports CDC for Astra DB by visiting the Streaming documentation's "Regions" page
    • For our example we will be using the following database details:
      • Database Name: sandbox
      • Keyspace Name: sample
      • Provider and Region: Google Cloud > North America > us-east1
  • Access to a GCP project with BigQuery admin and project viewer permissions
    • A GCP service account with BigQuery admin permissions, which the sink uses to authenticate
    • A JSON key created for that service account

Create streaming tenant

  1. Log in to your Astra account


  2. From the Astra home page, choose "Create a Streaming Tenant" from the Quick Access section and create a new tenant with the following details:

    Tenant Name: cdctest
    Provider and Region: Google Cloud > useast1


    The new tenant will be ready very quickly and your view will automatically refresh to its “Quickstart” tab. CDC will automatically create a namespace and topic within the tenant.


Create table and enable CDC

  1. Navigate to the sandbox database and click on the CQL Console tab.


  2. Create a table that will hold the test account information

    create table sample.all_accounts (id uuid primary key, full_name text, email text);
    


  3. Navigate to the database’s "CDC" tab and choose "Enable CDC"


  4. Enable CDC with the following details:

    Tenant: pulsar-gcp-useast1 / cdctest

    Keyspace: sample

    Table name: all_accounts


  5. Wait for the new CDC process to have a status of "Running"


  6. Add a new record to the all_accounts table, to initialize the streaming objects:

    insert into sample.all_accounts (id, full_name, email) values (85540e16-aca8-11ec-b909-0242ac120002, 'Joe Smith', 'joesmith@domain.com');
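
Once CDC is running, the table's change events land in a data topic in the tenant's astracdc namespace, which the later steps refer to as data-[DB ID]-sample.all_accounts. As a minimal sketch, the helper below assembles the full topic name from the tenant, database ID, keyspace, and table. The function name `cdc_data_topic` is hypothetical, and the pattern is inferred from the names used in this guide, so confirm the actual topic name in the Astra console.

```python
def cdc_data_topic(tenant: str, db_id: str, keyspace: str, table: str,
                   namespace: str = "astracdc") -> str:
    """Return '<tenant>/<namespace>/data-<db_id>-<keyspace>.<table>'.

    Assumption: this mirrors the naming pattern shown in this guide;
    it is not an official Astra API.
    """
    return f"{tenant}/{namespace}/data-{db_id}-{keyspace}.{table}"


if __name__ == "__main__":
    # Database ID taken from the sample config used in this guide.
    print(cdc_data_topic("cdctest", "342690fc-0ec9-4025-8c4e-d0966f71ecd1",
                         "sample", "all_accounts"))
```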
    

Create BigQuery streaming sink

As of this writing, the BigQuery streaming sink is in an "experimental" state, meaning it hasn't been fully tested or certified. Creating the sink through the UI currently does not produce a functioning sink, so the sink needs to be created with the pulsar-admin CLI.

Reference for utilizing the pulsar-admin CLI with Astra is found in this Astra Streaming Demo repo.

Reference for the Pulsar BigQuery sink connector, which is used by Astra, is available in this repo.

  1. Create a dataset in BigQuery with a name of your choosing (for this example the dataset is named astracdc_demo)

  2. Create a Topic in the astracdc namespace of the streaming tenant for the offset storage:

    • From the Streaming tenant navigate to: "Namespace and Topics"
    • Under the astracdc namespace click "Add Topic".
    • Give the topic a name and click "Add Topic". This example names the topic bq-demo-offset-01.
  3. Under the astracdc namespace, copy the data topic name for the sample.all_accounts table (will be in the format data-[DB ID]-sample.all_accounts) for use later.

  4. Create a BigQuery sink config file

    • Refer to the sample bqdemoconfig.yaml
      • Note: the keyfile value in the example is redacted, but it is the JSON key downloaded for the GCP service account. When adding the key to the config YAML, escape every double quote " and backslash \ in the JSON key file with the \ escape character, i.e. \" and \\
    $ cat /tmp/bqdemoconfig.yaml 
    configs:
      lingerTimeMs: "1000"
      topic: "cdctest/astracdc/data-342690fc-0ec9-4025-8c4e-d0966f71ecd1-sample.all_accounts"
      sanitizeTopicName: "true"
      kafkaConnectorConfigProperties:
        autoCreateTables: "true"
        bufferSize: "10"
        connector.class: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
        defaultDataset: "astracdc_demo"
        kafkaDataFieldName: "topicMetaData"
        keySource: "JSON"
        keyfile: "{\"type\": \"service_account\",\"project_id\": \"bq-test-382615\",\"private_key_id\": \"**redacted**\",\"private_key\": \"-----BEGIN PRIVATE KEY-----\\n**redacted**\\n-----END PRIVATE KEY-----\\n\",\"client_email\": \"**redacted**\",\"client_id\": \"**redacted**\",\"auth_uri\": \"https://accounts.google.com/o/oauth2/auth\",\"token_uri\": \"https://oauth2.googleapis.com/token\",\"auth_provider_x509_cert_url\": \"https://www.googleapis.com/oauth2/v1/certs\",\"client_x509_cert_url\": \"https://www.googleapis.com/robot/v1/metadata/x509/redacted.iam.gserviceaccount.com\"}"
        name: "bq-demo"
        project: "bq-test-382615"
        queueSize: "100"
        sanitizeFieldNames: "true"
        sanitizeTopics: "false"
        tasks.max: "1"
        topics: "cdctest/astracdc/data-342690fc-0ec9-4025-8c4e-d0966f71ecd1-sample.all_accounts"
        kafkaKeyFieldName: "key"
        topic2TableMap: "persistent___cdctest_astracdc_data_342690fc_0ec9_4025_8c4e_d0966f71ecd1_sample_all_accounts_partition_0:all_accounts_partition_0,persistent___cdctest_astracdc_data_342690fc_0ec9_4025_8c4e_d0966f71ecd1_sample_all_accounts_partition_1:all_accounts_partition_1,persistent___cdctest_astracdc_data_342690fc_0ec9_4025_8c4e_d0966f71ecd1_sample_all_accounts_partition_2:all_accounts_partition_2"
        allowNewBigQueryFields: "true"
      batchSize: "1000"
      offsetStorageTopic: "cdctest/astracdc/bq-demo-offset-01"
      kafkaConnectorSinkClass: "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector"
    
    • Config Properties of note:
      • sanitizeTopicName: "true" - required value
      • defaultDataset: - specifies the dataset to use in BQ - must already exist
      • kafkaDataFieldName: "topicMetaData" - required value
      • name: - name of the sink
      • project: - your GCP BigQuery project id
      • sanitizeFieldNames: "true" - required value
      • sanitizeTopics: "false" - required value
      • topics: - CDC data topic from Astra Streaming for appropriate table(s) (copied in step 3 above)
      • kafkaKeyFieldName: - use to sink the CDC table key field into BigQuery
      • topic2TableMap: - use to rename the BigQuery tables instead of using the topic name - one table is created per partition - follow the syntax in the sample config
  5. Create the BigQuery sink using the pulsar-admin CLI - replace the topic name in --inputs with the data topic name copied in step 3 above.

    pulsar-admin sinks create -t bigquery --processing-guarantees EFFECTIVELY_ONCE --inputs cdctest/astracdc/data-[DB ID]-sample.all_accounts --sink-config-file /tmp/bqdemoconfig.yaml --tenant cdctest --namespace astracdc --name bq-demo --timeout-ms 0
    
  6. Verify the sink exists and is running in the Astra Console.

    • Navigate to the streaming tenant in the console and click on "Sinks"

    • The sink should initially show an "Initializing" status and then change to a "Running" status

    • Troubleshooting tip: If the sink shows an error instead of moving to a "Running" state, click into the sink from the console, view or download the log, and look for errors to triage the issue. Correct the config issue, then delete and recreate the sink as needed.
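
Escaping the service account key by hand for the keyfile property is error-prone. The sketch below shows one way to do it, assuming the key is a standard JSON document: it collapses the key onto a single line, then escapes backslashes and double quotes as described in step 4. The function name `escape_keyfile` and the sample key are hypothetical.

```python
import json


def escape_keyfile(raw_key_json: str) -> str:
    """Turn a service-account JSON key into a double-quoted YAML value."""
    # Re-serialize to drop the pretty-printing newlines in the key file.
    compact = json.dumps(json.loads(raw_key_json))
    # Escape backslashes first, then quotes, so the backslashes added
    # for the quotes are not escaped a second time.
    escaped = compact.replace("\\", "\\\\").replace('"', '\\"')
    return f'"{escaped}"'


if __name__ == "__main__":
    # Hypothetical stand-in for the downloaded key; real keys have more fields.
    sample = '{\n  "type": "service_account",\n  "private_key": "-----BEGIN-----\\nabc\\n"\n}'
    print("keyfile: " + escape_keyfile(sample))
```

The printed value can be pasted after `keyfile:` in the config file. Parsing the result back (once as a quoted string, once as JSON) is a quick way to check that nothing was lost.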

Test the CDC to BigQuery sink

  1. Insert a new record into the all_accounts table and verify it flows to BigQuery:

    insert into sample.all_accounts (id, full_name, email) values (uuid(), 'Champ Ship', 'champ.ship@acoolplace.com');
    
  2. Confirm in BigQuery that the table was created (if it did not already exist) and that the record was inserted.

    • Note:
      • One BigQuery table is created per partition. As of this writing, this behavior cannot be changed.
      • The table name can be controlled by mapping the full partition name to a table name of your choosing, using the topic2TableMap: config property. See the sample bqdemoconfig.yaml for an example.
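
Writing the topic2TableMap value by hand is tedious because it needs one entry per partition. As a rough sketch, the helper below builds the value in the same shape as the sample config. It assumes the sanitized topic name is the full persistent:// partition name with every character other than letters, digits, and underscores replaced by an underscore, which is inferred from the sample config rather than taken from the connector documentation. The function name `build_topic2table_map` and its parameters are hypothetical.

```python
import re


def build_topic2table_map(tenant: str, namespace: str, topic: str,
                          table_prefix: str, partitions: int) -> str:
    """Build a comma-separated '<sanitized partition topic>:<table>' list."""
    entries = []
    for p in range(partitions):
        full_name = f"persistent://{tenant}/{namespace}/{topic}-partition-{p}"
        # Assumption: sanitizing swaps each non-word character for "_",
        # matching the entries in the sample bqdemoconfig.yaml.
        sanitized = re.sub(r"[^A-Za-z0-9_]", "_", full_name)
        entries.append(f"{sanitized}:{table_prefix}_partition_{p}")
    return ",".join(entries)


if __name__ == "__main__":
    print(build_topic2table_map(
        "cdctest", "astracdc",
        "data-342690fc-0ec9-4025-8c4e-d0966f71ecd1-sample.all_accounts",
        "all_accounts", partitions=3))
```

The partition count is passed in explicitly because it depends on how the data topic was created; the sample config in this guide maps three partitions.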