Failed to save DataFrame to BigQuery with direct write mode
Closed this issue · 6 comments
The Spark BigQuery connector's direct write mode fails to save a DataFrame to BigQuery.
The pseudo-code is roughly:
String sql = "select * from dataset.table";
Dataset<Row> data = spark
    .read()
    .format(Constants.BIGQUERY)
    .option("materializationProject", env.getProjectName())
    .option("viewsEnabled", "true")
    .option("materializationDataset", env.getDatasetName())
    .option("cacheExpirationTimeInMinutes", 0)
    .load(sql);
data.write().mode(saveMode.getMode())
    .format("bigquery")
    .option("writeMethod", "direct")
    .option("enableModeCheckForSchemaFields", "false")
    .option(Constants.TABLE, fullTableName)
    .save();
[ DEFAULT ] 2023-11-09T03:17:47.367 Exception in thread "main" com.google.cloud.bigquery.connector.common.BigQueryConnectorException: unexpected issue trying to save [sourcetype: string, event: string ... 1 more field]
at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:128)
at com.google.cloud.spark.bigquery.write.CreatableRelationProviderHelper.createRelation(CreatableRelationProviderHelper.java:51)
at com.google.cloud.spark.bigquery.BigQueryRelationProvider.createRelation(BigQueryRelationProvider.scala:107)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:180)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:218)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:215)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:176)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:133)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:132)
at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
bigQueryJobLabels (com.google.cloud.spark.bigquery.SparkBigQueryConfig)
bqConfig (com.google.cloud.bigquery.connector.common.BigQueryClientFactory)
writeClientFactory (com.google.cloud.spark.bigquery.write.context.BigQueryDirectDataWriterContextFactory)
dataWriterContextFactory (com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler)
this$0 (com.google.cloud.spark.bigquery.write.DataSourceWriterContextPartitionHandler$1)
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2304)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2253)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2252)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2252)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2491)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2433)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2422)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:902)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2204)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2225)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2244)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2269)
at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
at org.apache.spark.api.java.JavaRDDLike.collect(JavaRDDLike.scala:362)
at org.apache.spark.api.java.JavaRDDLike.collect$(JavaRDDLike.scala:361)
at org.apache.spark.api.java.AbstractJavaRDDLike.collect(JavaRDDLike.scala:45)
at com.google.cloud.spark.bigquery.write.BigQueryDataSourceWriterInsertableRelation.insert(BigQueryDataSourceWriterInsertableRelation.java:89)
... 40 more
connector version: 0.32.2
spark version: 3.1.3
jdk: 1.8
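For context on the "Serialization trace" above: Kryo's default map serializer typically deserializes a map by creating an instance and calling put() on it, which immutable maps reject with UnsupportedOperationException. If the connector's bigQueryJobLabels field is an immutable map (this is a guess from the trace, not confirmed), that would explain the wrapped exception. A minimal plain-Java sketch of that failure mode, with no Spark or Kryo dependency (the map below is a stand-in, not the connector's actual field):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ImmutableMapPutDemo {
    public static void main(String[] args) {
        // Stand-in for an immutable labels map; Collections.unmodifiableMap
        // shows the same put() behavior as Guava's ImmutableMap.
        Map<String, String> labels = Collections.unmodifiableMap(new HashMap<>());
        try {
            // A deserializer that rebuilds the map via put() would hit this.
            labels.put("spark_job", "bq-direct-write");
        } catch (UnsupportedOperationException e) {
            System.out.println("put() rejected: UnsupportedOperationException");
        }
    }
}
```

Since the trace says "Exception while getting task result", it looks like Kryo is being used for task results; checking whether the job sets spark.serializer to KryoSerializer (Spark's default is Java serialization) might be a useful diagnostic, though that is speculation until the maintainers weigh in.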
Hi @kaiseu,
Can you please provide the schema of the table you are reading from, the Spark data types of the data, and the schema of the table you are writing to, so that I can reproduce this issue?
source schema:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: string (nullable = true)
spark datatypes:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: timestamp (nullable = true)
target schema:
sourcetype: string (nullable = true)
event: string (nullable = true)
dt: timestamp (nullable = true)
The SQL is like:
select sourcetype,
event,
cast(dt as timestamp) as dt
from
XXXX
Team, any suggestions on this?
Hi @kaiseu, can you please provide sample data from the DataFrame that you are trying to write?
I am guessing there might be an issue in the timestamp conversion. Please provide a few rows of the dt
field in the original table that you are casting to timestamp.
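Until sample rows are posted, one quick local sanity check on candidate dt strings can be done with plain java.time. The sample values and the yyyy-MM-dd HH:mm:ss pattern below are assumptions, not data from the reporter's table. Note that in Spark, cast(dt as timestamp) produces NULL for unparsable strings rather than throwing, so this only checks data quality; it does not reproduce the Kryo failure above:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

public class DtParseCheck {
    public static void main(String[] args) {
        // Hypothetical sample values; replace with real rows of the dt column.
        String[] samples = {"2023-11-09 03:17:47", "2023-11-09", "not-a-date"};
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        for (String s : samples) {
            try {
                LocalDateTime.parse(s, fmt);
                System.out.println(s + " -> parses as a timestamp");
            } catch (DateTimeParseException e) {
                System.out.println(s + " -> would become NULL after the cast");
            }
        }
    }
}
```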