eapache/go-xerial-snappy

Possibly missing an xerial framing format writer/encoder

cocotyty opened this issue · 0 comments

This library is used by sarama.
Kafka throws an exception when the produced MessageSet is large:

ERROR [Replica Manager on Broker 5]: Error processing append operation on partition xxx (kafka.server.ReplicaManager)
kafka.common.KafkaException:
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:159)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:85)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:357)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:369)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:324)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:64)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:56)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at kafka.utils.IteratorTemplate.foreach(IteratorTemplate.scala:30)
    at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:427)
    at kafka.log.Log.liftedTree1$1(Log.scala:339)
    at kafka.log.Log.append(Log.scala:338)
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:443)
    at kafka.cluster.Partition$$anonfun$11.apply(Partition.scala:429)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:231)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:237)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:429)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:406)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:392)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:99)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:40)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:99)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:392)
    at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:328)
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:405)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:76)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: failed to read chunk
    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:433)
    at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:167)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at kafka.message.ByteBufferMessageSet$.readMessageFromStream(ByteBufferMessageSet.scala:118)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:153)

I read the org.xerial.snappy.SnappyInputStream code.
It has a compatibility path: if the compressed data is small, it can be decoded even without framing, but for larger data it strictly requires the xerial framing format.

https://github.com/xerial/snappy-java/blob/aed3843f8efa1e82b7509ea131c09220a039bfd1/src/main/java/org/xerial/snappy/SnappyInputStream.java#L99

So I forked this project and wrote a writer that implements the same framing that SnappyInputStream expects.
https://github.com/cocotyty/go-xerial-snappy

Should I create a pull request?
Sorry for my poor English.
Thank you for your great project.