NVIDIA/spark-rapids

[BUG] Spark UT framework: select one deep nested complex field after join, IOException parsing parquet

Opened this issue · 0 comments

contacts parquet is defined as following and has saved here: contacts.zip

Reproduce:

val dataSourceName = "parquet" 
val path = "/home/fejiang/Desktop"
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")
val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
  "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
  "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
  "`last`: STRING>,STRING>,`p` INT")
spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments")
     .createOrReplaceTempView("departments")  
val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query.show()

CPU:

scala> val dataSourceName = "parquet" 
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> 

scala> val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
departmentSchema: String = `depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>

scala> spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments")
res15: org.apache.spark.sql.DataFrame = [depId: int, depName: string ... 2 more fields]

scala>      .createOrReplaceTempView("departments")  

scala> val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
+------+                                                                        
|middle|
+------+
|    X.|
|    Y.|
+------+

GPU:

scala> val dataSourceName = "parquet" 
dataSourceName: String = parquet

scala> val path = "/home/fejiang/Desktop"
path: String = /home/fejiang/Desktop

scala> spark.conf.set("spark.sql.parquet.enableVectorizedReader", "true")

scala> val schema = ("`id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, " +
     |   "`address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, " +
     |   "`address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, " +
     |   "`last`: STRING>,STRING>,`p` INT")
schema: String = `id` INT,`name` STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>, `address` STRING,`pets` INT,`friends` ARRAY<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`relatives` MAP<STRING, STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>>,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>,`relations` MAP<STRUCT<`first`: STRING, `middle`: STRING, `last`: STRING>,STRING>,`p` INT

scala> spark.read.format(dataSourceName).schema(schema).load(path + "/contacts").createOrReplaceTempView("contacts")

scala> 

scala> val departmentSchema = "`depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>"
departmentSchema: String = `depId` INT,`depName` STRING,`contactId` INT,`employer` STRUCT<`id`: INT, `company`: STRUCT<`name`: STRING, `address`: STRING>>

scala> spark.read.format(dataSourceName).schema(departmentSchema).load(path + "/departments")
res2: org.apache.spark.sql.DataFrame = [depId: int, depName: string ... 2 more fields]

scala>      .createOrReplaceTempView("departments")  

scala> val query = spark.sql("select contacts.name.middle from contacts, departments where contacts.id = departments.contactId")
query: org.apache.spark.sql.DataFrame = [middle: string]

scala> query.show()
24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU
      *Exec <BroadcastExchangeExec> will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:46 WARN GpuOverrides: 
*Exec <BroadcastExchangeExec> will run on GPU
  *Exec <FilterExec> will run on GPU
    *Expression <IsNotNull> isnotnull(contactId#20) will run on GPU
    *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:48 WARN GpuOverrides:                                            
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:48 WARN GpuOverrides: 
!Exec <CollectLimitExec> cannot run on GPU because the Exec CollectLimitExec has been disabled, and is disabled by default because Collect Limit replacement can be slower on the GPU, if huge number of rows in a batch it could help by limiting the number of rows transferred from GPU to CPU. Set spark.rapids.sql.exec.CollectLimitExec to true if you wish to enable it
  @Partitioning <SinglePartition$> could run on GPU
  *Exec <ProjectExec> will run on GPU
    *Expression <Alias> _extract_middle#31 AS middle#29 will run on GPU
    *Exec <BroadcastHashJoinExec> will run on GPU
      *Exec <ProjectExec> will run on GPU
        *Expression <Alias> name#1.middle AS _extract_middle#31 will run on GPU
          *Expression <GetStructField> name#1.middle will run on GPU
        *Exec <FilterExec> will run on GPU
          *Expression <IsNotNull> isnotnull(id#0) will run on GPU
          *Exec <FileSourceScanExec> will run on GPU

24/10/18 17:39:49 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 4)    
java.io.IOException: Error when processing path: file:///home/fejiang/Desktop/contacts/p=2/part-00000-000fbc57-9d4a-4d07-a5fe-1c8c0815d1f8-c000.snappy.parquet, range: 0-991, partition values: [empty row]
	at com.nvidia.spark.rapids.ParquetTableReader.$anonfun$next$1(GpuParquetScan.scala:2709)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2696)
	at com.nvidia.spark.rapids.ParquetTableReader.next(GpuParquetScan.scala:2668)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.$anonfun$apply$1(GpuDataProducer.scala:159)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.CachedGpuBatchIterator$.apply(GpuDataProducer.scala:156)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$4(GpuMultiFileReader.scala:1066)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$AutoCloseableAttemptSpliterator.next(RmmRapidsRetryIterator.scala:477)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryIterator.next(RmmRapidsRetryIterator.scala:613)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$RmmRapidsRetryAutoCloseableIterator.next(RmmRapidsRetryIterator.scala:517)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.drainSingleWithVerification(RmmRapidsRetryIterator.scala:291)
	at com.nvidia.spark.rapids.RmmRapidsRetryIterator$.withRetryNoSplit(RmmRapidsRetryIterator.scala:132)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.$anonfun$readBatch$1(GpuMultiFileReader.scala:1059)
	at com.nvidia.spark.rapids.Arm$.withResource(Arm.scala:30)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.readBatch(GpuMultiFileReader.scala:1032)
	at com.nvidia.spark.rapids.MultiFileCoalescingPartitionReaderBase.next(GpuMultiFileReader.scala:1012)