fingltd/4mc

null pointer exception

Not sure if this is the right place to report such issues. I just installed hadoop-4mc-2.1.0 on my Hadoop 2.7.7 cluster, but I get a null pointer exception when it apparently tries to decompress the input for a streaming job. The same job with the uncompressed input foo.txt runs fine.

foo.txt.4mc.gz

packageJobJar: [/tmp/hadoop-unjar2959120489988360905/] [] /tmp/streamjob6915837012525200457.jar tmpDir=null
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:36 INFO impl.TimelineClientImpl: Timeline service address: ...
19/08/20 17:46:36 INFO client.RMProxy: Connecting to ResourceManager at ...
19/08/20 17:46:36 INFO client.AHSProxy: Connecting to Application History server at ...
19/08/20 17:46:37 INFO fourmc.FourMcNativeCodeLoader: hadoop-4mc: loaded native library (embedded)
19/08/20 17:46:37 INFO fourmc.Lz4Codec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.ZstdCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4HighCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4MediumCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO fourmc.Lz4UltraCodec: Successfully loaded & initialized native-4mc library
19/08/20 17:46:37 INFO mapred.FileInputFormat: Total input paths to process : 1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: number of splits:1
19/08/20 17:46:37 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1565620832498_0009
19/08/20 17:46:38 INFO impl.YarnClientImpl: Submitted application application_1565620832498_0009
19/08/20 17:46:38 INFO mapreduce.Job: The url to track the job: ...
19/08/20 17:46:38 INFO mapreduce.Job: Running job: job_1565620832498_0009
19/08/20 17:46:44 INFO mapreduce.Job: Job job_1565620832498_0009 running in uber mode : false
19/08/20 17:46:44 INFO mapreduce.Job:  map 0% reduce 0%
19/08/20 17:47:03 INFO mapreduce.Job:  map 67% reduce 0%
19/08/20 17:47:04 INFO mapreduce.Job: Task Id : attempt_1565620832498_0009_m_000000_0, Status : FAILED
Error: java.lang.NullPointerException
	at com.hadoop.compression.fourmc.Lz4Decompressor.reset(Lz4Decompressor.java:234)
	at org.apache.hadoop.io.compress.CodecPool.returnDecompressor(CodecPool.java:230)
	at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:288)
	at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.close(MapTask.java:210)
	at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:1979)
	at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:468)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
	at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:164)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1762)
	at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

same problem

This is a problem in the FourMcInputStream.close() method, which calls Lz4Decompressor.releaseDirectBuffers(). By the time reset() is later called, the buffers have already been set to null, which causes the NullPointerException. Commenting this call out in FourMcInputStream.close() fixes the issue:
//((Lz4Decompressor)decompressor).releaseDirectBuffers();
I don't know why this call is made here. I looked at a similar project, Twitter's hadoop-lzo, and it does not call releaseDirectBuffers() in its input stream's close() method. Making the call here is very destructive: the decompressor is later returned to the Hadoop CodecPool, and it ends up as a broken decompressor in the pool because its internal buffers have already been destroyed.
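
For context, the sequence a LineRecordReader effectively goes through looks roughly like this when written against the plain Hadoop codec API. This is a simplified sketch, not the actual Hadoop or 4mc source, and the file path is made up; the last two calls are exactly where the stack trace above blows up.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.compress.{CodecPool, CompressionCodecFactory}

object CodecPoolLifecycle {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    val path = new Path("/tmp/foo.txt.4mc") // made-up sample path
    val codec = new CompressionCodecFactory(conf).getCodec(path)

    // Borrow a decompressor from the shared pool and wrap the raw stream with it.
    val decompressor = CodecPool.getDecompressor(codec)
    val in = codec.createInputStream(FileSystem.get(conf).open(path), decompressor)
    // ... read some records ...
    in.close()                                 // FourMcInputStream.close() releases the direct buffers here
    CodecPool.returnDecompressor(decompressor) // calls reset() -> NPE, the buffers are already null
  }
}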

The following Scala code can be used to reproduce the issue, using Spark 2.4.4 (Scala 2.11) and Hadoop 2.7.0.
It fails at val raw_df = spark.read.option("header", value = true).csv("file:///c:/lift/V1_sep_test_900.psv.4mc"): Spark uses the input stream for a partial read to build the DataFrame and infer its schema, then closes the stream and returns the decompressor to the pool so it can be reused later at raw_df.show(5). That return fails because reset() is called on the decompressor after its direct buffers were released in the close() method.

import org.apache.spark.sql.SparkSession

object Decompressor {
  def createSparkSession(): SparkSession = {
    System.setProperty("hadoop.home.dir", "C:\\opt\\mapr\\hadoop\\hadoop-2.7.0")
    System.setProperty("SPARK_HOME", "C:\\opt\\mapr\\spark\\spark-2.4.4")
    SparkSession.builder()
      .master("local[1]")
      .appName("Decompressor - 4mc debug")
      .config("spark.some.config.option", "config-value")
      .getOrCreate()
  }

  def main(args: Array[String]): Unit = {
    val spark = createSparkSession()
    // Schema inference does a partial read and closes the stream, which releases
    // the decompressor's direct buffers before it is returned to the CodecPool.
    val raw_df = spark.read.option("header", value = true)
      .csv("file:///c:/lift/V1_sep_test_900.psv.4mc")
    raw_df.show(5) // decompressor reuse fails: reset() hits the null buffers
  }
}
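
One note in case others want to run the snippet: Spark only resolves the .4mc extension if the hadoop-4mc jar is on the classpath and the codec is registered with the Hadoop configuration. Below is a variant of the repro that does the registration through the session builder; the io.compression.codecs property is standard Hadoop, but the codec class name and jar setup are my assumptions (inferred from the com.hadoop.compression.fourmc package in the stack trace), so check them against the 4mc README.

import org.apache.spark.sql.SparkSession

object DecompressorWithCodecRegistered {
  def main(args: Array[String]): Unit = {
    // Assumed setup: hadoop-4mc-2.1.0.jar on the driver classpath; the codec
    // class name is an assumption based on the package in the stack trace.
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("Decompressor - 4mc debug")
      .config("spark.hadoop.io.compression.codecs",
        "com.hadoop.compression.fourmc.FourMcCodec")
      .getOrCreate()

    val raw_df = spark.read.option("header", value = true)
      .csv("file:///c:/lift/V1_sep_test_900.psv.4mc")
    raw_df.show(5)
  }
}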