YotpoLtd/metorikku

CSV output: MetorikkuWriteFailedException "Failed to write dataFrame" when using spark-submit

Closed this issue · 5 comments

I am able to write the output in CSV format when running locally, but running the same job on a Spark cluster produces an error. (Note: it works perfectly fine when the output type is Parquet.)

/spark-submit --class com.yotpo.metorikku.Metorikku --deploy-mode cluster --driver-memory 4g --executor-memory 50g --executor-cores 8 --conf "spark.eventLog.dir=" --conf spark.eventLog.enabled=true --master spark://Host:Port /metorikku.jar -c

The error message seen is:

Exception in thread "main" java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:65)
at org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
Caused by: com.yotpo.metorikku.exceptions.MetorikkuWriteFailedException: Failed to write dataFrame: mlau to output: CSV on metric: mlpoc_metric
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:81)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:97)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$write$1.apply(MetricSet.scala:88)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.metric.MetricSet.write(MetricSet.scala:88)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:50)
at com.yotpo.metorikku.metric.MetricSet$$anonfun$run$1.apply(MetricSet.scala:44)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.metric.MetricSet.run(MetricSet.scala:44)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:23)
at com.yotpo.metorikku.Metorikku$$anonfun$runMetrics$1.apply(Metorikku.scala:21)
at scala.collection.immutable.List.foreach(List.scala:381)
at com.yotpo.metorikku.Metorikku$.runMetrics(Metorikku.scala:21)
at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:18)
at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:12)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.App$$anonfun$main$1.apply(App.scala:76)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76)
at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:12)
at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
... 6 more
Caused by: java.lang.IllegalArgumentException: Illegal pattern component: XXX
at org.apache.commons.lang3.time.FastDateFormat.parsePattern(FastDateFormat.java:577)
at org.apache.commons.lang3.time.FastDateFormat.init(FastDateFormat.java:444)
at org.apache.commons.lang3.time.FastDateFormat.<init>(FastDateFormat.java:437)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:110)
at org.apache.commons.lang3.time.FastDateFormat$1.createInstance(FastDateFormat.java:109)
at org.apache.commons.lang3.time.FormatCache.getInstance(FormatCache.java:82)
at org.apache.commons.lang3.time.FastDateFormat.getInstance(FastDateFormat.java:205)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:136)
at org.apache.spark.sql.execution.datasources.csv.CSVOptions.<init>(CSVOptions.scala:39)
at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat.prepareWrite(CSVFileFormat.scala:67)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:140)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:656)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:656)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:273)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:267)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:225)
at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:644)
at com.yotpo.metorikku.output.writers.csv.CSVOutputWriter.write(CSVOutputWriter.scala:33)
at com.yotpo.metorikku.metric.MetricSet.writeBatch(MetricSet.scala:77)
... 29 more

I think you are not using the latest version; there have been some issues with the CSV writer in previous versions. Please try again with v0.0.40.

We were able to solve this by adding the following to our metric YAML file:

coalesce: true
extraOptions:
  multiline: "true"
  timestampFormat: "yyyy-MM-dd'T'HH:mm:ss"
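For background on why overriding `timestampFormat` helps: the root cause in the trace is `IllegalArgumentException: Illegal pattern component: XXX`, thrown while Spark 2.x's CSV writer builds a `FastDateFormat` from its default timestamp pattern, `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. The trailing `XXX` (ISO-8601 zone offset) is only recognized by commons-lang3 3.2+, so an older commons-lang3 on the cluster classpath rejects it; supplying an explicit `timestampFormat` without `XXX`, as above, sidesteps the pattern entirely. A minimal sketch of what `XXX` produces, using plain `java.time` (which always supports it — the pattern string is Spark's documented default, everything else here is illustrative):

```scala
import java.time.{OffsetDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object XXXPatternDemo {
  // Spark 2.x's default CSV timestampFormat; the trailing "XXX" is the
  // ISO-8601 zone-offset pattern letter that old commons-lang3 rejects.
  val sparkDefault = "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"

  def main(args: Array[String]): Unit = {
    val fmt = DateTimeFormatter.ofPattern(sparkDefault)
    val ts  = OffsetDateTime.of(2019, 1, 2, 3, 4, 5, 6000000,
                                ZoneOffset.ofHoursMinutes(5, 30))
    // "XXX" renders the offset as "+05:30" (or "Z" for UTC)
    println(fmt.format(ts)) // 2019-01-02T03:04:05.006+05:30
  }
}
```

A format like `yyyy-MM-dd'T'HH:mm:ss` carries no zone-offset component, so the older `FastDateFormat` on the cluster can parse it without complaint.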

Thank you @lyogev for your input. Once we upgrade, we will try again without the workaround above.

@vineetkhattar5, can you please send a sample of your CSV file? I want to see if there's something we can do so this won't happen again in the future.

Resolved with:

output:
  - dataFrameName: df
    outputType: CSV
    coalesce: true
    timestampFormat: "MM:ss.SSS"
    options:
      timestampFormat: "MM:ss.SSS"
    outputOptions:
      timestampFormat: "MM:ss.SSS"
      saveMode: Overwrite
      path: valmetric=df
      extraOptions:
        multiline: "true"
        timestampFormat: "yyyy-MM-dd'T'HH:mm:ss"