GoogleCloudDataproc/spark-bigquery-connector

Writing to a BQ table with integer-range partitioning fails with a complaint about time partitioning

xtrmstep opened this issue · 4 comments

Hi,

I was testing storing a data frame to a BigQuery table with integer-range partitioning. The following error is thrown when I try to save the data frame to BigQuery:

The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.

Code

The code looks as follows:

df_data_to_save = df_raw_data.select(
    "Id",
    "Status",
    "CreationDate",
    "UpdateDate",
    "Deposit",
    "Gain"
) \
.withColumn("partition_index", F.col("Id") % F.lit(4000)) \
.limit(100)

df_data_to_save.write.format('bigquery') \
    .option("temporaryGcsBucket", "temp_bq_save_bucket") \
    .option("intermediateFormat", "orc") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("partitionField", "partition_index") \
    .option("partitionRangeStart", 0) \
    .option("partitionRangeEnd", 4000) \
    .option("partitionRangeInterval", 10) \
    .option("clusteredFields", "Id") \
    .option("allowFieldAddition", True) \
    .mode("append") \
    .save("test_dataset.test_table")

The modulo is calculated because the field Id can contain values such as 502123630784905216 or 502052732782477312; Id % 4000 maps them into the 0–3999 bucket range (for example, 502123630784905216 % 4000 = 1216).
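
A possible workaround, assuming the connector leaves an existing table's partitioning spec untouched when it only appends, is to create the integer-range-partitioned table up front with the google-cloud-bigquery client library that is already in the cluster's pip packages. A minimal sketch; the column types are assumptions based on the data frame above:

from google.cloud import bigquery

client = bigquery.Client()

# Assumed schema mirroring the data frame columns above.
schema = [
    bigquery.SchemaField("Id", "INTEGER"),
    bigquery.SchemaField("Status", "STRING"),
    bigquery.SchemaField("CreationDate", "TIMESTAMP"),
    bigquery.SchemaField("UpdateDate", "TIMESTAMP"),
    bigquery.SchemaField("Deposit", "FLOAT"),
    bigquery.SchemaField("Gain", "FLOAT"),
    bigquery.SchemaField("partition_index", "INTEGER"),
]

table = bigquery.Table("my-gcp-project-1.test_dataset.test_table", schema=schema)
# Integer-range partitioning on the modulo column, matching the write options above.
table.range_partitioning = bigquery.RangePartitioning(
    field="partition_index",
    range_=bigquery.PartitionRange(start=0, end=4000, interval=10),
)
table.clustering_fields = ["Id"]
client.create_table(table)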

Cluster configuration

spark-bigquery-connector-version = 0.31.1
PIP_PACKAGES=psycopg2 statsd quinn google-cloud-bigquery google-cloud-storage google-cloud-logging pyfarmhash tenacity
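
For completeness, a minimal sketch of how a session with this connector version might be configured (the Scala 2.12 Maven coordinates are an assumption; on Dataproc the connector jar is usually supplied at cluster creation or job submission instead):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("bq-range-partitioning-repro")
    # Connector pulled via Maven coordinates; Scala 2.12 build assumed.
    .config("spark.jars.packages",
            "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.31.1")
    .getOrCreate()
)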

Error

The following error is produced:

23/10/30 19:08:00 ERROR com.google.cloud.bigquery.connector.common.BigQueryClient: Unable to create the job to load to my-gcp-project-1.test_dataset.test_table
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[11], line 1
----> 1 df_data_to_save.write.format('bigquery') \
      2         .option("temporaryGcsBucket", "temp_bq_save_bucket") \
      3         .option("intermediateFormat", "orc") \
      4         .option("createDisposition", "CREATE_IF_NEEDED") \
      5         .option("partitionField", "partition_index") \
      6         .option("partitionRangeStart", 0) \
      7         .option("partitionRangeEnd", 4000) \
      8         .option("partitionRangeInterval", 10) \
      9         .option("clusteredFields", "Id") \
     10         .option("allowFieldAddition", True) \
     11         .mode("append") \
     12         .save("test_dataset.test_table")

File /usr/lib/spark/python/pyspark/sql/readwriter.py:1109, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
   1107     self._jwrite.save()
   1108 else:
-> 1109     self._jwrite.save(path)

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
   1298 command = proto.CALL_COMMAND_NAME +\
   1299     self.command_header +\
   1300     args_command +\
   1301     proto.END_COMMAND_PART
   1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
   1305     answer, self.gateway_client, self.target_id, self.name)
   1307 for temp_arg in temp_args:
   1308     temp_arg._detach()

File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)

File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o136.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:111)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:220)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$5.call(BigQueryImpl.java:405)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$5.call(BigQueryImpl.java:394)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:393)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:358)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:353)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:348)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:613)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:139)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:108)
	... 35 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://www.googleapis.com/bigquery/v2/projects/my-gcp-project-1/jobs?prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "message": "The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.",
      "reason": "invalid"
    }
  ],
  "message": "The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.",
  "status": "INVALID_ARGUMENT"
}
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$3.interceptResponse(AbstractGoogleClientRequest.java:466)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:552)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:493)
	at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:603)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:218)
	... 47 more
isha97 commented

Hi @xtrmstep

Is the table test_dataset.test_table an existing table? If so, can you please provide the schema?

isha97 commented

Closing this issue as there is no response from the customer. @xtrmstep Please reopen the issue if you are still facing this.

xtrmstep commented

Hi @isha97, I think I replied earlier, but I don't see the reply now; sorry, maybe it was a glitch on my side. Please find a sample below. A few things need to be filled in: the bucket name and the BQ table name. The table does not exist in this case; I haven't checked what happens when saving into an existing one.
Here is the code:

from random import random
import pyspark.sql.types as T
import pyspark.sql.functions as F

TEMP_GCS_BUCKET_NAME = "BUCKET"
BQ_TABLE_NAME = "DATASET.TABLE"

ages = ["young", "middle", "old"]
sizes = ["small", "middle", "large"]
colors = ["red", "blue", "white"]

def random_value(values):
    # pick a uniformly random element from the list
    return values[int(len(values) * random())]

data = [( 1,  "star1", random_value(sizes), random_value(ages), 4000.0, random_value(colors)),
        ( 2,  "star2", random_value(sizes), random_value(ages), 6000.0, random_value(colors)),
        ( 3,  "star3", random_value(sizes), random_value(ages), 3000.0, random_value(colors)),
        ( 4,  "star4", random_value(sizes), random_value(ages), 8000.0, random_value(colors)),
        ( 5,  "star5", random_value(sizes), random_value(ages), 5000.0, random_value(colors)),
        ( 6,  "star6", random_value(sizes), random_value(ages), 3500.0, random_value(colors)),
        ( 7,  "star7", random_value(sizes), random_value(ages), 4500.0, random_value(colors)),
        ( 8,  "star8", random_value(sizes), random_value(ages), 7000.0, random_value(colors)),
        ( 9,  "star9", random_value(sizes), random_value(ages), 5500.0, random_value(colors)),
        (10, "star10", random_value(sizes), random_value(ages), 6500.0, random_value(colors)),
        (11, "star11", random_value(sizes), random_value(ages), 7500.0, random_value(colors)),
        (12, "star12", random_value(sizes), random_value(ages), 3200.0, random_value(colors)),
        (13, "star13", random_value(sizes), random_value(ages), 4700.0, random_value(colors)),
        (14, "star14", random_value(sizes), random_value(ages), 3700.0, random_value(colors)),
        (15, "star15", random_value(sizes), random_value(ages), 6200.0, random_value(colors)),
        (16, "star16", random_value(sizes), random_value(ages), 5800.0, random_value(colors)),
        (17, "star17", random_value(sizes), random_value(ages), 5200.0, random_value(colors)),
        (18, "star18", random_value(sizes), random_value(ages), 5100.0, random_value(colors)),
        (19, "star19", random_value(sizes), random_value(ages), 5400.0, random_value(colors)),
        (20, "star20", random_value(sizes), random_value(ages), 7200.0, random_value(colors))]

# define the schema for the DataFrame
schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("size", T.StringType(), True),
    T.StructField("age", T.StringType(), True),
    T.StructField("temp", T.DoubleType(), True),
    T.StructField("color", T.StringType(), True)
])

# create a DataFrame from the data and schema
df = spark.createDataFrame(data, schema)

df.show()

df_data_to_save = df \
.withColumn("partition_index", F.col("id") % F.lit(4000))

df_data_to_save.show()

df_data_to_save.write.format('bigquery') \
    .option("temporaryGcsBucket", TEMP_GCS_BUCKET_NAME) \
    .option("intermediateFormat", "orc") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("partitionField", "partition_index") \
    .option("partitionRangeStart", 0) \
    .option("partitionRangeEnd", 4000) \
    .option("partitionRangeInterval", 10) \
    .option("clusteredFields", "Id") \
    .option("allowFieldAddition", True) \
    .mode("append") \
    .save(BQ_TABLE_NAME)

When you execute this, it fails with the error described above. I can provide more details, but not publicly.
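
To double-check the outcome against an existing table, the partitioning spec that actually ended up on it can be inspected with the same client library (a small sketch; the table name is the placeholder above):

from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("DATASET.TABLE")  # placeholder from the sample above
# For an integer-range-partitioned table, range_partitioning should be set
# and time_partitioning should be None.
print(table.range_partitioning)
print(table.time_partitioning)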

Regards,

xtrmstep commented

I cannot reopen this issue. Could you please help with this, @isha97?