KafkaSource operator prints the same stack trace forever when there is one corrupt record
Hi, our team has been using this toolkit for a while, and we are experiencing a problem with the KafkaSource operator.
Our Kafka broker has its maximum record size set very large (20 MB), but only a very few corrupt records (maybe one every several weeks) are larger than 4 MB. When we set our fetch size to 20 MB, we continuously got the error "Error ILLEGAL_GENERATION occurred while committing offsets", so we turned around and set the fetch size to 4 MB, which works smoothly most of the time. However, when there is one corrupt record, CPU usage spikes as KafkaSource keeps printing the same stack trace over and over. This blocks the processing of offsets, and the application is stuck there forever.
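For context, here is a minimal sketch of the consumer-configuration trade-off we are describing. The class name, broker address, and group id are placeholders, and it assumes the 0.9/0.10 Java consumer, whose max.partition.fetch.bytes defaults to 1048576 bytes (the value shown in the error below):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FetchSizeConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");    // placeholder broker address
        props.put("group.id", "our-consumer-group");      // placeholder group id
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Default is 1048576 (1 MB). Raising this to 20 MB matches the broker's
        // record size but, in our case, coincided with ILLEGAL_GENERATION commit
        // errors; 4 MB avoids those but can never return the occasional
        // oversized record.
        props.put("max.partition.fetch.bytes", Integer.toString(4 * 1024 * 1024));

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
```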
To solve this, we are hoping for an option to skip the corrupt record (or to manually commit the offset) after it causes the exception. Is this possible in later releases? Or shall we go ahead, make the update ourselves, and create a pull request against the project?
Thanks!
Are you certain it is due to a corrupt record? I would have expected a SerializationException to be thrown if there was a problem deserializing the record.
As a side note, we are currently in the process of splitting the Kafka operators into a separate toolkit (https://github.com/IBMStreams/streamsx.kafka). The toolkit is currently in development and does not yet have a release. The operators in this toolkit are targeting Kafka v0.10.x. Some changes have been made to how errors are handled while consuming records. If you have a chance, you may want to give this toolkit a try.
Hi @cancilla, I use the word "corrupt" because the record is unexpectedly big and not planned. The error we got was a fetch-size issue: "There are some messages whose size is larger than the fetch size 1048576 and hence cannot be ever returned". So we want a way to skip these messages.
The current releases of Kafka do not provide a clean way to skip a message in the event of an error. I think this is a similar problem to these issues: KAFKA-5211 and KAFKA-4740.
When an exception like yours occurs, the consumer offset is not updated. Thus, subsequent calls to poll() will attempt to retrieve the same set of messages, leading to an endless loop. According to the JIRAs I linked to, it looks like this will be fixed in the next release of Kafka (v0.11).
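Until that release, one possible workaround, sketched below, is to catch the exception in a custom consumer loop and seek past the oversized record. This is only an illustration, not something the toolkit does today: the broker address, group id, and topic name are placeholders, and it assumes the 0.9/0.10 Java consumer's RecordTooLargeException reports the offending partition/offset pairs via recordTooLargePartitions(). Note that the skipped record is lost.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;

public class SkipOversizedRecordSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");    // placeholder broker address
        props.put("group.id", "our-consumer-group");      // placeholder group id
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("our-topic")); // placeholder topic
            while (true) {
                try {
                    ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    for (ConsumerRecord<byte[], byte[]> record : records) {
                        // normal record processing goes here
                    }
                    consumer.commitSync();
                } catch (RecordTooLargeException e) {
                    // The exception reports the partition/offset of each record that is
                    // too large for max.partition.fetch.bytes; seek one past it so the
                    // next poll() does not hit the same oversized record again.
                    for (Map.Entry<TopicPartition, Long> entry
                            : e.recordTooLargePartitions().entrySet()) {
                        consumer.seek(entry.getKey(), entry.getValue() + 1);
                    }
                }
            }
        }
    }
}
```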
Customers are now using the new toolkit https://github.com/IBMStreams/streamsx.kafka, which was written from scratch. If you still encounter the issue there, please open an issue in that repository.