Error while attempting to use Quality Scoring for SlimPajama Book dataset
Closed this issue · 4 comments
Hi team,
I am trying to run Quality scoring through the code snippet here -
pipeline = TextPipeline()
ops = [
ParquetReader("/localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book"),
TextQualityScorer(),
ParquetWriter("/localdisk/akakne/quality_benchmarking_before_vs_after/results/raw_data/Book")
]
pipeline.add_operations(ops)
ret = pipeline.execute()
del pipeline
But, I am getting an error (I believe it's during writing of output files). Full error is here -
init spark
Will assign 48 cores and 308265 M memory for spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/07 14:59:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
per core memory size is 6.272 GB and shuffle_disk maximum capacity is 8589934592.000 GB
execute with spark started ...
DatasetReader
ParquetReader
TextQualityScorer
model_name is gpt3
�[32m2023-11-07 14:59:38.693�[0m | �[1mINFO �[0m | �[36mpyrecdp.primitives.operations.text_qualityscorer�[0m:�[36mprepare_model�[0m:�[36m122�[0m - �[1mPreparing scorer model in [/home/akakne/.cache/recdp/models/gpt3_quality_model]...�[0m
real_model_path is /home/akakne/.cache/recdp/models/gpt3_quality_model
�[32m2023-11-07 14:59:41.506�[0m | �[1mINFO �[0m | �[36mpyrecdp.primitives.operations.text_qualityscorer�[0m:�[36mpredict�[0m:�[36m252�[0m - �[1mStart scoring dataset...�[0m
ParquetWriter
23/11/07 15:22:45 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/home/akakne/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 810, in main
eval_type = read_int(infile)
File "/home/akakne/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
raise EOFError
EOFError
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)
at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
... 21 more
23/11/07 15:22:45 ERROR PythonUDFRunner: This may have been caused by a prior exception:
org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
... 21 more
23/11/07 15:22:45 ERROR Executor: Exception in task 247.0 in stage 11.0 (TID 258)
org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
... 21 more
23/11/07 15:22:45 ERROR TaskSetManager: Task 247 in stage 11.0 failed 1 times; aborting job
23/11/07 15:22:45 ERROR FileFormatWriter: Aborting job 50e4308e-462b-42e6-bb88-c6fa5491fbd3.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
... 21 more
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
... 21 more
[Stage 11:====================================================> (448 + 9) / 458]
execute with spark took 1390.6338619077578 sec
[Stage 11:====================================================> (448 + 2) / 458]
@haojinIntel , please help to look into this issue
@adkakne Hi, adkakne. Please note that JsonlReader() and ParquetReader() will need to be passed the absolute path of the file.
I've using the dataset book_sample.jsonl of RedPajama-Data-1T-Sample to validate the code. The test code is like:
pipeline = TextPipeline()
ops = [
JsonlReader("file:///home/jh/test/book_sample.jsonl"),
TextQualityScorer(),
ParquetWriter("file:///home/jh/test/jh")
]
pipeline.add_operations(ops)
pipeline.execute()
Hi @haojinIntel,
thank you for looking into this. I used absolute paths and reproduced the error again today. I think the error lies in resource management during writing of the output. Thank you for support.
{
"name": "Py4JJavaError",
"message": "An error occurred while calling o406.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
\tat org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator.foreach(Iterator.scala:943)
\tat scala.collection.Iterator.foreach$(Iterator.scala:943)
\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
\tat org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
\tat org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
\tat org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
\tat org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
\tat org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
\t... 21 more
Driver stacktrace:
\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
\tat scala.Option.foreach(Option.scala:407)
\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
\tat org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
\tat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
\tat org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
\tat org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
\tat java.lang.reflect.Method.invoke(Method.java:498)
\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
\tat py4j.Gateway.invoke(Gateway.java:282)
\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
\tat py4j.commands.CallCommand.execute(CallCommand.java:79)
\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)
\tat java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
\tat org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator.foreach(Iterator.scala:943)
\tat scala.collection.Iterator.foreach$(Iterator.scala:943)
\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
\tat org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
\tat org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
\tat org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
\tat org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
\tat org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
\t... 21 more
",
"stack": "---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb Cell 14 line 2
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=0'>1</a> # 3.3. Book - raw data
----> <a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=1'>2</a> quality_scoring([book_raw_data], \"raw_data\", results_dir)
/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb Cell 14 line 2
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=14'>15</a> ops = [
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=15'>16</a> ParquetReader(ds_path),
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=16'>17</a> TextQualityScorer(),
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=17'>18</a> ParquetWriter(result_path)
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=18'>19</a> ]
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=19'>20</a> pipeline.add_operations(ops)
---> <a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=20'>21</a> ret = pipeline.execute()
<a href='vscode-notebook-cell://ssh-remote%2Baia-opa-clx-4016.jf.intel.com/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb#X33sdnNjb2RlLXJlbW90ZQ%3D%3D?line=21'>22</a> del pipeline
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/LLM/TextPipeline.py:109, in TextPipeline.execute(self, ds)
107 op.cache = ds
108 else:
--> 109 op.execute_spark(executable_pipeline, self.rdp)
110 if len(executable_sequence) > 0:
111 ds = executable_sequence[-1].cache
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/primitives/operations/base.py:218, in BaseLLMOperation.execute_spark(self, pipeline, rdp, child_ds)
216 child_output.append(pipeline[op].cache)
217 print(self)
--> 218 self.cache = self.process_spark(rdp.spark, *child_output)
219 return self.cache
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/primitives/operations/text_writer.py:22, in ParquetWriter.process_spark(self, spark, spark_df)
21 def process_spark(self, spark, spark_df: DataFrame = None) -> DataFrame:
---> 22 spark_df.write.parquet(self.output_dir, mode='overwrite')
23 return spark_df
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/sql/readwriter.py:1656, in DataFrameWriter.parquet(self, path, mode, partitionBy, compression)
1654 self.partitionBy(partitionBy)
1655 self._set_opts(compression=compression)
-> 1656 self._jwrite.parquet(path)
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\\
1317 self.command_header +\\
1318 args_command +\\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, \"_detach\"):
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
167 def deco(*a: Any, **kw: Any) -> Any:
168 try:
--> 169 return f(*a, **kw)
170 except Py4JJavaError as e:
171 converted = convert_exception(e.java_exception)
File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 \"An error occurred while calling {0}{1}{2}.\
\".
328 format(target_id, \".\", name), value)
329 else:
330 raise Py4JError(
331 \"An error occurred while calling {0}{1}{2}. Trace:\
{3}\
\".
332 format(target_id, \".\", name, value))
Py4JJavaError: An error occurred while calling o406.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
\tat org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator.foreach(Iterator.scala:943)
\tat scala.collection.Iterator.foreach$(Iterator.scala:943)
\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
\tat org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
\tat org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
\tat org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
\tat org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
\tat org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
\t... 21 more
Driver stacktrace:
\tat org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
\tat scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
\tat scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
\tat org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
\tat scala.Option.foreach(Option.scala:407)
\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
\tat org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
\tat org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
\tat org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
\tat org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
\tat org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
\tat org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
\tat org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
\tat org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
\tat org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
\tat org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
\tat org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
\tat org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
\tat org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
\tat org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
\tat org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
\tat org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
\tat org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
\tat org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
\tat java.lang.reflect.Method.invoke(Method.java:498)
\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
\tat py4j.Gateway.invoke(Gateway.java:282)
\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
\tat py4j.commands.CallCommand.execute(CallCommand.java:79)
\tat py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
\tat py4j.ClientServerConnection.run(ClientServerConnection.java:106)
\tat java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
\tat org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
\tat org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
\tat scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
\tat scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
\tat scala.collection.Iterator.foreach(Iterator.scala:943)
\tat scala.collection.Iterator.foreach$(Iterator.scala:943)
\tat scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
\tat org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
\tat org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
\tat org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
\tat org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
\tat org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
\tat org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
\tat org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
\tat org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
\tat org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
\tat org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
\t... 21 more
"
}
Hi Haojin,
I am using the whole Book dataset from SlimPajama. It is possible that the error is coming during writing and due to OOM error. Thank you.