[Bug] spark hive connector failed to read tpcds parquet table
Closed this issue · 3 comments
Code of Conduct
- I agree to follow this project's Code of Conduct
Search before asking
- I have searched in the issues and found no similar issues.
Describe the bug
I followed the steps in https://github.com/yaooqinn/tpcds-for-spark/tree/master to generate TPC-DS data for Spark. When querying the data, I get the exceptions below:
select * from catalog_sales limit 1;
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/catalog_sales/cs_sold_date_sk=2450815/part-00005-8942c9cb-bf45-4521-b53d-ab272c62ce58.c000.gz.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
at scala.Option.exists(Option.scala:376)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
at org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$BinaryConverter.setDictionary(ETypeConverter.java:283)
at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:141)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)
select * from store_returns limit 1;
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/store_returns/sr_returned_date_sk=2450820/part-00006-99abfdab-303f-4af4-8be1-46e581a8b189.c000.gz.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
at scala.Option.exists(Option.scala:376)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$8$1
at org.apache.parquet.io.api.PrimitiveConverter.addInt(PrimitiveConverter.java:98)
at org.apache.parquet.column.impl.ColumnReaderBase$2$3.writeValue(ColumnReaderBase.java:297)
at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)
Affects Version(s)
1.8.2
Kyuubi Server Log Output
No response
Kyuubi Engine Log Output
No response
Kyuubi Server Configurations
No response
Kyuubi Engine Configurations
No response
Additional context
No response
Are you willing to submit PR?
- Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
- No. I cannot submit a PR at this time.
There are known limitations with the Hive Parquet reader implementation; you may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data with Spark.
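A minimal sketch of where the flag goes when regenerating the data (assuming a Spark session similar to the one used by tpcds-for-spark; the app name is illustrative, and spark.sql.parquet.writeLegacyFormat is the only relevant part):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tpcds-datagen-legacy-parquet") // illustrative name
  .enableHiveSupport()
  // Write Parquet in the legacy format (compatible with Hive/Impala and older
  // Spark versions) so the Hive 2.3.9 Parquet SerDe can decode the values.
  .config("spark.sql.parquet.writeLegacyFormat", "true")
  .getOrCreate()
```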
Yes, it works after setting spark.sql.parquet.writeLegacyFormat to true. Are there any other limitations of the Kyuubi Spark Hive connector? And is there any plan to support the new Parquet format?
The limitations were listed at our first meeting. The Kyuubi Spark Hive Connector only supports using Hive SerDe to read/write Hive tables; consequently, it has the same limitations as Spark's built-in Hive implementation (specifically Hive 2.3.9), e.g. poor performance because reads are not vectorized, and no support for the new Parquet logical types.
Supporting that would require a mechanism to respect spark.sql.hive.convertMetastoreParquet (or a new dedicated configuration) to convert Hive Parquet table reads into Spark DataSource table reads.
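For reference, a minimal sketch of how this already behaves with Spark's built-in session catalog (assuming a Hive-enabled SparkSession and the tpcds database registered in the metastore; the table name is just the one from the report above):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("convert-metastore-parquet-demo") // illustrative name
  .enableHiveSupport()
  .getOrCreate()

// This is the default for the built-in catalog: Hive Parquet tables are
// converted to Spark's native (vectorized) Parquet data source reader, which
// handles the non-legacy Parquet format that the Hive 2.3.9 SerDe cannot decode.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")

spark.sql("SELECT * FROM tpcds.catalog_sales LIMIT 1").show()
```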