GoogleCloudDataproc/spark-bigquery-connector

Disable Schema Changes


Hi,

I am using the BigQuery connector on a Dataproc cluster to write a DataFrame to a BigQuery table. The table is created via Terraform with a hardcoded schema in which all fields are REQUIRED.

This table is refreshed every day, and the results are overwritten. The code used to write to this table is as follows:

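(
    data.write.format("bigquery")
    .mode("overwrite")
    .option("table", table_fullname)
    # The table already exists (created by Terraform), so never create it
    .option("createDisposition", "CREATE_NEVER")
    # Expected to block new columns and REQUIRED -> NULLABLE changes
    .option("allowFieldAddition", "false")
    .option("allowFieldRelaxation", "false")
    # GCS bucket used to stage data for the (default) indirect write
    .option("temporaryGcsBucket", self._config.get_config("temp_gcs_bucket"))
    .save()
)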

Unfortunately, this configuration does not work as expected: after the write, the table schema has been changed and many fields have been switched from REQUIRED to NULLABLE. Since I did not specify a connector version explicitly, the cluster should be using the default one, spark-3.5-bigquery-0.39.0.jar.
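To confirm exactly which fields a write changed, one option is to capture the table schema before and after the job and diff them. The sketch below is a hypothetical helper, not part of the connector: it assumes each schema has already been reduced to a list of (field_name, mode) pairs and only looks at top-level fields.

```python
from typing import Dict, List, Tuple

# Hypothetical representation: a schema as (field_name, mode) pairs,
# e.g. [(f.name, f.mode) for f in table.schema] from google-cloud-bigquery.
Schema = List[Tuple[str, str]]


def diff_schema(before: Schema, after: Schema) -> Dict[str, List[str]]:
    """Report top-level fields that were added or relaxed between two schemas."""
    before_modes = dict(before)
    added: List[str] = []
    relaxed: List[str] = []
    for name, mode in after:
        if name not in before_modes:
            # Field did not exist before the write
            added.append(name)
        elif before_modes[name] == "REQUIRED" and mode != "REQUIRED":
            # Field used to be REQUIRED and is not any more
            relaxed.append(name)
    return {"added": added, "relaxed": relaxed}


if __name__ == "__main__":
    before = [("id", "REQUIRED"), ("amount", "REQUIRED")]
    after = [("id", "REQUIRED"), ("amount", "NULLABLE"), ("note", "NULLABLE")]
    print(diff_schema(before, after))
    # {'added': ['note'], 'relaxed': ['amount']}
```

With the google-cloud-bigquery client, the pairs can be built from `bigquery.Client().get_table(table_fullname).schema`, whose `SchemaField` objects expose `name` and `mode`. Nested RECORD fields would need a recursive version of the same comparison.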

Any guidance on resolving this issue would be greatly appreciated. Thank you!

The fix will be available in the next release.

Hey @isha97, good afternoon.

I was finally able to update our connector and test the new feature. Schema field relaxation now works as expected. Thank you for your commitment and the effective fix; it was very helpful for my team.

However, schema field addition does not seem to work. It is not a big deal for us, since we won't use it, but I want to report this other unexpected behaviour.
I replaced the old jar with gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.40.0.jar. Even with allowFieldAddition set to "false", if I add a new column that is not defined in the table schema, the schema is updated and the new column is added.

Here is the code I use to write:

df_writer = (
    data.write.format(source="bigquery")
    .mode(saveMode="overwrite")
    .option(key="table", value=table_fullname)
    .option(key="createDisposition", value="CREATE_NEVER")
    # Both options are expected to keep the Terraform-defined schema unchanged
    .option(key="allowFieldAddition", value="false")
    .option(key="allowFieldRelaxation", value="false")
    .option(key="temporaryGcsBucket", value=self._config.get_config("temp_gcs_bucket"))
)
# Nothing is written until save() is called
df_writer.save()
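Until allowFieldAddition="false" is honoured, one possible client-side guard is to compare the DataFrame's columns with the target table's columns and refuse to write if there are extras. This is a minimal sketch with hypothetical helper names; it is not a connector feature, and it only checks top-level column names.

```python
from typing import Iterable, List


def extra_columns(df_columns: Iterable[str], table_columns: Iterable[str]) -> List[str]:
    """Return DataFrame columns missing from the table, in DataFrame order."""
    # BigQuery column names are case-insensitive, so compare lower-cased names
    allowed = {c.lower() for c in table_columns}
    return [c for c in df_columns if c.lower() not in allowed]


def check_no_new_columns(df_columns: Iterable[str], table_columns: Iterable[str]) -> None:
    """Raise before writing if the DataFrame would add columns to the table."""
    extras = extra_columns(df_columns, table_columns)
    if extras:
        raise ValueError(f"DataFrame has columns not in the table schema: {extras}")
```

In PySpark, `data.columns` already gives the DataFrame column names, and the table names could come from `[f.name for f in bigquery.Client().get_table(table_fullname).schema]`. Calling `check_no_new_columns(data.columns, table_cols)` before `df_writer.save()` fails fast instead of silently widening the schema; alternatively, `data.select(*table_cols)` drops the extra columns.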