header challenge: header with multivalued columns raises "java.lang.ArithmeticException: / by zero" during df.select(any_column).show() or df.select(any_column).take()
Hi guys, just wanted to give some more feedback about my favourite Spark package!
I encountered an error reading a FITS file with an "exotic" header. I assume the issue is due to the columns that contain data arrays. I would expect spark-fits to load multivalued columns as vectors, but I think they might be causing bigger problems, as I cannot view any columns at all.
For example when I read the data:
path = 'photoObj-001000-1-0027.fits'
df = sqlc.read.format("fits").option("hdu", 1).load(path)
The following error is thrown when calling:
df.select('OBJID').show()
The header is shown here: example.txt, and the file itself is zipped here: photoObj-001000-1-0027.fits.zip
Before the error, the schema is inferred. Despite this, the multivalued columns (e.g. code 5E with shape 5, such as 'MODELMAG') are treated as floats. I would expect them to be treated as vectors. Is this possible?
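For reference, a minimal sketch of how the multivalued columns show up in the header (assuming astropy is installed; the file name and the MODELMAG column come from the attachments above):

from astropy.io import fits

# Print each column's TFORM code; a repeat count greater than 1
# (e.g. '5E' = 5 single-precision floats) marks a multivalued column.
with fits.open("photoObj-001000-1-0027.fits") as hdul:
    for col in hdul[1].columns:
        print(col.name, col.format)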
The error itself occurs after selecting any column (even a regular, non-multivalued one) and then applying the .take(n) or .show(n) method:
com.github.astrolabsoftware#spark-fits_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb;1.0
confs: [default]
found com.github.astrolabsoftware#spark-fits_2.11;0.7.3 in central
:: resolution report :: resolve 183ms :: artifacts dl 3ms
:: modules in use:
com.github.astrolabsoftware#spark-fits_2.11;0.7.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 1 | 0 | 0 | 0 || 1 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-3714d087-ba08-4b46-bb49-9693f86131bb
confs: [default]
0 artifacts copied, 1 already retrieved (0kB/5ms)
2019-05-06 19:07:42 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2019-05-06 19:08:28 WARN Utils:66 - Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
[Stage 0:> (0 + 1) / 1]2019-05-06 19:08:30 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ArithmeticException: / by zero
at com.astrolabsoftware.sparkfits.FitsRecordReader.nextKeyValue(FitsRecordReader.scala:318)
at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-05-06 19:08:30 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.ArithmeticException: / by zero
(stack trace identical to the executor exception above)
Please let me know if you require any additional information or have any questions,
Cheers,
Jacob
Many thanks @jacobic for reporting the issue and providing a detailed explanation!
Indeed, spark-fits currently handles only scalar entries; there is no support yet for column elements that are arrays :-(
I will have a closer look by the end of the week and try to provide a fix quickly, though I suspect it will require some development work.
Hi @jacobic,
A bit more on this: the error you see is actually not related to the multivalued columns. It is thrown when the recordLength parameter (default: 1 KB) is lower than the actual size of one row of the FITS table (3 KB here). The recordlength option controls how the data is split and read inside each HDFS block (or, more precisely, inside each InputSplit, as they are not the same) by individual mappers for processing. 1 KB seemed to give good performance (and is large enough for most of the FITS files I was using), while for larger values you might suffer from longer garbage collection times.
Hence, if you do:
# Use 5 KB for recordLength
df = spark.read.format("fits") \
    .option("hdu", 1) \
    .option("recordlength", 5*1024) \
    .load("path/to/photoObj-001000-1-0027.fits")

df.show(2)
It no longer crashes. However, that does not mean it works correctly: only the first value of each multivalued column is read. I am working on a fix for this.
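As an illustration (not part of spark-fits itself, and assuming astropy is available alongside the DataFrame df from above), you can see the difference by comparing the raw column with what spark-fits currently returns:

from astropy.io import fits

# The file stores a 5-element array per row for MODELMAG...
with fits.open("photoObj-001000-1-0027.fits") as hdul:
    print(hdul[1].data["MODELMAG"][:2])

# ...while the DataFrame currently exposes only the first element as a float.
df.select("MODELMAG").show(2)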
Hi @jacobic,
> By one line, do you mean a single row? If so, does this mean that the large recordlength is due to a large number of columns or the data size (in bytes) for each element in a column?
Yes, by one line I mean a single row, whose size is given by the number of columns and the type of the objects in each column.
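To make this concrete, here is an illustrative sketch (not spark-fits code) of how the per-row size follows from the TFORM codes in the header, using the standard FITS binary-table element sizes:

# Byte width of one element per TFORM type code (FITS binary tables).
SIZES = {"L": 1, "B": 1, "I": 2, "J": 4, "K": 8, "E": 4, "D": 8, "A": 1}

def tform_bytes(tform):
    """Byte width of one column, e.g. '5E' -> 5 * 4 = 20 bytes."""
    repeat = int(tform[:-1]) if tform[:-1] else 1
    return repeat * SIZES[tform[-1]]

# MODELMAG ('5E') contributes 20 bytes to each row; summing tform_bytes
# over all columns of the table gives the full row size.
print(tform_bytes("5E"))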
> I assume it would not be related to the number of columns because things were lazily evaluated?
When you call show or take, things are no longer lazy and the data has to be read. The recordLength is a very low-level parameter, inherited from the Hadoop I/O classes that Spark uses. The record reader simply loads a chunk of data and decodes it on the fly. By default, the record reader decodes row by row, which forces recordLength to be at the very least the size of one row. If not, it fails:
// in FitsRecordReader.scala
// Convert each row
// 1 task: 32 MB @ 2s
val tmp = Seq.newBuilder[Row]
for (i <- 0 to recordLength / rowSizeLong.toInt - 1) {
  tmp += Row.fromSeq(fits.getRow(
    recordValueBytes.slice(
      rowSizeInt*i, rowSizeInt*(i+1))))
}
recordValue = tmp.result
One could be cleverer and decode only specific columns (Parquet does this, for example); this is something that will be added in the future.
> In short, how do I find the optimal recordlength for an arbitrary FITS file in order to avoid the recordlength exception?
The current value of 1 KB was found completely empirically, through a series of benchmarks and profiling. For the moment, the user needs to manually make sure that the recordLength option (default 1 KB) is higher than the row size (given in the header).
This is far from optimal, and I will change it in the future so that recordLength defaults to either 1 KB (if the row size is below 1 KB) or the size of one row. The first case is to make sure we process several rows at once when possible, which is more efficient.
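In the meantime, a possible workaround sketch (assuming astropy is available and the file is accessible locally): read the bytes-per-row from NAXIS1 in the table header and pass that, or the 1 KB default if that is larger, as recordlength:

from astropy.io import fits

# For a binary-table HDU, NAXIS1 is the number of bytes per row.
row_size = fits.getheader("photoObj-001000-1-0027.fits", ext=1)["NAXIS1"]
record_length = max(1024, row_size)  # never below the 1 KB default, never below one row

df = spark.read.format("fits") \
    .option("hdu", 1) \
    .option("recordlength", record_length) \
    .load("path/to/photoObj-001000-1-0027.fits")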
Fixed in #70!