jet/kafunk

MessageTooBigException when consuming a Snappy compressed topic.

Closed this issue · 0 comments

The following is raised:

2017-08-03 12:54:17:5250|ERROR|Kafunk.Consumer|fetch_process_errored|group_id=kafunk_test_0802a generation_id=1 member_id=-b795691e-7684-482c-a1ce-b01bd5b62512 topic=XXXX partition_count=256 error=System.AggregateException: One or more errors occurred. ---> Kafunk.Protocol+MessageTooBigException: partition=0 offset=14053 message_set_size=32768 message_size=44331
   at Kafunk.Protocol.MessageSet.Read(Int16 messageVer, Int32 partition, Int16 ec, Int32 messageSetSize, BinaryZipper buf) in C:\code\kafunk\src\kafunk\Protocol.fs:line 448
   at Kafunk.CompressionModule.SnappyModule.decompress(Int16 messageVer, Message m) in C:\code\kafunk\src\kafunk\Compression.fs:line 154
   at Kafunk.CompressionModule.decompress@167-1.Invoke(MessageSetItem msi) in C:\code\kafunk\src\kafunk\Compression.fs:line 175
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect@706-1.Invoke(Int32 obj)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass11.<ExecuteSelfReplicating>b__10(Object param0)
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at System.Threading.Tasks.Parallel.ForWorker[TLocal](Int32 fromInclusive, Int32 toExclusive, ParallelOptions parallelOptions, Action`1 body, Action`2 bodyWithState, Func`4 bodyWithLocal, Func`1 localInit, Action`1 localFinally)
   at System.Threading.Tasks.Parallel.For(Int32 fromInclusive, Int32 toExclusive, Action`1 body)
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect[T,TResult](FSharpFunc`2 mapping, T[] array)
   at Kafunk.CompressionModule.decompress(Int16 messageVer, MessageSet ms) in C:\code\kafunk\src\kafunk\Compression.fs:line 166
   at Kafunk.ConsumerModule.tryFetch@616-5.Invoke(Tuple`5 tupledArg) in C:\code\kafunk\src\kafunk\Consumer.fs:line 619
   at Microsoft.FSharp.Collections.IEnumerator.map@107.DoMoveNext(b& )
   at Microsoft.FSharp.Collections.IEnumerator.MapEnumerator`1.System-Collections-IEnumerator-MoveNext()
   at Microsoft.FSharp.Core.CompilerServices.RuntimeHelpers.takeInner@658[T,TResult](ConcatEnumerator`2 x, Unit unitVar0)
   at Kafunk.Prelude.SeqModule.partitionChoices4[a,b,c,d](IEnumerable`1 s) in C:\code\kafunk\src\kafunk\Utility\Prelude.fs:line 255
   at Kafunk.ConsumerModule.tryFetch@609-2.Invoke(FSharpChoice`2 _arg1) in C:\code\kafunk\src\kafunk\Consumer.fs:line 612
   at Microsoft.FSharp.Control.AsyncBuilderImpl.args@787-1.Invoke(a a)
---> (Inner Exception #0) Kafunk.Protocol+MessageTooBigException: partition=0 offset=14053 message_set_size=32768 message_size=44331
   at Kafunk.Protocol.MessageSet.Read(Int16 messageVer, Int32 partition, Int16 ec, Int32 messageSetSize, BinaryZipper buf) in C:\code\kafunk\src\kafunk\Protocol.fs:line 448
   at Kafunk.CompressionModule.SnappyModule.decompress(Int16 messageVer, Message m) in C:\code\kafunk\src\kafunk\Compression.fs:line 154
   at Kafunk.CompressionModule.decompress@167-1.Invoke(MessageSetItem msi) in C:\code\kafunk\src\kafunk\Compression.fs:line 175
   at Microsoft.FSharp.Collections.ArrayModule.Parallel.Collect@706-1.Invoke(Int32 obj)
   at System.Threading.Tasks.Parallel.<>c__DisplayClassf`1.<ForWorker>b__c()
   at System.Threading.Tasks.Task.InnerInvoke()
   at System.Threading.Tasks.Task.InnerInvokeWithArg(Task childTask)
   at System.Threading.Tasks.Task.<>c__DisplayClass11.<ExecuteSelfReplicating>b__10(Object param0)<---

This occurs after the outer message set has been decoded into a single message, which is decompressed and then further decoded into a message set. If the outer message set had partial messages, those would only be skipped in the outer decoding phase, not after decompression.