GoogleCloudDataproc/spark-bigquery-connector

Unable to write changed table to BigQuery with the new Spark Connector - 0.35.1

katerina-kogan opened this issue · 1 comment

Environment:
SPARK_BQ_CONNECTOR_VERSION= 0.35.1
Dataproc Cluster
Spark 3.3
DBT 1.6

After running the dbt model, the table in BigQuery gets populated with data, all good. But if the table definition gets changed - a field gets added or removed - the Dataproc cluster job throws an error: "IllegalArgumentException: BigQueryConnectorException$InvalidSchemaException: Destination table's schema is not compatible with dataframe's schema"
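For reference, a minimal repro outside dbt that we believe exercises the same check (a sketch only - the project, dataset, table and bucket names are placeholders, and the dropped column is just an example):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Read the existing table and drop one column to simulate the changed model schema.
df = (
    spark.read.format("bigquery")
    .load("my-project.my_dataset.fct_model")
    .drop("status")
)

# Overwrite the existing destination table; presumably this is where 0.35.1
# raises InvalidSchemaException, while 0.22.0 accepted the write.
(
    df.write.format("bigquery")
    .option("writeMethod", "indirect")
    .option("temporaryGcsBucket", "my-temp-bucket")
    .mode("overwrite")
    .save("my-project.my_dataset.fct_model")
)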

Steps to reproduce:

1. dbt Cloud python model in fct_model.py:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

TWO_YEARS_DAYS = 365*2

def model(dbt, session):
    session.conf.set("viewsEnabled","true")

    table1_df = dbt.source("dataset1", "table1")
    table2_df = dbt.source("dataset1", "table2")
    table3_df = dbt.source("dataset2", "table3")
    table4_df = dbt.source("dataset2", "table4")
    table5_df = dbt.ref("ref_table")


    return table1_df \
      .join(table2_df, table1_df.o_id==table2_df.oo_id,"inner") \
      .join(table4_df, table4_df.o_id==table2_df.id,"inner") \
      .join(table3_df, table4_df.g_id==table3_df.id,"inner") \
      .join(table5_df, table4_df.status==table5_df.status_code,"left") \
      .filter(table1_df.created >= F.date_sub(F.current_date(), TWO_YEARS_DAYS)) \
      .filter(table4_df.created >= F.date_sub(F.current_date(), TWO_YEARS_DAYS)) \
      .withColumn("attempt_number", F.row_number().over(Window.partitionBy(table1_df.id).orderBy(table2_df.created_at))) \
      .withColumn("payment_type", F.coalesce(F.get_json_object(table4_df["extra"], "$.result.data.payment_type"), table3_df["payment_type"])) \
      .select(
          table2_df["customer_id"], 
          F.to_date(table1_df["created"]).alias("created"),
          F.col("attempt_number"),
          F.col("status_desc").alias("status"),
          F.col("payment_type")
          ).repartition(10)
2. The model runs on the Dataproc cluster via the "dbt build" command in dbt Cloud.
3. The table gets populated with data.
4. If the table definition gets changed, say, one field gets removed:
    return table1_df \
      .join(table2_df, table1_df.o_id==table2_df.oo_id,"inner") \
      .join(table4_df, table4_df.o_id==table2_df.id,"inner") \
      .join(table3_df, table4_df.g_id==table3_df.id,"inner") \
      .join(table5_df, table4_df.status==table5_df.status_code,"left") \
      .filter(table1_df.created >= F.date_sub(F.current_date(), TWO_YEARS_DAYS)) \
      .filter(table4_df.created >= F.date_sub(F.current_date(), TWO_YEARS_DAYS)) \
      .withColumn("attempt_number", F.row_number().over(Window.partitionBy(table1_df.id).orderBy(table2_df.created_at))) \
      .withColumn("payment_type", F.coalesce(F.get_json_object(table4_df["extra"], "$.result.data.payment_type"), table3_df["payment_type"])) \
      .select(
          table2_df["customer_id"], 
          F.to_date(table1_df["created"]).alias("created"),
          F.col("attempt_number"),
          F.col("status_desc").alias("status")
          ).repartition(10)

the error appears. The same error occurs if a field gets added.

There is no such problem with SPARK_BQ_CONNECTOR_VERSION=0.22.0, but we are keen to use the latest version.
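
In the meantime we are wondering whether the connector's schema-update options could relax this check. A sketch only (assumptions: these options cover our case, and the connector honors them when set in the Spark conf the same way it honors viewsEnabled - they may instead need the spark.datasource.bigquery. prefix). Since dbt generates the write call, the Spark conf is the only place we can set them from the model:

def model(dbt, session):
    session.conf.set("viewsEnabled", "true")
    # allowFieldAddition / allowFieldRelaxation map to BigQuery's
    # ALLOW_FIELD_ADDITION / ALLOW_FIELD_RELAXATION load-job schema update options.
    # Assumption: setting them globally here is picked up by the connector,
    # as with viewsEnabled above; it is unclear whether they help for a removed field.
    session.conf.set("allowFieldAddition", "true")
    session.conf.set("allowFieldRelaxation", "true")
    ...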

Thank you for your help!

Can you please provide the schema of the dataframe that is being written to BQ and the BQ table's schema?
Also, what is the exact command that is executed to write the dataframe to BQ? (E.g. dataframe.write.format("bigquery")... )
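
For context until the exact generated code is shared: our understanding is that dbt-bigquery's python model materialization issues a write along these lines (a sketch, not the verbatim command - the table id, bucket and write method below are assumptions):

df.write \
    .format("bigquery") \
    .option("writeMethod", "indirect") \
    .option("temporaryGcsBucket", "my-temp-bucket") \
    .mode("overwrite") \
    .save("my-project.my_dataset.fct_model")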