If any of the input paths don't have files, an error is thrown
tooptoop4 opened this issue · 1 comment
tooptoop4 commented
input.yaml can define several paths to source data into different dataframes.
If the path for any dataframe does not happen to contain any files, the entire spark-submit fails.
Example:

```yaml
df1:
  path: abc/
df2:
  path: def/
df3:
  path: xyz/
```
If there are no files under def/, the whole job fails. Ideally it would just create an empty df so the rest of the job can complete.
Maybe the solution below could be added (behind a config flag):
https://stackoverflow.com/questions/33635071/how-to-allow-spark-to-ignore-missing-input-files
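A minimal sketch of the idea, assuming the fix is to filter out missing or empty paths before handing them to `spark.read`. The helper name `existingNonEmptyPaths` and the config flag are hypothetical; this demo uses `java.nio.file` for local paths, while a real implementation in Metorikku would need Hadoop's `FileSystem` API so that `s3a://` paths are covered too:

```scala
import java.nio.file.{Files, Paths}

object PathFilter {
  // Hypothetical helper: keep only paths that exist and contain at least
  // one entry. A config flag (e.g. ignoreMissingPaths) could gate whether
  // missing paths are silently dropped or cause the job to fail as today.
  def existingNonEmptyPaths(paths: Seq[String]): Seq[String] =
    paths.filter { p =>
      val dir = Paths.get(p)
      Files.isDirectory(dir) && {
        val stream = Files.list(dir) // must be closed to avoid leaking handles
        try stream.findFirst().isPresent
        finally stream.close()
      }
    }
}
```

If the filtered list ends up empty, the job could register an empty DataFrame (with an explicit schema) instead of calling `spark.read` and failing. Unlike the `collect`-based workaround from the Stack Overflow answer, this only touches file-system metadata in the driver and never materializes data.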
```
2020-07-23 17:33:17,542 [main] INFO com.yotpo.metorikku.Job - Registering tbl table
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://bla/sa/asd/;
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
	at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at scala.collection.immutable.List.flatMap(List.scala:355)
	at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
	at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
	at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
	at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
	at com.yotpo.metorikku.Job.<init>(Job.scala:48)
	at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
	at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:392)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
	at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```
lyogev commented
This is just the way Spark works. I don't like the proposed solution since it involves reading the DF non-lazily in the driver (using collect).
We could add this as a feature, feel free to add it :)