Azure/spark-cdm-connector

[Issue] Exception: The number of columns in CSV/parquet file is not equal to the number of fields in Spark StructType

Closed this issue · 1 comment

Did you read the pinned issues and search the error message?

  • Yes. I read them and searched, but I didn't find an answer.

Summary of issue

I have created a Python script in a Databricks notebook that gets all the manifest files, reads the JSON, and builds a list of all available tables/entities to read. The spark-cdm-connector loops through all the manifests, but it fails on at least one entity ("Tables/Custom/Custom.manifest.cdm.json", entity name: "HCMPERSONDETAILS") with this exception:

The number of columns in CSV/parquet file is not equal to the number of fields in Spark StructType. Either modify the attributes in manifest to make it equal to the number of columns in CSV/parquet files or modify the csv/parquet file

I cannot find a way to work around this; I am not able to modify the manifest or the CSV files created by the D365 Export to Data Lake feature. A minimal version of the read call that fails is sketched below.
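
For reference, the read is done per entity through the connector's documented options. The following is only a minimal sketch of that call; the storage account, container, and service principal values are placeholders:

	# Minimal sketch of the per-entity read that raises the exception.
	# storage_account, manifest path, and credential values are placeholders.
	storage_account = "<storageaccount>.dfs.core.windows.net"
	manifest_path = "<container>/Tables/Custom/Custom.manifest.cdm.json"

	df = (spark.read.format("com.microsoft.cdm")
	      .option("storage", storage_account)
	      .option("manifestPath", manifest_path)
	      .option("entity", "HCMPERSONDETAILS")
	      .option("appId", "<app-id>")      # service principal auth; placeholder values
	      .option("appKey", "<app-key>")
	      .option("tenantId", "<tenant-id>")
	      .load())

	df.show()  # the exception is thrown only when the CSV partitions are actually scanned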

Error stack trace

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 728.0 failed 4 times, most recent failure: Lost task 0.3 in stage 728.0 (TID 4125) (10.139.64.4 executor 1): java.lang.Exception: The number of columns in CSV/parquet file is not equal to the number of fields in Spark StructType. Either modify the attributes in manifest to make it equal to the number of columns in CSV/parquet files or modify the csv/parquet file
	at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:63)
	at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:20)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:135)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:172)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:67)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
	at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3312)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3244)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3235)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3235)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1424)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1424)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1424)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3524)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3462)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3450)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1169)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1157)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2713)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$runSparkJobs$1(Collector.scala:312)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:271)
	at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:322)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:105)
	at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:112)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:115)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:104)
	at org.apache.spark.sql.execution.qrc.InternalRowFormat$.collect(cachedSparkResults.scala:88)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$computeResult$1(ResultCacheManager.scala:527)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.collectResult$1(ResultCacheManager.scala:519)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.computeResult(ResultCacheManager.scala:539)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:396)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:390)
	at org.apache.spark.sql.execution.qrc.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:292)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollectResult$1(SparkPlan.scala:431)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectResult(SparkPlan.scala:428)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3424)
	at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3415)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$3(Dataset.scala:4290)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:777)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4288)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:243)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:392)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:188)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
	at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:342)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4288)
	at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3414)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:267)
	at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:101)
	at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:720)
	at com.databricks.backend.daemon.driver.JupyterDriverLocal.computeListResultsItem(JupyterDriverLocal.scala:1332)
	at com.databricks.backend.daemon.driver.JupyterDriverLocal$JupyterEntryPoint.addCustomDisplayData(JupyterDriverLocal.scala:489)
	at sun.reflect.GeneratedMethodAccessor519.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
	at py4j.Gateway.invoke(Gateway.java:306)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.Exception: The number of columns in CSV/parquet file is not equal to the number of fields in Spark StructType. Either modify the attributes in manifest to make it equal to the number of columns in CSV/parquet files or modify the csv/parquet file
	at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:63)
	at com.microsoft.cdm.read.CDMDataReader.get(CDMDataReader.scala:20)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.next(DataSourceRDD.scala:135)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:172)
	at org.apache.spark.sql.execution.datasources.v2.MetricsRowIterator.next(DataSourceRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.next(DataSourceRDD.scala:67)
	at org.apache.spark.InterruptibleIterator.next(InterruptibleIterator.scala:40)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:761)
	at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
	at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:186)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
	at org.apache.spark.scheduler.Task.doRunTask(Task.scala:169)
	at org.apache.spark.scheduler.Task.$anonfun$run$4(Task.scala:137)
	at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:104)
	at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:137)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.scheduler.Task.run(Task.scala:96)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:902)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1696)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:905)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:760)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Platform name

Azure Databricks

Spark version

Spark 3.3.0

CDM jar version

spark_cdm_connector_assembly_synapse_spark3_2_1_19_4.jar

What is the format of the data you are trying to read/write?

.csv

Others have run into the same error. You may want to check those issues first: #84 #86

A search will easily yield results. https://github.com/Azure/spark-cdm-connector/search?q=The+number+of+columns+in+CSV%2Fparquet&type=issues
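
If it helps to confirm the mismatch before changing anything, a rough diagnostic sketch like the one below compares the attribute count in the entity definition with the column count of one CSV partition. The paths are placeholders and assume the usual D365 export layout, where each entity has a <Entity>.cdm.json next to its CSV partitions:

	# Rough diagnostic sketch: compare the attribute count in the entity's
	# .cdm.json with the column count of one CSV partition.
	# Paths are placeholders; adjust to the actual lake layout/mount point.
	import csv
	import json

	entity_json_path = "/dbfs/mnt/d365/Tables/Custom/HCMPERSONDETAILS.cdm.json"
	csv_partition_path = "/dbfs/mnt/d365/Tables/Custom/HCMPERSONDETAILS/part-00000.csv"

	with open(entity_json_path) as f:
	    entity_doc = json.load(f)
	# CDM entity documents normally list attributes under definitions[].hasAttributes
	attributes = entity_doc["definitions"][0]["hasAttributes"]
	print("attributes in entity definition:", len(attributes))

	with open(csv_partition_path, newline="") as f:
	    first_row = next(csv.reader(f))
	print("columns in first CSV row:", len(first_row))

If the two counts differ, the schema in the manifest has drifted from the exported files, which is what the connector is reporting.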

Comment or reopen if you still have issues.