YotpoLtd/metorikku

how to read from a hudi input?

tooptoop4 opened this issue · 5 comments

I have this input config:

inputs:
  mydf:
    file:
      path: s3a://xx/a/b/c/

There are partition folders under s3a://xx/a/b/c/, with Hudi parquet files under them.

I want mydf to include the partition columns too.

I get:

2020-03-25 16:20:27,070 [main] INFO  org.apache.spark.scheduler.DAGScheduler - Job 1 finished: load at FilesInput.scala:29, took 0.073979 s
Exception in thread "main" org.apache.spark.sql.AnalysisException: Unable to infer schema for Parquet. It must be specified manually.;
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:208)
        at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$9.apply(DataSource.scala:208)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.sql.execution.datasources.DataSource.getOrInferFileFormatSchema(DataSource.scala:207)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:393)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
        at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
        at com.yotpo.metorikku.Job.<init>(Job.scala:48)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
2020-03-25 16:20:27,077 [pool-1-thread-1] INFO  org.apache.spark.SparkContext - Invoking stop() from shutdown hook

I also tried format: com.uber.hoodie, which fails with:

2020-03-25 16:36:07,067 [main] INFO  com.yotpo.metorikku.Job - Registering mydf table
Exception in thread "main" com.uber.hoodie.exception.HoodieException: 'path' must be specified.
        at com.uber.hoodie.DefaultSource.createRelation(DefaultSource.scala:57)
        at com.uber.hoodie.DefaultSource.createRelation(DefaultSource.scala:46)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
        at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
        at com.yotpo.metorikku.input.readers.file.FilesInput.read(FilesInput.scala:29)
        at com.yotpo.metorikku.input.readers.file.FileInput.read(FileInput.scala:15)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:68)
        at com.yotpo.metorikku.Job$$anonfun$registerDataframes$1.apply(Job.scala:66)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at com.yotpo.metorikku.Job.registerDataframes(Job.scala:66)
        at com.yotpo.metorikku.Job.<init>(Job.scala:48)
        at com.yotpo.metorikku.Metorikku$.delayedEndpoint$com$yotpo$metorikku$Metorikku$1(Metorikku.scala:10)
        at com.yotpo.metorikku.Metorikku$delayedInit$body.apply(Metorikku.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.yotpo.metorikku.Metorikku$.main(Metorikku.scala:7)
        at com.yotpo.metorikku.Metorikku.main(Metorikku.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:890)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:192)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:217)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

I also tried --conf spark.sql.hive.convertMetastoreParquet=false, with the same error.

I have --jars "/home/ec2-user/hoodie-spark-bundle-0.4.6.jar"

These are COW (copy-on-write) tables. https://hudi.incubator.apache.org/docs/querying_data.html mentions:
spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter], classOf[org.apache.hadoop.fs.PathFilter]);

^^ I'm not sure how to set that in metorikku.

If I use s3a://xx/a/b/c/*/*/*.parquet it seems to get further, but I'm not sure whether a) it's the right approach, b) the result will have the partition columns, or c) there will be duplicates in the data.

Hi @tooptoop4, sorry for the late response.
I actually have no idea how to read a Hudi table in Spark without Hive... I think that's the only way for now, since Hive is what holds the partition information.
In any case, you need to read this input as hoodie.
So in the input:

inputs:
  mydf:
    file:
      path: s3a://xx/a/b/c/
    options:
      format: com.uber.hoodie

That didn't work for me.

What's the error you're getting?
You may need to add something like **/*.parquet to the path.

**/*.parquet only works when the Hudi table has been written just once. Once it has been updated, more parquet files get produced, and reading all of them returns duplicate rows. See https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-Whydowehavetoset2differentwaysofconfiguringSparktoworkwithHudi? which says: "Without understanding of Hudi's file layout, engines would just plainly reading all the parquet files and displaying the data within them, with massive amounts of duplicates in the result."
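
To make the duplication concrete, here is a minimal Python sketch of what a Hudi-aware path filter does conceptually for a COW table: of all the parquet files in a partition, keep only the newest one per file group. This is not Hudi's implementation. It assumes base files are named `<fileId>_<writeToken>_<commitTime>.parquet`, the file names in `listing` are made up, and a real filter would also consult the commit timeline, which this sketch ignores.

```python
def parse_base_file(name):
    """Split '<fileId>_<writeToken>_<commitTime>.parquet' into (file_id, commit_time).

    The naming scheme is an assumption made for this sketch.
    """
    stem = name.rsplit(".", 1)[0]
    file_id, _write_token, commit_time = stem.rsplit("_", 2)
    return file_id, commit_time


def latest_base_files(names):
    """Keep only the newest file of each file group (files sharing a file_id)."""
    newest = {}
    for name in names:
        file_id, commit_time = parse_base_file(name)
        # Commit times are fixed-width timestamps, so string comparison orders them.
        if file_id not in newest or commit_time > newest[file_id][0]:
            newest[file_id] = (commit_time, name)
    return sorted(name for _, name in newest.values())


# Hypothetical partition listing: file group "f1" was rewritten by an update.
listing = [
    "f1_0_20200325160000.parquet",  # written by the initial insert
    "f1_0_20200325170000.parquet",  # rewritten by a later update, holds the merged records
    "f2_1_20200325160000.parquet",  # untouched since the initial insert
]

print(latest_base_files(listing))
```

A plain **/*.parquet glob would read all three files, so every record in file group f1 would show up twice. The filtered list contains one file per file group.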

--conf spark.hadoop.mapreduce.input.pathFilter.class=com.uber.hoodie.hadoop.HoodieROTablePathFilter works, but it applies the path filter to every input, including non-Hudi ones. Do you think metorikku needs a new per-input option, something like pathFilterClass? Hudi support seems to be a key feature of metorikku.
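
One way such a per-input option could behave is to set the path filter only while that input is being read, then restore the previous value. The sketch below models the idea in plain Python, with a dict standing in for the shared Hadoop configuration. `path_filter_class`, `scoped_setting`, `read_input` and the second path are hypothetical names for illustration, not metorikku or Spark APIs. Whether this scoping is enough in Spark depends on when files actually get listed, so treat it as an illustration of the idea only.

```python
from contextlib import contextmanager

PATH_FILTER_KEY = "mapreduce.input.pathFilter.class"


@contextmanager
def scoped_setting(conf, key, value):
    """Temporarily set conf[key] = value, restoring the previous state on exit."""
    missing = object()
    previous = conf.get(key, missing)
    conf[key] = value
    try:
        yield conf
    finally:
        if previous is missing:
            conf.pop(key, None)
        else:
            conf[key] = previous


def read_input(conf, input_cfg, reader):
    """Call reader(path), applying the input's path filter only for this read."""
    filter_class = input_cfg.get("path_filter_class")  # hypothetical per-input option
    if filter_class is None:
        return reader(input_cfg["path"])
    with scoped_setting(conf, PATH_FILTER_KEY, filter_class):
        return reader(input_cfg["path"])


# Stand-ins for the shared Hadoop configuration and for the actual Spark read.
hadoop_conf = {}
seen = []


def fake_reader(path):
    # Record which filter (if any) was active while this path was read.
    seen.append((path, hadoop_conf.get(PATH_FILTER_KEY)))
    return path


hudi_input = {
    "path": "s3a://xx/a/b/c/",
    "path_filter_class": "com.uber.hoodie.hadoop.HoodieROTablePathFilter",
}
plain_input = {"path": "s3a://xx/other/"}  # hypothetical non-Hudi input

read_input(hadoop_conf, hudi_input, fake_reader)
read_input(hadoop_conf, plain_input, fake_reader)
print(seen)
```

With this shape, the Hudi input is read with the filter active and the non-Hudi input is read without it, which is the behaviour the global --conf cannot give.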