logstash-plugins/logstash-input-file

Logstash repeatedly processes a corrupted compressed file in an endless loop

xo4n opened this issue · 4 comments

xo4n commented

We had a case where several corrupted compressed files ended up in the input directory of a Logstash pipeline. Logstash reads and processes some of the lines of each corrupted file, sends some corrupted data to the output, and throws an error before it finishes reading.

Because it didn't finish properly, the file is not marked as processed and it is picked up again over and over. The result is a continuous stream of corrupted data being sent to the output.

logstash_1  | [2020-02-28T14:00:21,027][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.2"}
logstash_1  | [2020-02-28T14:00:24,963][INFO ][org.reflections.Reflections] Reflections took 100 ms to scan 1 urls, producing 20 keys and 40 values
logstash_1  | [2020-02-28T14:00:27,790][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge] A gauge metric of an unknown type (org.jruby.RubyArray) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
logstash_1  | [2020-02-28T14:00:27,798][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["/etc/logstash/conf.d/logstash_akamai.conf"], :thread=>"#<Thread:0x5cd406bb run>"}
logstash_1  | [2020-02-28T14:00:28,139][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"main"}
logstash_1  | [2020-02-28T14:00:28,228][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
logstash_1  | [2020-02-28T14:00:28,233][INFO ][filewatch.observingread  ] START, creating Discoverer, Watch with file and sincedb collections
logstash_1  | [2020-02-28T14:00:28,723][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
logstash_1  | [2020-02-28T14:00:29,708][INFO ][logstash.outputs.file    ] Opening file {:path=>"/var/log/akamai_messages.log"}
logstash_1  | [2020-02-28T14:02:03,053][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:03:32,609][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:04:57,801][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
logstash_1  | [2020-02-28T14:06:21,140][ERROR][filewatch.readmode.handlers.readzipfile] Cannot decompress the gzip file at path: /var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz
^CGracefully stopping... (press Ctrl+C again to force)

The input configuration

input
{
  file
  {
    path => "/var/log/akamai/trv_imgcy_698321.esw3ccs_ghostip_S.202002271000-1100-4.gz"
    mode => "read"
    start_position => "beginning"
    file_completed_action => "delete"
    sincedb_path => "/dev/null"
  }
}

Tried with two different versions (7.5.2 and 7.1.1); the issue was reproducible in both cases.

Hi @xo4n, I wonder which data Logstash loads from a corrupted file. If the gzip file is corrupted, you can't decompress it, so no content should be loaded from it. Using tail mode could probably help to avoid the repeated failures, but do you have a shareable corrupted gzip that manifests the problem?

Thanks @xo4n for sharing the file; I was able to reproduce the problem locally. The problem is that when reading a line from the gzipped input stream

while (line = buffered.readLine(false))
throws an error

java.util.zip.InflaterInputStream.read(java/util/zip/InflaterInputStream.java:164)
java.util.zip.GZIPInputStream.read(java/util/zip/GZIPInputStream.java:117)
sun.nio.cs.StreamDecoder.readBytes(sun/nio/cs/StreamDecoder.java:284)
sun.nio.cs.StreamDecoder.implRead(sun/nio/cs/StreamDecoder.java:326)
sun.nio.cs.StreamDecoder.read(sun/nio/cs/StreamDecoder.java:178)
java.io.InputStreamReader.read(java/io/InputStreamReader.java:184)
java.io.BufferedReader.fill(java/io/BufferedReader.java:161)
java.io.BufferedReader.readLine(java/io/BufferedReader.java:324)
java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:498)
org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:456)
org.jruby.javasupport.JavaMethod.invokeDirect(org/jruby/javasupport/JavaMethod.java:317)
.logstash_minus_input_minus_file.lib.filewatch.read_mode.handlers.read_zip_file.handle_specifically(logstash-input-file/lib/filewatch/read_mode/handlers/read_zip_file.rb:26)

only the ensure block is then executed

ensure
# rescue each close individually so all close attempts are tried
close_and_ignore_ioexception(buffered) unless buffered.nil?
close_and_ignore_ioexception(decoder) unless decoder.nil?
close_and_ignore_ioexception(gzip_stream) unless gzip_stream.nil?
close_and_ignore_ioexception(file_stream) unless file_stream.nil?
end

which closes the input resources but neither deletes the file nor marks it as processed, so the next discovery loop finds it again and processes it, creating the same events as before, and so on.
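The partial read described above can be illustrated outside Logstash. The following is a minimal sketch in plain Ruby using the standard `zlib` library, not the plugin's JRuby code path; the helper name `read_until_failure` and the generated sample data are assumptions made for illustration. It shows that a truncated gzip stream can yield a number of valid lines before decompression raises, which is why events reach the output on every retry.

```ruby
require "zlib"
require "stringio"

# Hypothetical helper: read lines from gzip bytes until decompression fails.
# Returns [lines_read_so_far, error_or_nil].
def read_until_failure(gzip_bytes)
  lines = []
  error = nil
  begin
    reader = Zlib::GzipReader.new(StringIO.new(gzip_bytes))
    # Lines are collected one by one, so everything decoded before the
    # failure has already been "emitted" when the exception is raised.
    reader.each_line { |line| lines << line.chomp }
  rescue Zlib::Error => e
    error = e
  end
  [lines, error]
end

# Sample data (illustrative only): 500 short lines with varying content.
original = (1..500).map { |i| format("%05d %s", i, (i * 2654435761 % 2**32).to_s(36)) }
intact = Zlib.gzip(original.join("\n") + "\n")

# Simulate corruption by cutting the compressed stream in half.
truncated = intact.byteslice(0, intact.bytesize / 2)

lines, error = read_until_failure(truncated)
puts "recovered #{lines.size} of #{original.size} lines, error: #{error.class}"
```

If the caller does not record that the file failed, running the same read again produces the same leading lines again, which matches the repeated events seen in the output.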

There are two possible strategies to solve this:

  • before processing the input gzip file, validate whether it's corrupted; if it is, skip it completely.
  • or process as much of the file as possible and, if a java.util.zip.ZipException is raised, move the file to a staging directory that the plugin doesn't read. For this implementation we could imagine some configuration settings: the action to take in case of error (move|delete), the path to move the corrupted file to, and possibly a boolean switch to enable/disable this behaviour.
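The two strategies can be sketched in plain Ruby with the standard `zlib` and `fileutils` libraries. This is an illustration under stated assumptions, not the plugin's implementation and not necessarily what the released fix does: the method names `gzip_intact?` and `process_or_quarantine`, the chunk size, and the staging directory argument are all hypothetical.

```ruby
require "zlib"
require "fileutils"

CHUNK_SIZE = 64 * 1024 # arbitrary read size, chosen for illustration

# Strategy 1 (hypothetical helper): decompress the whole file once,
# discarding the output, and report whether it could be read to the end.
def gzip_intact?(path)
  Zlib::GzipReader.open(path) do |gz|
    while gz.read(CHUNK_SIZE)
      # discard the data; only the absence of errors matters
    end
  end
  true
rescue Zlib::Error
  false
end

# Strategy 2 (hypothetical helper): emit as many lines as can be read and,
# on a decompression error, move the file to a staging directory that is
# outside the watched path so it is not discovered again.
def process_or_quarantine(path, staging_dir)
  Zlib::GzipReader.open(path) do |gz|
    gz.each_line { |line| yield line.chomp }
  end
  File.delete(path) # mirrors file_completed_action => "delete"
  :completed
rescue Zlib::Error
  FileUtils.mkdir_p(staging_dir)
  FileUtils.mv(path, File.join(staging_dir, File.basename(path)))
  :quarantined
end
```

The first strategy costs an extra full decompression pass per file but never emits partial data. The second keeps whatever lines were readable, and either way the file leaves the watched directory, so the discovery loop cannot pick it up again.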

Solved in version 4.1.17.