branky/cascading.hive

Please advise how to make cascading.hive work with Parquet through HCatalog

btrofimov opened this issue · 5 comments

Please advise how to make cascading.hive work with Parquet through HCatalog.
I have a very simple application that works well when the table is text-based. When I switch the table to Parquet, a ClassNotFoundException is raised pointing at HiveOutputFormat.
Thanks in advance.

Interesting use case. Thank you for reporting this issue. Which version of HCatalog are you using? Would you also mind providing the DDL? A pull request adding a test that reproduces the issue would be even better.

For your reference, here is a test that works with ORC through HCatalog:

public void testOrcInOut() throws IOException {
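
The body of that test is collapsed above, so purely as a rough illustration of the shape such an ORC-through-HCatalog round trip takes in Cascading, here is a minimal sketch. The HCatTap class name, its package, and its (database, table) constructor below are assumptions made for illustration, not this project's verified API; only the Cascading core calls (Pipe, HadoopFlowConnector, Flow) are standard.

import java.util.Properties;

import cascading.flow.Flow;
import cascading.flow.hadoop.HadoopFlowConnector;
import cascading.pipe.Pipe;
import cascading.tap.Tap;
// import cascading.hcatalog.HCatTap; // assumed package and class name

public class OrcHCatRoundTripSketch {
  public static void main(String[] args) {
    // Assumed taps for tables managed by HCatalog; the (database, table)
    // constructor is illustrative only and may differ in this project.
    Tap source = new HCatTap("default", "orc_table_in");
    Tap sink = new HCatTap("default", "orc_table_out");

    // Identity pipe: copy every record from the source table to the sink table.
    Pipe copy = new Pipe("copy");

    Flow flow = new HadoopFlowConnector(new Properties()).connect(source, sink, copy);
    flow.complete();
  }
}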

Thanks for your response, @branky. It seems I had not set the HADOOP_CLASSPATH environment variable.
However, I now face a trickier exception involving "parquet.hive.serde.ParquetHiveSerDe" (below). I think this exception is not related to cascading-hive, but I hope you might have some advice on what it could be. HCatalog is supposed to be an abstraction over storage formats, transparent to the application, or at least that is what I thought, so this exception confuses me. Does cascading-hive need to know anything about the underlying format (Parquet in my case)? Cloudera says that parquet.hive.serde.ParquetHiveSerDe just is not supported by cascading. Any advice or ideas would be very helpful. Thanks!
P.S. The HCatalog version is 0.5.


Exception in thread "main" cascading.flow.planner.PlannerException: could not build flow from assembly: [unable to pack object: cascading.flow.hadoop.HadoopFlowStep]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:576)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:263)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:80)
at cascading.flow.FlowConnector.connect(FlowConnector.java:459)
at cascading.flow.FlowConnector.connect(FlowConnector.java:450)
at cascading.flow.FlowConnector.connect(FlowConnector.java:426)
at cascading.flow.FlowConnector.connect(FlowConnector.java:275)
at cascading.flow.FlowConnector.connect(FlowConnector.java:220)
at cascading.flow.FlowConnector.connect(FlowConnector.java:202)
at com.collective.grandcentral.profiles.Tool$delayedInit$body.apply(Tool.scala:51)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:60)
at scala.App$$anonfun$main$1.apply(App.scala:60)
at scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
at scala.collection.immutable.List.foreach(List.scala:76)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:30)
at scala.App$class.main(App.scala:60)
at com.collective.grandcentral.profiles.Tool$.main(Tool.scala:17)
at com.collective.grandcentral.profiles.Tool.main(Tool.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.hadoop.util.RunJar.main(RunJar.java:208)
Caused by: cascading.flow.FlowException: unable to pack object: cascading.flow.hadoop.HadoopFlowStep
at cascading.flow.hadoop.HadoopFlowStep.pack(HadoopFlowStep.java:195)
at cascading.flow.hadoop.HadoopFlowStep.getInitializedConfig(HadoopFlowStep.java:170)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:201)
at cascading.flow.hadoop.HadoopFlowStep.createFlowStepJob(HadoopFlowStep.java:69)
at cascading.flow.planner.BaseFlowStep.getFlowStepJob(BaseFlowStep.java:768)
at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1222)
at cascading.flow.BaseFlow.initialize(BaseFlow.java:199)
at cascading.flow.hadoop.planner.HadoopPlanner.buildFlow(HadoopPlanner.java:257)
... 23 more
Caused by: java.io.NotSerializableException: parquet.hive.serde.ParquetHiveSerDe
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at java.util.HashMap.writeObject(HashMap.java:1001)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:940)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1469)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1483)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330)
at cascading.flow.hadoop.util.JavaObjectSerializer.serialize(JavaObjectSerializer.java:57)
at cascading.flow.hadoop.util.HadoopUtil.serializeBase64(HadoopUtil.java:265)
at cascading.flow.hadoop.HadoopFlowStep.pack(HadoopFlowStep.java:191)
... 30 more

"parquet.hive.serde.ParquetHiveSerDe just is not supported by cascading." - It is because the instance of ParquetHiveSerDe is not Serializable. The SerDe is being used by HCatalog to read and write data. The concrete SerDe instance is retrieved from Table instance when connecting to Hive metastore , then it will be serialized and propagated to mappers.

If the SerDe instance itself can't be serialized, it has to be created on each mapper by connecting to the Hive metastore, which is not good. Making ParquetHiveSerDe Serializable would require its dependent classes to be Serializable too, which does not seem practical. A workaround may be needed that changes the way HCatalog initializes SerDe instances.
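
As an aside, a minimal sketch of that workaround pattern in plain Java (not the actual cascading-hive or HCatalog code; the names are illustrative): hold only the SerDe class name in the serialized job state and build the instance lazily, once per task JVM.

import java.io.Serializable;

// Illustrative holder: only the class name travels through Java serialization;
// the SerDe instance itself is rebuilt on each task JVM instead of being serialized.
public class LazySerDeHolder implements Serializable {

  private static final long serialVersionUID = 1L;

  private final String serDeClassName; // e.g. "parquet.hive.serde.ParquetHiveSerDe"

  private transient Object serDe; // never serialized

  public LazySerDeHolder(String serDeClassName) {
    this.serDeClassName = serDeClassName;
  }

  public synchronized Object getSerDe() {
    if (serDe == null) {
      try {
        serDe = Class.forName(serDeClassName).newInstance();
      } catch (Exception e) {
        throw new IllegalStateException("Cannot instantiate SerDe " + serDeClassName, e);
      }
    }
    return serDe;
  }
}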

I have worked out support for Parquet. I tested it with Hive/HCatalog 0.12. Refer to https://cwiki.apache.org/confluence/display/Hive/Parquet

You need the parquet-hive-bundle jar on your classpath if you are not using Hive 0.13. Please verify and reopen this issue if it does not work for you.
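
Not part of cascading.hive, but a quick sanity check you can run with the same classpath as your job to confirm the Parquet SerDe is visible before the flow is planned:

public class ParquetSerDeCheck {
  public static void main(String[] args) throws ClassNotFoundException {
    // Throws ClassNotFoundException if the SerDe named in the stack trace
    // above is missing from the classpath.
    Class.forName("parquet.hive.serde.ParquetHiveSerDe");
    System.out.println("parquet.hive.serde.ParquetHiveSerDe found on the classpath");
  }
}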

Thanks @branky, it works well.