YotpoLtd/metorikku

If any of the input paths don't have files error is thrown

tooptoop4 opened this issue · 1 comment

input.yaml can define several paths that source data into different dataframes.
If the path for any dataframe happens to contain no files, the entire spark-submit fails.
Example:
df1:
  path: abc/
df2:
  path: def/
df3:
  path: xyz/

If there are no files under def/, the whole job fails; ideally it would just create an empty df and the rest of the job could complete.
Maybe the solution below could be added (behind a config flag):
https://stackoverflow.com/questions/33635071/how-to-allow-spark-to-ignore-missing-input-files
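
Something roughly like the following is what I had in mind (untested sketch, not existing Metorikku code; readExistingPaths and the format/schema parameters are made up for illustration, and the missing-path case needs a known schema to build the empty df):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Sketch only: keep the configured paths that actually exist; if none do,
// return an empty DataFrame with the expected schema instead of letting
// spark.read throw an AnalysisException.
def readExistingPaths(spark: SparkSession, paths: Seq[String], format: String, schema: StructType): DataFrame = {
  val hadoopConf = spark.sparkContext.hadoopConfiguration
  val existing = paths.filter { p =>
    val path = new Path(p)
    path.getFileSystem(hadoopConf).exists(path)
  }
  if (existing.isEmpty)
    spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  else
    spark.read.format(format).schema(schema).load(existing: _*)
}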

2020-07-23 17:33:17,542 [main] INFO  com.yotpo.metorikku.Job - Registering tbl table
Exception in thread "main" org.apache.spark.sql.AnalysisException: Path does not exist: s3a://bla/sa/asd/;
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:558)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:355)
        at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
        at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
        at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
        at com.yotpo.metorikku.Job.<init>(Job.scala:48)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:845)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:161)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:184)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:86)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:920)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:929)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

This is just the way Spark works. I don't like the proposed solution since it involves reading the DF non-lazily in the driver (using collect).
We could add this as a feature, feel free to add it :)
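
If someone picks this up, a lazy variant behind a config flag could look roughly like this (just a sketch; readOrEmpty, its parameters, and the empty-DataFrame fallback are hypothetical and not part of Metorikku today):

import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.StructType

// Sketch only: rely on the AnalysisException Spark throws at path-resolution
// time and fall back to an empty DataFrame with a caller-supplied schema.
// Nothing is collected or read eagerly on the driver.
def readOrEmpty(spark: SparkSession, path: String, format: String, schema: StructType): DataFrame =
  try {
    spark.read.format(format).schema(schema).load(path)
  } catch {
    // Broad: AnalysisException also covers other resolution errors, so a real
    // implementation would probably check the path first or inspect the message.
    case _: AnalysisException =>
      spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)
  }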