ETL BigQuery with Airflow


  1. Airflow v2.2.1
  2. Python v3.8.10
  3. Other Packages
  4. Ubuntu
  5. Google SDK
  6. Google BigQuery


  1. Dataset : Use the following credentials to access the database:

    • hostname:
    • port: 3306
    • username: guest
    • password: relational
    • database: financial
    • DB Type: MySQL
  2. BigQuery public dataset: ga4_obfuscated_sample_ecommerce. To access it, search for ga4_obfuscated_sample_ecommerce among the BigQuery public datasets.
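As a rough sketch of how the MySQL credentials above might be used from Python, the helper below assembles a SQLAlchemy-style connection URL. The function name `mysql_url`, the `pymysql` driver prefix, and the `FINANCIAL_DB_HOST` environment variable are assumptions made for illustration. The hostname is not given here, so it is read from the environment instead of being hard-coded.

```python
import os
from urllib.parse import quote_plus


def mysql_url(host, port=3306, user="guest", password="relational", database="financial"):
    """Build a SQLAlchemy-style URL for the MySQL source database."""
    # quote_plus keeps special characters in credentials from breaking the URL.
    return (
        f"mysql+pymysql://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


if __name__ == "__main__":
    # FINANCIAL_DB_HOST is a hypothetical variable name; set it to the real hostname.
    host = os.environ.get("FINANCIAL_DB_HOST", "localhost")
    print(mysql_url(host))
```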



  1. Airflow Variables

    • Open the Airflow UI (http://localhost:8080), then go to Variables in the Admin menu


    • Add a new record (variable) by clicking the plus (+) icon. Alternatively, import the variables from a JSON file that contains the key-value pairs, like this:

      {
        "PROJECT_ID": "",
        "BUCKET_NAME": "",
        "GCS_TEMP_LOCATION": "",
        "GCS_STG_LOCATION": "",
        "DATASET_ID": ""
      }
      • PROJECT_ID : your Google Cloud Platform project id
      • BUCKET_NAME : your GCS bucket name
      • GCS_TEMP_LOCATION: your temp location (gs://{yourbucket}/temp)
      • GCS_STG_LOCATION: your staging data location (gs://{yourbucket}/stag)
      • DATASET_ID: your BigQuery dataset id
    • You can read these variables from your DAG, for example:

    from airflow.models.variable import Variable
    PROJECT_ID = Variable.get("PROJECT_ID")
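If you prefer the JSON import route, the file can be generated rather than typed by hand. The following is a minimal sketch: the function names and the placeholder values are hypothetical, and the `temp` and `stag` suffixes simply follow the locations described above.

```python
import json


def build_airflow_variables(project_id, bucket_name, dataset_id):
    """Return the five variables as a dict ready to be dumped to JSON."""
    return {
        "PROJECT_ID": project_id,
        "BUCKET_NAME": bucket_name,
        "GCS_TEMP_LOCATION": f"gs://{bucket_name}/temp",
        "GCS_STG_LOCATION": f"gs://{bucket_name}/stag",
        "DATASET_ID": dataset_id,
    }


def write_variables_file(path, variables):
    """Write the variables to a JSON file that Airflow can import."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(variables, handle, indent=2)


if __name__ == "__main__":
    # All three values below are placeholders, not real resources.
    variables = build_airflow_variables("my-project", "my-bucket", "my_dataset")
    write_variables_file("variables.json", variables)
```

The resulting file can be imported from the Variables page in the UI, or from the terminal with `airflow variables import variables.json`.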


  2. Set up the Airflow Connection

    • Install the Google provider for Airflow:

      pip install apache-airflow-providers-google


    • Open the Airflow UI (http://localhost:8080), then go to Connections in the Admin menu


    • Add a new connection or edit the existing one. Select the Google Cloud conn type, then fill in the required fields:

      • Conn Id (set to: google_cloud_default)
      • Conn Type: Google Cloud
      • Description
      • Keyfile Path (set this to the full path of your keyfile)
      • Keyfile JSON (if you use the keyfile path, leave this blank; otherwise paste your Google service account key here and leave the keyfile path blank)
      • Number of Retries
      • Project Id
      • Click the Save button
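The same connection can also be supplied through an environment variable instead of the UI. The sketch below builds a connection URI for that purpose. It assumes the long `extra__google_cloud_platform__*` field names used by Google provider releases from the Airflow 2.2 era, so check the field names against the provider version you have installed. The function name and the example paths are hypothetical.

```python
from urllib.parse import urlencode


def google_cloud_conn_uri(key_path, project_id, num_retries=5):
    """Build a URI for the google_cloud_default connection."""
    # Assumption: the installed Google provider reads these long-form extras.
    extras = {
        "extra__google_cloud_platform__key_path": key_path,
        "extra__google_cloud_platform__project": project_id,
        "extra__google_cloud_platform__num_retries": num_retries,
    }
    # urlencode percent-encodes the keyfile path so it is safe inside the URI.
    return "google-cloud-platform://?" + urlencode(extras)


if __name__ == "__main__":
    # Both arguments are placeholders.
    uri = google_cloud_conn_uri("/path/to/keyfile.json", "my-project")
    print(f"export AIRFLOW_CONN_GOOGLE_CLOUD_DEFAULT='{uri}'")
```

Airflow maps an environment variable named `AIRFLOW_CONN_<CONN_ID>` to the connection with that id, so the printed line defines `google_cloud_default` for the shell that runs the scheduler and webserver.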


  3. Install other packages

    pip install 'apache-beam[gcp]'
    pip install beam-sql-connector==1.8.5
    pip install apache-airflow-providers-apache-beam
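To confirm that the packages above ended up in the active environment, a small check such as the following can help. It is only a sketch: the `REQUIRED` list and the function name are illustrative, and it relies on `importlib.metadata`, which is part of the standard library from Python 3.8.

```python
from importlib import metadata

# Distribution names as they appear in the pip commands above.
REQUIRED = [
    "apache-airflow-providers-google",
    "apache-beam",
    "beam-sql-connector",
    "apache-airflow-providers-apache-beam",
]


def installed_versions(distributions):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(REQUIRED).items():
        print(f"{name}: {version or 'NOT INSTALLED'}")
```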


  1. Run airflow webserver --port 8080 -D in your terminal
  2. Run airflow scheduler in your terminal
  3. Open the Airflow UI in your browser

Task 1

  1. Run the DAG and schedule it daily at 08:00 PM UTC. Tag the DAG with "financial", "loan", and "relational_fit"


  2. The Airflow DAG I created uses BeamRunPythonPipelineOperator to trigger an Apache Beam script with an additional setup file. The script then runs as a Dataflow job whose workers have packages such as beam-sql-connector installed from that file (see the required packages inside the file)


  3. After the Dataflow job succeeds, your data is stored in BigQuery
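The steps above could be wired together roughly as follows. This is a sketch under stated assumptions, not the DAG from this project: the DAG id, task id, start date, and both file paths are hypothetical placeholders. The schedule is expressed as the cron string `0 20 * * *`, which fires daily at 20:00, and Airflow interprets it in UTC by default. Airflow is imported inside `build_dag` only so that the constants can be inspected without an Airflow installation.

```python
from datetime import datetime

DAG_TAGS = ["financial", "loan", "relational_fit"]
DAILY_AT_8PM_UTC = "0 20 * * *"  # cron: minute 0, hour 20, every day


def beam_pipeline_options(project_id, temp_location, staging_location, setup_file):
    """Options passed to the Beam script; setup_file installs extra packages on workers."""
    return {
        "project": project_id,
        "temp_location": temp_location,
        "staging_location": staging_location,
        "setup_file": setup_file,
    }


def build_dag():
    """Create the DAG with a single Beam task that runs on Dataflow."""
    from airflow import DAG
    from airflow.models.variable import Variable
    from airflow.providers.apache.beam.operators.beam import (
        BeamRunPythonPipelineOperator,
    )

    with DAG(
        dag_id="financial_loan_etl",  # hypothetical DAG id
        schedule_interval=DAILY_AT_8PM_UTC,
        start_date=datetime(2021, 11, 1),  # placeholder start date
        catchup=False,
        tags=DAG_TAGS,
    ) as dag:
        BeamRunPythonPipelineOperator(
            task_id="run_beam_pipeline",  # hypothetical task id
            runner="DataflowRunner",
            py_file="/path/to/beam_script.py",  # hypothetical path
            pipeline_options=beam_pipeline_options(
                Variable.get("PROJECT_ID"),
                Variable.get("GCS_TEMP_LOCATION"),
                Variable.get("GCS_STG_LOCATION"),
                "/path/to/setup.py",  # hypothetical path
            ),
            gcp_conn_id="google_cloud_default",
        )
    return dag
```

In an actual DAG file, assign `dag = build_dag()` at module level so the scheduler can discover it.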


Task 2

  1. Use ga4_obfuscated_sample_ecommerce BigQuery public dataset

  2. Create a table (I created it with a stored query) with the following columns:

    • event_date
    • event_date_partition (Format date in "event_date" to be YYYY-MM-DD)
    • event_name
    • event_params.key : page_title & page_location (to extract these values, use a clause such as UNNEST(event_params))
    • user_pseudo_id
    • device.category as device_category
    • device.mobile_brand_name as device_mobile_brand_name
    • user_first_touch_timestamp
    • user_first_touch_at (convert the value of "user_first_touch_timestamp" to YYYY-MM-DD HH:mm:ss)
    • user_ltv (keep only rows where the revenue property is greater than zero)
    • geo (keep only rows where the continent property is "Asia")
    • stream_id
    • traffic_source (keep only rows where the medium property is "organic")
  3. Use event_date_partition as the partition field

  4. Set dataset location to "US", then use standard SQL

  5. Run the DAG


  6. The data is stored in BigQuery
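One possible reading of the column list above is sketched below as a standard SQL query wrapped in Python, together with a BigQuery query-job configuration that partitions the destination table on event_date_partition. This is not the stored query used in this project. It assumes the GA4 export schema, where event_date is a YYYYMMDD string and user_first_touch_timestamp is in microseconds, and it treats the three filters as row filters. The function names and the destination identifiers are hypothetical.

```python
SOURCE_TABLE = "bigquery-public-data.ga4_obfuscated_sample_ecommerce.events_*"


def build_select_query(source_table=SOURCE_TABLE):
    """Return a standard SQL query producing the columns listed above."""
    return f"""
SELECT
  event_date,
  PARSE_DATE('%Y%m%d', event_date) AS event_date_partition,
  event_name,
  (SELECT value.string_value FROM UNNEST(event_params)
   WHERE key = 'page_title') AS page_title,
  (SELECT value.string_value FROM UNNEST(event_params)
   WHERE key = 'page_location') AS page_location,
  user_pseudo_id,
  device.category AS device_category,
  device.mobile_brand_name AS device_mobile_brand_name,
  user_first_touch_timestamp,
  FORMAT_TIMESTAMP('%Y-%m-%d %H:%M:%S',
                   TIMESTAMP_MICROS(user_first_touch_timestamp)) AS user_first_touch_at,
  user_ltv,
  geo,
  stream_id,
  traffic_source
FROM `{source_table}`
WHERE user_ltv.revenue > 0
  AND geo.continent = 'Asia'
  AND traffic_source.medium = 'organic'
""".strip()


def build_job_configuration(project_id, dataset_id, table_id):
    """Return a query-job configuration in the BigQuery REST API format."""
    return {
        "query": {
            "query": build_select_query(),
            "useLegacySql": False,  # standard SQL
            "destinationTable": {
                "projectId": project_id,
                "datasetId": dataset_id,
                "tableId": table_id,
            },
            # Daily partitions keyed on the reformatted date column.
            "timePartitioning": {"type": "DAY", "field": "event_date_partition"},
            "writeDisposition": "WRITE_TRUNCATE",
        }
    }
```

A dictionary of this shape can be passed as the `configuration` argument of `BigQueryInsertJobOperator` from the Google provider, with `location="US"` to match the dataset location.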
