twitter/hadoop-lzo

Correct result when processing files, but errors when processing directories?

Closed this issue · 8 comments

I use Spark to process LZO files. For example, with a.lzo and b.lzo in "/input/":
When processing each file separately, with the path given as "/input/a.lzo" or "/input/b.lzo", I get the correct result.
When processing them together, with the path given as "/input" or "/input/", I get errors like this:

14/09/05 13:01:32 WARN LzopInputStream: IOException in getCompressedData; likely LZO corruption.
java.io.IOException: Corrupted uncompressed block
at com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
at com.hadoop.compression.lzo.LzopInputStream.getCompressedData(LzopInputStream.java:284)
at com.hadoop.compression.lzo.LzopInputStream.decompress(LzopInputStream.java:261)
at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:85)
at java.io.InputStream.read(InputStream.java:101)
at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:211)
at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:206)
at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:45)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:201)
at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:184)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$1.hasNext(Iterator.scala:847)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:133)
at org.apache.spark.sql.execution.Aggregate$$anonfun$execute$1$$anonfun$6.apply(Aggregate.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:571)
at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:571)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
14/09/05 13:01:32 WARN LzopInputStream: Incorrect LZO file format: file did not end with four trailing zeroes.
java.io.IOException: Corrupted uncompressed block
at com.hadoop.compression.lzo.LzopInputStream.verifyChecksums(LzopInputStream.java:219)
at com.hadoop.compression.lzo.LzopInputStream.close(LzopInputStream.java:342)
at org.apache.hadoop.util.LineReader.close(LineReader.java:150)
at org.apache.hadoop.mapred.LineRecordReader.close(LineRecordReader.java:241)
at org.apache.spark.rdd.HadoopRDD$$anon$1.close(HadoopRDD.scala:211)
at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
at org.apache.spark.rdd.HadoopRDD$$anon$1$$anonfun$1.apply$mcV$sp(HadoopRDD.scala:196)
at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
at org.apache.spark.TaskContext$$anonfun$executeOnCompleteCallbacks$1.apply(TaskContext.scala:63)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.TaskContext.executeOnCompleteCallbacks(TaskContext.scala:63)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:204)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.Task.run(Task.scala:51)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

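For reference, here is a simplified sketch of how I read the files (the master URL and paths are illustrative; my real job runs a Spark SQL aggregation on top of this, and the sketch assumes hadoop-lzo's LzopCodec is already registered via io.compression.codecs):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LzoDirRead {
    public static void main(String[] args) {
        // Several local task threads; reading each file on its own works fine.
        SparkConf conf = new SparkConf().setAppName("lzo-dir-read").setMaster("local[4]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Passing the directory makes Spark decompress a.lzo and b.lzo in parallel tasks.
        System.out.println(sc.textFile("/input").count());
        sc.stop();
    }
}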
Can anyone help?

Do you have files (possibly hidden files) in that directory that are not LZO?

No. To rule out hidden files, I created a new directory and put just the two LZO files in it. It didn't work either.

I debugged the code and found that the error occurs when running with multiple threads using local[n], but everything is fine when running with a single thread using local[1].
The code at LzopDecompressor line 46:
chkDMap.put(flag, flag.getChecksumClass().newInstance());
I think the new instance of the checksum class (actually Adler32) is affected by something related to multithreading. This is all I know so far.
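To illustrate the kind of hazard I suspect (a toy sketch, not hadoop-lzo code): java.util.zip.Adler32 keeps mutable internal state, so if one checksum instance is ever shared between task threads, interleaved reset/update calls can yield wrong values and make perfectly valid blocks look corrupted:

import java.util.zip.Adler32;
import java.util.zip.Checksum;

public class SharedAdler32Race {
    public static void main(String[] args) throws InterruptedException {
        final byte[] block = new byte[1024];       // stands in for a decompressed LZO block
        final Checksum shared = new Adler32();     // ONE instance shared by both threads: the hazard
        shared.update(block, 0, block.length);
        final long expected = shared.getValue();

        Runnable verify = new Runnable() {
            public void run() {
                for (int i = 0; i < 100000; i++) {
                    shared.reset();                // interleaves with the other thread's update
                    shared.update(block, 0, block.length);
                    if (shared.getValue() != expected) {
                        System.out.println("mismatch: a valid block would be reported as corrupted");
                        return;
                    }
                }
            }
        };
        Thread t1 = new Thread(verify);
        Thread t2 = new Thread(verify);
        t1.start(); t2.start();
        t1.join(); t2.join();
    }
}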

Thanks for the report @JunfeiHu. Would you provide a patch for thread safety in this case?

@JunfeiHu I put some info about what's going on in #94

I have hit this problem too. How can I solve it?

Hey @suiwenfeng, you could apply my patch in #94, or ensure you only use "local[1]" as your Spark master URL.
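For example, with spark-submit that is just:

spark-submit --master local[1] --class YourJob your-job.jar

(the class and jar names here are placeholders).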

Thanks, I will try this solution.