GoogleCloudDataproc/spark-bigquery-connector

Spark Write BIGNUMERIC issue

lucaracca opened this issue · 5 comments

Hi,
we are trying to write a spark dataframe on bigquery, the bigquery table does not exist at write moment, so it should be created by the connector.
Out dataframe has a DecimalType(38,20) column and during the write we have an error:

Traceback (most recent call last):
  File "/tmp/job-7a2c62a4/main_test.py", line 55, in <module>
    main(sys.argv)
  File "/tmp/job-7a2c62a4/main_test.py", line 48, in main
    df.write.format("bigquery").options(**{"temporaryGcsBucket":"xxxxxxxxxxxx"}).save("CADI0W.TEST_BIGNUMERIC")
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 968, in save
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 190, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o92.save.
: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:110)
	at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:58)
	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:106)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Error while reading data, error message: The value for column 'numeric' is out of valid NUMERIC range: Value will lose precision after scaling down to NUMERIC type; input length: 16; scale: 20; input data: 01158e46f2241f2029; in column 'numeric' File: gs://xxxxxxxxxxxxx/.spark-bigquery-application_1692872022194_0253-b5d0e1d8-301d-43cc-9eb8-aea7892d7546/part-00000-c8ca4c43-9b04-4b15-bf8c-2b3b67aa75b0-c000.snappy.parquet
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:419)
	at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:252)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:333)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:323)
	at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:553)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:130)
	at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:107)
	... 44 more

Our test code used to replicate the issue:

with SparkSession.builder.getOrCreate() as spark: # Initialize the Spark session 

    from pyspark.sql.types import DecimalType, StructType, StructField, StringType 
    import decimal 
 
 
    data = [("string1", decimal.Decimal('111111111111111123.45678901234567811111')), 
            ("string2", decimal.Decimal('111111111111111123.45678901234567811111'))] 
 
    schema = StructType([ 
        StructField("stringa", StringType(), True), 
        StructField("numeric", DecimalType(38, 20), True) 
    ]) 
 
 
    df = spark.createDataFrame(data, schema=schema) 
 
    df.printSchema() 
    df.show() 
 
    df.write.format("bigquery").options(**{"temporaryGcsBucket":"xxxxxxxxxx"}).save("CADI0W.TEST_BIGNUMERIC") 

Connector version tested: gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.32.2.jar

it seems that the connector is creating the table with the Numeric type column instead of Bignumeric, reading the project readme we expected that based on the decimalType scale the correct column type would be handled.

Thanks

isha97 commented

Hi @lucaracca ,

I have a few questions to further debug the issue,

  1. What is the image version of the dataproc cluster that you are using? Or are you using Dataproc Serverless?
  2. How did you specify the connector version while running the job?

Thank you,
Isha

Hi @isha97,

  1. The image version of the cluster is 2.1.18-ubuntu20

image

  1. we specify the version while running the job using the --jars option

271129741-eb040c6e-d629-4353-bb07-ace35541476c

Thanks
Luca

isha97 commented

Hi @lucaracca

The image version you specified already has spark-bigquery connector installed and using --jar doesn't override the connector version. Instead, while creating the cluster please specify the jars in metadata properties
--metadata SPARK_BQ_CONNECTOR_VERSION=0.32.2

So, the command to create cluster should be like
gcloud dataproc clusters create <cluster-name> --region <region> --metadata SPARK_BQ_CONNECTOR_VERSION=0.32.2

@isha97
Hi, I have a similar issue as @lucaracca.

scala version : 2.12
spark version : 2.4.2
spark bigquery connector version : 0.34.0 (spark-bigquery-with-dependencies_2.12-0.34.0.jar)
spark running environment : Yarn

I am trying to write a spark dataframe to bigquery and the dataframe has a DecimalType(15, 10) on column.
And the following error occurred.

23/11/13 03:05:21 ERROR yarn.ApplicationMaster: User class threw exception: com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
com.google.cloud.bigquery.connector.common.BigQueryConnectorException: Failed to write to BigQuery
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:110)
at com.google.cloud.spark.bigquery.write.BigQueryDeprecatedIndirectInsertableRelation.insert(BigQueryDeprecatedIndirectInsertableRelation.java:43)
at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:54)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:106)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.kakaomobility.mobdata.etl.DumpWithUtils$.write(DumpWithUtils.scala:151)
at com.kakaomobility.mobdata.etl.DumpRDBMS$.$anonfun$main$2(DumpRDBMS.scala:36)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:658)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1402)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.BigQueryException: Error while reading data, error message: The value for column 'lat' is out of valid NUMERIC range: Value will lose precision after scaling down to NUMERIC type; input length: 8; scale: 10; input data: 5279bc7c57000000; in column 'lat' File: gs:///.snappy.parquet
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.reload(Job.java:419)
at com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.Job.waitFor(Job.java:252)
at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:333)
at com.google.cloud.bigquery.connector.common.BigQueryClient.createAndWaitFor(BigQueryClient.java:323)
at com.google.cloud.bigquery.connector.common.BigQueryClient.loadDataIntoTable(BigQueryClient.java:553)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.loadDataToBigQuery(BigQueryWriteHelper.java:130)
at com.google.cloud.spark.bigquery.write.BigQueryWriteHelper.writeDataFrameToBigQuery(BigQueryWriteHelper.java:107)

In Bigquery Document, Numeric Data Type has Precision=38, Scale=9. And It appears that an error occurs because �the decimalType of my dataframe has scale=10. How can I write DecimalType(15, 10) to BIGNUMERIC type of bigquery?

Thank you for reading my question.

Hi @moonkwoo ,

Are you writing to an existing table? If so, please update the table's scema and convert the NUMERIC field to BIGNUMERIC, with a scale of at least 10.