intel/e2eAIOK

Error while attempting to use Quality Scoring for SlimPajama Book dataset

Closed this issue · 4 comments

Hi team,

I am trying to run quality scoring with the code snippet below:

from pyrecdp.LLM import TextPipeline
from pyrecdp.primitives.operations import ParquetReader, TextQualityScorer, ParquetWriter

pipeline = TextPipeline()
ops = [
    ParquetReader("/localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book"),
    TextQualityScorer(),
    ParquetWriter("/localdisk/akakne/quality_benchmarking_before_vs_after/results/raw_data/Book")
]
pipeline.add_operations(ops)
ret = pipeline.execute()
del pipeline

However, I am getting an error (I believe it occurs while the output files are being written). The full error is below:

init spark
Will assign 48 cores and 308265 M memory for spark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/07 14:59:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
per core memory size is 6.272 GB and shuffle_disk maximum capacity is 8589934592.000 GB
execute with spark started ...
DatasetReader
ParquetReader
                                                                                
TextQualityScorer
model_name is gpt3
2023-11-07 14:59:38.693 | INFO     | pyrecdp.primitives.operations.text_qualityscorer:prepare_model:122 - Preparing scorer model in [/home/akakne/.cache/recdp/models/gpt3_quality_model]...
real_model_path is /home/akakne/.cache/recdp/models/gpt3_quality_model
                                                                                
2023-11-07 14:59:41.506 | INFO     | pyrecdp.primitives.operations.text_qualityscorer:predict:252 - Start scoring dataset...
ParquetWriter
23/11/07 15:22:45 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/akakne/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 810, in main
    eval_type = read_int(infile)
  File "/home/akakne/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.columnar.DefaultCachedBatchSerializer$$anon$1.hasNext(InMemoryRelation.scala:118)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:302)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1462)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1526)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1349)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:375)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:326)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
	... 21 more
23/11/07 15:22:45 ERROR PythonUDFRunner: This may have been caused by a prior exception:
org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	... (same stack trace as above)
23/11/07 15:22:45 ERROR Executor: Exception in task 247.0 in stage 11.0 (TID 258)
org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	... (same stack trace as above)
23/11/07 15:22:45 ERROR TaskSetManager: Task 247 in stage 11.0 failed 1 times; aborting job
23/11/07 15:22:45 ERROR FileFormatWriter: Aborting job 50e4308e-462b-42e6-bb88-c6fa5491fbd3.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	... (same stack trace as above)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	... (same stack trace as above)
[Stage 11:====================================================> (448 + 9) / 458]
execute with spark took 1390.6338619077578 sec
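For reference, the root-cause RuntimeException above already names two Spark-level workarounds: shrink the vectorized reader batch size, or disable the vectorized reader. A minimal sketch of those settings in plain PySpark (a hypothetical standalone session for illustration; pyrecdp initializes its own SparkSession internally, so the right place to inject the config may differ, and the batch size of 512 is just an example value):

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("vectorized_reader_workaround")  # hypothetical app name
    # Workaround 1: shrink the vectorized reader batch size (default 4096)
    .config("spark.sql.parquet.columnarReaderBatchSize", "512")
    # Workaround 2 (alternative): disable the vectorized reader entirely
    # .config("spark.sql.parquet.enableVectorizedReader", "false")
    .getOrCreate()
)

# Try reading the file named in the stack trace under the new settings
df = spark.read.parquet("file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet")
print(df.count())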

@haojinIntel, please help look into this issue.

@adkakne Hi, adkakne. Please note that JsonlReader() and ParquetReader() need to be passed the absolute path of the file.
I used the book_sample.jsonl dataset from RedPajama-Data-1T-Sample to validate the code. The test code is:

from pyrecdp.LLM import TextPipeline
from pyrecdp.primitives.operations import JsonlReader, TextQualityScorer, ParquetWriter

pipeline = TextPipeline()
ops = [
    JsonlReader("file:///home/jh/test/book_sample.jsonl"),
    TextQualityScorer(),
    ParquetWriter("file:///home/jh/test/jh")
]
pipeline.add_operations(ops)
pipeline.execute()

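For the Book directory from the original report, the same absolute-path convention would presumably look like:

ParquetReader("file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book")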

Hi @haojinIntel,

Thank you for looking into this. I used absolute paths and reproduced the error again today; the full error is below. I still believe the problem lies in resource management while the output is being written. Thank you for your support.
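To narrow down whether it is the read or the write that fails, one option would be to load just the file named in the trace outside of Spark, e.g. with pyarrow (a hypothetical isolation check, not part of the pyrecdp pipeline):

import pyarrow.parquet as pq

# Read only the single file named in the stack trace (path taken from the trace)
table = pq.read_table("/localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet")
print(table.num_rows, table.nbytes)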

{
	"name": "Py4JJavaError",
	"message": "An error occurred while calling o406.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
\t... (same stack trace as in the first report)

Driver stacktrace:
\t... (same driver stacktrace and root cause as in the first report)
",
	"stack": "---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb Cell 14 line 2
      1 # 3.3. Book - raw data
----> 2 quality_scoring([book_raw_data], \"raw_data\", results_dir)

/localdisk/akakne/quality_benchmarking_before_vs_after/measure_quality.ipynb Cell 14 line 21
     15 ops = [
     16     ParquetReader(ds_path),
     17     TextQualityScorer(),
     18     ParquetWriter(result_path)
     19 ]
     20 pipeline.add_operations(ops)
---> 21 ret = pipeline.execute()
     22 del pipeline

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/LLM/TextPipeline.py:109, in TextPipeline.execute(self, ds)
    107         op.cache = ds
    108     else:
--> 109         op.execute_spark(executable_pipeline, self.rdp)
    110 if len(executable_sequence) > 0:
    111     ds = executable_sequence[-1].cache

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/primitives/operations/base.py:218, in BaseLLMOperation.execute_spark(self, pipeline, rdp, child_ds)
    216     child_output.append(pipeline[op].cache)
    217 print(self)
--> 218 self.cache = self.process_spark(rdp.spark, *child_output)
    219 return self.cache

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyrecdp/primitives/operations/text_writer.py:22, in ParquetWriter.process_spark(self, spark, spark_df)
     21 def process_spark(self, spark, spark_df: DataFrame = None) -> DataFrame:
---> 22     spark_df.write.parquet(self.output_dir, mode='overwrite')
     23     return spark_df

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/sql/readwriter.py:1656, in DataFrameWriter.parquet(self, path, mode, partitionBy, compression)
   1654     self.partitionBy(partitionBy)
   1655 self._set_opts(compression=compression)
-> 1656 self._jwrite.parquet(path)

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\\
   1317     self.command_header +\\
   1318     args_command +\\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, \"_detach\"):

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File ~/anaconda3/envs/near_dedup/lib/python3.8/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         \"An error occurred while calling {0}{1}{2}.\
\".
    328         format(target_id, \".\", name), value)
    329 else:
    330     raise Py4JError(
    331         \"An error occurred while calling {0}{1}{2}. Trace:\
{3}\
\".
    332         format(target_id, \".\", name, value))

Py4JJavaError: An error occurred while calling o406.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 247 in stage 11.0 failed 1 times, most recent failure: Lost task 247.0 in stage 11.0 (TID 258) (aia-opa-clx-4016.jf.intel.com executor driver): org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
	... 21 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered error while reading file file:///localdisk/akakne/quality_benchmarking_before_vs_after/data/raw_data/Book/train_chunk5.jsonl.id_hash.parquet. Details:
	at org.apache.spark.sql.errors.QueryExecutionErrors$.cannotReadFilesError(QueryExecutionErrors.scala:877)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:304)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:594)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.ContextAwareIterator.hasNext(ContextAwareIterator.scala:39)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1211)
	at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1217)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:320)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.writeIteratorToStream(PythonUDFRunner.scala:57)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:440)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:2088)
	at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:274)
Caused by: java.lang.RuntimeException: Cannot reserve additional contiguous bytes in the vectorized reader (integer overflow). As a workaround, you can reduce the vectorized reader batch size, or disable the vectorized reader, or disable spark.sql.sources.bucketing.enabled if you read from bucket table. For Parquet file format, refer to spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader; for ORC file format, refer to spark.sql.orc.columnarReaderBatchSize (default 4096) and spark.sql.orc.enableVectorizedReader.
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.throwUnsupportedException(WritableColumnVector.java:123)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.reserve(WritableColumnVector.java:96)
	at org.apache.spark.sql.execution.vectorized.WritableColumnVector.appendBytes(WritableColumnVector.java:539)
	at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.putByteArray(OnHeapColumnVector.java:526)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedPlainValuesReader.readBinary(VectorizedPlainValuesReader.java:366)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory$BinaryUpdater.readValues(ParquetVectorUpdaterFactory.java:727)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatchInternal(VectorizedRleValuesReader.java:244)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedRleValuesReader.readBatch(VectorizedRleValuesReader.java:176)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:252)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:328)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:219)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:125)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:294)
	... 21 more
"
}
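Note that the root cause in this trace is not the write itself: the ParquetWriter stage merely triggers the scan of the intermediate file train_chunk5.jsonl.id_hash.parquet, and it is the vectorized Parquet reader that fails with an integer overflow while reserving contiguous buffer space for the very long book texts. The error message itself names the relevant knobs: spark.sql.parquet.columnarReaderBatchSize (default 4096) and spark.sql.parquet.enableVectorizedReader. Below is a minimal sketch of applying them, assuming the pipeline picks up an already-active SparkSession; that reuse is an assumption about pyrecdp's behavior, not something confirmed here.

from pyspark.sql import SparkSession

# Either setting alone should be enough; both are shown for illustration.
# - columnarReaderBatchSize (default 4096): fewer rows per batch means the
#   on-heap column vector reserves less contiguous memory per fill, so a
#   batch of book-length strings stays under the integer-overflow limit.
# - enableVectorizedReader=false: falls back to the row-based Parquet reader.
# The value 128 is an illustrative choice, not a tuned recommendation.
spark = (
    SparkSession.builder
    .config("spark.sql.parquet.columnarReaderBatchSize", "128")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .getOrCreate()
)

Both are runtime SQL configurations, so if TextPipeline constructs its own session they can also be applied after the fact with spark.conf.set(...) on that session before the write runs.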

Hi Haojin,

I am using the whole Book dataset from SlimPajama. It is possible that the error occurs during the write stage and is caused by an OOM condition. Thank you.