apache/kyuubi

[Bug] spark hive connector failed to read tpcds parquet table

Closed this issue · 3 comments

Search before asking

  • I have searched in the issues and found no similar issues.

Describe the bug

I followed the steps at https://github.com/yaooqinn/tpcds-for-spark/tree/master to generate TPC-DS data for Spark. Querying the data throws the exceptions below:
select * from catalog_sales limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/catalog_sales/cs_sold_date_sk=2450815/part-00005-8942c9cb-bf45-4521-b53d-ab272c62ce58.c000.gz.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
	at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
	at org.apache.parquet.column.Dictionary.decodeToBinary(Dictionary.java:41)
	at org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$BinaryConverter.setDictionary(ETypeConverter.java:283)
	at org.apache.parquet.column.impl.ColumnReaderBase.<init>(ColumnReaderBase.java:415)
	at org.apache.parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:46)
	at org.apache.parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:82)
	at org.apache.parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:271)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:147)
	at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:109)
	at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:177)
	at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:109)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:141)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:230)

select * from store_returns limit 1;

Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 1 in block 0 in file file:/Users/fanng/opensource/tpcds/tpcds-for-spark/spark-warehouse/tpcds.db/store_returns/sr_returned_date_sk=2450820/part-00006-99abfdab-303f-4af4-8be1-46e581a8b189.c000.gz.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:264)
	at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:207)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:98)
	at org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper.<init>(ParquetRecordReaderWrapper.java:60)
	at org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat.getRecordReader(MapredParquetInputFormat.java:75)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.liftedTree1$1(HivePartitionReaderFactory.scala:130)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory$$anon$2.<init>(HivePartitionReaderFactory.scala:129)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.createPartitionWritableReader(HivePartitionReaderFactory.scala:122)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.buildReaderInternal(HivePartitionReaderFactory.scala:91)
	at org.apache.kyuubi.spark.connector.hive.read.HivePartitionReaderFactory.$anonfun$createReader$1(HivePartitionReaderFactory.scala:75)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.getNextReader(SparkFilePartitionReader.scala:99)
	at org.apache.kyuubi.spark.connector.hive.read.SparkFilePartitionReader.next(SparkFilePartitionReader.scala:46)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:120)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:158)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:63)
	at scala.Option.exists(Option.scala:376)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:97)
	at org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:63)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.UnsupportedOperationException: org.apache.hadoop.hive.ql.io.parquet.convert.ETypeConverter$8$1
	at org.apache.parquet.io.api.PrimitiveConverter.addInt(PrimitiveConverter.java:98)
	at org.apache.parquet.column.impl.ColumnReaderBase$2$3.writeValue(ColumnReaderBase.java:297)
	at org.apache.parquet.column.impl.ColumnReaderBase.writeCurrentValueToConverter(ColumnReaderBase.java:440)
	at org.apache.parquet.column.impl.ColumnReaderImpl.writeCurrentValueToConverter(ColumnReaderImpl.java:30)
	at org.apache.parquet.io.RecordReaderImplementation.read(RecordReaderImplementation.java:406)
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:234)

Affects Version(s)

1.8.2

Kyuubi Server Log Output

No response

Kyuubi Engine Log Output

No response

Kyuubi Server Configurations

No response

Kyuubi Engine Configurations

No response

Additional context

No response

Are you willing to submit PR?

  • Yes. I would be willing to submit a PR with guidance from the Kyuubi community to fix.
  • No. I cannot submit a PR at this time.

There are known limitations in the Hive Parquet reader implementation. You may want to try enabling spark.sql.parquet.writeLegacyFormat when generating the TPC-DS data with Spark.
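A minimal sketch of that workaround, assuming the TPC-DS tables are generated from a Spark SQL session where the option can be set at runtime. With the option enabled, Spark writes decimals in the older fixed-length byte array layout that Hive's reader expects, which appears consistent with the integer-dictionary and addInt failures in the traces above. Files that were already written are not rewritten, so the data has to be regenerated after the setting is applied.

```sql
-- Run in the Spark session that generates the TPC-DS tables,
-- before any table data is written.
SET spark.sql.parquet.writeLegacyFormat=true;
```

The same key can also be passed at launch time, for example as `--conf spark.sql.parquet.writeLegacyFormat=true` on spark-submit or spark-sql.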

Yes, it works after setting spark.sql.parquet.writeLegacyFormat to true. Are there any other limitations of the Kyuubi Spark Hive connector? And is there any plan to support the new Parquet format?

Limitations were listed in our first meeting. KSHC only supports using Hive SerDe to read/write Hive tables. Consequently, it has the same limitations as Spark's built-in Hive implementation (specifically, Hive 2.3.9), e.g. poor performance because reads are non-vectorized, and no support for new Parquet logical types.

Supporting the new Parquet format requires a mechanism that respects spark.sql.hive.convertMetastoreParquet (or defines a new dedicated configuration) to convert Hive Parquet table reads into Spark DataSource table reads.