When writing to a BQ table with integer-range partitioning, it fails with a complaint about time partitioning
xtrmstep opened this issue · 4 comments
Hi,
I was testing the possibility of storing a DataFrame in a BigQuery table with integer-range partitioning. The following error is thrown when I try to save the DataFrame to BigQuery:
The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.
Code
The code looks as follows:
df_data_to_save = df_raw_data.select(
        "Id",
        "Status",
        "CreationDate",
        "UpdateDate",
        "Deposit",
        "Gain"
    ) \
    .withColumn("partition_index", F.col("Id") % F.lit(4000)) \
    .limit(100)

df_data_to_save.write.format('bigquery') \
    .option("temporaryGcsBucket", "temp_bq_save_bucket") \
    .option("intermediateFormat", "orc") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("partitionField", "partition_index") \
    .option("partitionRangeStart", 0) \
    .option("partitionRangeEnd", 4000) \
    .option("partitionRangeInterval", 10) \
    .option("clusteredFields", "Id") \
    .option("allowFieldAddition", True) \
    .mode("append") \
    .save("test_dataset.test_table")
The modulo is calculated because the field Id can have values such as 502123630784905216, 502052732782477312, etc.
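For illustration, here is a quick check (plain Python, using the example Ids above) of how the modulo maps those Ids into the [0, 4000) key range, which the range options then split into 400 partitions of width 10:

# Quick sanity check of the partition key mapping (example values from above).
for example_id in (502123630784905216, 502052732782477312):
    print(example_id, "->", example_id % 4000)
# 502123630784905216 -> 1216
# 502052732782477312 -> 1312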
Cluster configuration
spark-bigquery-connector-version = 0.31.1
PIP_PACKAGES=psycopg2 statsd quinn google-cloud-bigquery google-cloud-storage google-cloud-logging pyfarmhash tenacity
Error
The following error is produced:
23/10/30 19:08:00 ERROR com.google.cloud.bigquery.connector.common.BigQueryClient: Unable to create the job to load to my-gcp-project-1.test_dataset.test_table
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
Cell In[11], line 1
----> 1 df_data_to_save.write.format('bigquery') \
2 .option("temporaryGcsBucket", "temp_bq_save_bucket") \
3 .option("intermediateFormat", "orc") \
4 .option("createDisposition", "CREATE_IF_NEEDED") \
5 .option("partitionField", "partition_index") \
6 .option("partitionRangeStart", 0) \
7 .option("partitionRangeEnd", 4000) \
8 .option("partitionRangeInterval", 10) \
9 .option("clusteredFields", "Id") \
10 .option("allowFieldAddition", True) \
11 .mode("append") \
12 .save("test_dataset.test_table")
File /usr/lib/spark/python/pyspark/sql/readwriter.py:1109, in DataFrameWriter.save(self, path, format, mode, partitionBy, **options)
1107 self._jwrite.save()
1108 else:
-> 1109 self._jwrite.save(path)
File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py:1304, in JavaMember.__call__(self, *args)
1298 command = proto.CALL_COMMAND_NAME +\
1299 self.command_header +\
1300 args_command +\
1301 proto.END_COMMAND_PART
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1307 for temp_arg in temp_args:
1308 temp_arg._detach()
File /usr/lib/spark/python/pyspark/sql/utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
109 def deco(*a, **kw):
110 try:
--> 111 return f(*a, **kw)
112 except py4j.protocol.Py4JJavaError as e:
113 converted = convert_exception(e.java_exception)
File /usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
331 "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
332 format(target_id, ".", name, value))
Py4JJavaError: An error occurred while calling o136.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:111)
at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:750)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:220)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$5.call(BigQueryImpl.java:405)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl$5.call(BigQueryImpl.java:394)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.run(BigQueryRetryHelper.java:86)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries(BigQueryRetryHelper.java:49)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:393)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryImpl.create(BigQueryImpl.java:358)
at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:353)
at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:348)
at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:613)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:139)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:108)
... 35 more
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException: 400 Bad Request
POST https://www.googleapis.com/bigquery/v2/projects/my-gcp-project-1/jobs?prettyPrint=false
{
  "code": 400,
  "errors": [
    {
      "domain": "global",
      "message": "The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.",
      "reason": "invalid"
    }
  ],
  "message": "The field specified for time partitioning can only be of type TIMESTAMP, DATE or DATETIME. The type found is: INTEGER.",
  "status": "INVALID_ARGUMENT"
}
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest$3.interceptResponse(AbstractGoogleClientRequest.java:466)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:552)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:493)
at com.google.cloud.spark.bigquery.repackaged.com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:603)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.create(HttpBigQueryRpc.java:218)
... 47 more
Hi @xtrmstep, is the table test_dataset.test_table an existing table? If so, can you please provide the schema?
Closing this issue as there is no response from the customer. @xtrmstep, please reopen the issue if you are still facing this.
Hi @isha97, I think I replied, but I don't see the reply now; sorry, maybe it was a glitch on my side. Please find a sample below. A few things need to be filled in: the bucket name and the BQ table name. The table does not exist in this case; I haven't checked what would happen if I tried to save into an existing one.
Here is the code:
from random import random

import pyspark.sql.types as T
import pyspark.sql.functions as F

TEMP_GCS_BUCKET_NAME = "BUCKET"
BQ_TABLE_NAME = "DATASET.TABLE"

ages = ["young", "middle", "old"]
sizes = ["small", "middle", "large"]
colors = ["red", "blue", "white"]

def random_value(values):
    return values[int(len(values) * random())]
data = [( 1, "star1", random_value(sizes), random_value(ages), 4000.0, random_value(colors)),
        ( 2, "star2", random_value(sizes), random_value(ages), 6000.0, random_value(colors)),
        ( 3, "star3", random_value(sizes), random_value(ages), 3000.0, random_value(colors)),
        ( 4, "star4", random_value(sizes), random_value(ages), 8000.0, random_value(colors)),
        ( 5, "star5", random_value(sizes), random_value(ages), 5000.0, random_value(colors)),
        ( 6, "star6", random_value(sizes), random_value(ages), 3500.0, random_value(colors)),
        ( 7, "star7", random_value(sizes), random_value(ages), 4500.0, random_value(colors)),
        ( 8, "star8", random_value(sizes), random_value(ages), 7000.0, random_value(colors)),
        ( 9, "star9", random_value(sizes), random_value(ages), 5500.0, random_value(colors)),
        (10, "star10", random_value(sizes), random_value(ages), 6500.0, random_value(colors)),
        (11, "star11", random_value(sizes), random_value(ages), 7500.0, random_value(colors)),
        (12, "star12", random_value(sizes), random_value(ages), 3200.0, random_value(colors)),
        (13, "star13", random_value(sizes), random_value(ages), 4700.0, random_value(colors)),
        (14, "star14", random_value(sizes), random_value(ages), 3700.0, random_value(colors)),
        (15, "star15", random_value(sizes), random_value(ages), 6200.0, random_value(colors)),
        (16, "star16", random_value(sizes), random_value(ages), 5800.0, random_value(colors)),
        (17, "star17", random_value(sizes), random_value(ages), 5200.0, random_value(colors)),
        (18, "star18", random_value(sizes), random_value(ages), 5100.0, random_value(colors)),
        (19, "star19", random_value(sizes), random_value(ages), 5400.0, random_value(colors)),
        (20, "star20", random_value(sizes), random_value(ages), 7200.0, random_value(colors))]
# define the schema for the DataFrame
schema = T.StructType([
    T.StructField("id", T.IntegerType(), True),
    T.StructField("name", T.StringType(), True),
    T.StructField("size", T.StringType(), True),
    T.StructField("age", T.StringType(), True),
    T.StructField("temp", T.DoubleType(), True),
    T.StructField("color", T.StringType(), True)
])

# create a DataFrame from the data and schema
df = spark.createDataFrame(data, schema)
df.show()

df_data_to_save = df \
    .withColumn("partition_index", F.col("id") % F.lit(4000))
df_data_to_save.show()
df_data_to_save.write.format('bigquery') \
    .option("temporaryGcsBucket", TEMP_GCS_BUCKET_NAME) \
    .option("intermediateFormat", "orc") \
    .option("createDisposition", "CREATE_IF_NEEDED") \
    .option("partitionField", "partition_index") \
    .option("partitionRangeStart", 0) \
    .option("partitionRangeEnd", 4000) \
    .option("partitionRangeInterval", 10) \
    .option("clusteredFields", "id") \
    .option("allowFieldAddition", True) \
    .mode("append") \
    .save(BQ_TABLE_NAME)
When you execute this, it fails with the error described above. (Note the clustering field is "id" here, matching the sample schema.) I can provide more details, but not publicly.
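In the meantime, a possible workaround (only a sketch, I have not verified it end to end) is to pre-create the table with integer-range partitioning using the google-cloud-bigquery client already listed in PIP_PACKAGES above, then append with the connector while dropping the partition* options. The project/dataset/table names below are the same placeholders as above:

from google.cloud import bigquery

client = bigquery.Client()

# Pre-create the target table with the sample schema plus the partition key.
table = bigquery.Table(
    "PROJECT.DATASET.TABLE",  # placeholder, same table as BQ_TABLE_NAME
    schema=[
        bigquery.SchemaField("id", "INTEGER"),
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("size", "STRING"),
        bigquery.SchemaField("age", "STRING"),
        bigquery.SchemaField("temp", "FLOAT"),
        bigquery.SchemaField("color", "STRING"),
        bigquery.SchemaField("partition_index", "INTEGER"),
    ],
)
# Integer-range partitioning: 400 partitions of width 10 over [0, 4000).
table.range_partitioning = bigquery.RangePartitioning(
    field="partition_index",
    range_=bigquery.PartitionRange(start=0, end=4000, interval=10),
)
table.clustering_fields = ["id"]
client.create_table(table, exists_ok=True)

I have not checked whether the connector still tries to set time partitioning when appending into an existing range-partitioned table, so this may or may not get around the failing path.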
Regards,