GoogleCloudDataproc/spark-bigquery-connector

Failed to save DataFrame to BigQuery with direct write mode

Closed this issue · 6 comments

kaiseu commented

The Spark BigQuery connector's direct write mode fails to save a DataFrame to BigQuery.
The pseudocode is roughly as follows:

String sql = "select * from dataset.table";
Dataset<Row> data = spark
                .read()
                .format(Constants.BIGQUERY)
                .option("materializationProject", env.getProjectName())
                .option("viewsEnabled", "true")
                .option("materializationDataset", env.getDatasetName())
                .option("cacheExpirationTimeInMinutes", 0)
                .load(sql);

data.write().mode(saveMode.getMode())
                .format("bigquery")
                .option("writeMethod", "direct")
                .option("enableModeCheckForSchemaFields", "false")
                .option(Constants.TABLE, fullTableName)
                .save();
 [ DEFAULT ] 2023-11-09T03:17:47.367 Exception in thread "main" com.google.cloud.bigquery.connector.common.BigQueryConnectorException: unexpected issue trying to save [sourcetype: string, event: string ... 1 more field] 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:128) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301) 

[ DEFAULT ] 2023-11-09T03:17:47.367 Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException 
 [ DEFAULT ] 2023-11-09T03:17:47.367 Serialization trace: 
 [ DEFAULT ] 2023-11-09T03:17:47.367 bigQueryJobLabels (com.google.cloud.spark.bigquery.SparkBigQueryConfig) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 bqConfig (com.google.cloud.bigquery.connector.common.BigQueryClientFactory) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 writeClientFactory (com.google.cloud.spark.bigquery.write.context.BigQueryDirectDataWriterContextFactory) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 dataWriterContextFactory (com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 this$0 (com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at scala.Option.foreach(Option.scala:407) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:89) 
 [ DEFAULT ] 2023-11-09T03:17:47.367 	... 40 more 

connector version: 0.32.2
spark version: 3.1.3
jdk: 1.8

isha97 commented

Hi @kaiseu,

Can you please provide the schema of the table you are reading from, the Spark data types of the DataFrame, and the schema of the table you are writing to, so that I can reproduce this issue?

kaiseu commented

source schema:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: string (nullable = true)

spark datatypes:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: timestamp (nullable = true)

target schema:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: timestamp (nullable = true)

The SQL is like:
select sourcetype,
event,
cast(dt as timestamp) as dt
from
XXXX
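
For reference, the cast in that query can also be sketched with the DataFrame API (a rough Spark-side equivalent of the BigQuery-side cast above; "XXXX" is kept as the elided source table name):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;

// Sketch only: load the raw table and cast dt to timestamp on the Spark side.
// dt strings that do not parse as timestamps come back as NULL after the cast.
Dataset<Row> casted = spark.read()
        .format("bigquery")
        .load("XXXX")   // elided source table, as in the query above
        .select(
                col("sourcetype"),
                col("event"),
                col("dt").cast("timestamp").alias("dt"));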

kaiseu commented

Team, any suggestions on this?

isha97 commented

Hi @kaiseu, can you please provide some sample data from the DataFrame that you are trying to write?
I am guessing there might be an issue in the timestamp conversion. Please share a few rows of the dt field in the original table that you are casting to timestamp.
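
For example, a quick way to eyeball this (a sketch, assuming data is the DataFrame built from the query above) is to print the schema and a few rows before writing; dt values that failed the cast will show up as NULL:

// Sketch only: inspect the DataFrame that is about to be written.
data.printSchema();
data.select("sourcetype", "event", "dt").show(10, false);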

Hi @kaiseu

Have you enabled the KryoSerializer in Spark? If so, can you please disable it and check whether the write succeeds?
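
For instance, if Kryo was turned on through the session config, reverting to Spark's default Java serializer would look roughly like this (a sketch; the setting may instead live in spark-defaults.conf or be passed as --conf spark.serializer=... on spark-submit, and the app name is just a placeholder):

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
        .appName("bq-direct-write")  // placeholder app name
        // Overrides spark.serializer=org.apache.spark.serializer.KryoSerializer
        // with Spark's default JavaSerializer.
        .config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
        .getOrCreate();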

Hi @kaiseu, please try disabling the Kryo serializer and reopen this issue if you still face the problem.