logstash-plugins/logstash-input-s3

s3 input plugin not handling shutdown correctly, leading to duplicates once started again

PadaKwaak opened this issue · 2 comments

The s3 input plugin does not store the position of the file it was busy processing when it detected that it should stop.
From the log file, you can see that the following code was called:

@logger.warn("Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started")

Because it simply stops processing and does not store the position that it already processed, when you start logstash again, it would parse the same lines again, which then leads to duplicates.

I would have expected the S3 input plugin to either
a) continue processing till the end of the file and then update the sincedb file, or to
b) stop processing immediately, but before it scans the next line, and then write the current position of the file to the sincedb file like the file input plugin does.

  • Version: tested with 3.4.1, but the latest version (3.6.0) has the same shutdown handling
  • My config:
input {
    s3 {
        aws_credentials_file => "/usr/share/logstash/.aws/credentials"
        sincedb_path => "/opt/logstash/data/plugins/inputs/s3/sincedb_file"

        region => "us-east-1"
        bucket => "mybucket"
        prefix => "myfolder/"

        interval => 60
        additional_settings => {
          force_path_style => true
          follow_redirects => false
        }

        codec => json
    }
}

output {
    stdout { codec => rubydebug }
}
  • Steps to Reproduce:
    Upload a very large file onto S3 and let the S3 plugin ingest the file.
    While it is busy ingesting the file, let logstash shutdown (eg. sending it SIGTERM).
    You should then notice the following log message:
Logstash S3 input, stop reading in the middle of the file, we will read it again when logstash is started

When you then start logstash again, it would process the same records again.

In the meantime I have made a modification to the v3.4.1 on my own fork to not stop in the middle (ie. my option (b) that I suggested above), and confirmed that it is now working as expected when Logstash receives a shutdown signal while busy processing like a 6MB .gzip json encoded file:

[2021-04-16T09:50:52,717][WARN ][logstash.runner          ] SIGTERM received. Shutting down.
[2021-04-16T09:50:53,256][WARN ][org.logstash.input.DeadLetterQueueInputPlugin] closing dead letter queue input plugin
[2021-04-16T09:50:53,631][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"dlq-pipeline", :thread=>"#<Thread:0x3c9d0685@/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:51 run>"}
[2021-04-16T09:50:58,965][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{["LogStash::Filters::Grok", {"patterns_dir"=>["/opt/logstash/patterns"], "match"=>{"[@metadata][s3][key]"=>"/(?<s3_file>[^/]+)\\.gz"}, "id"=>"dd3398e8c462a0384b18a7d950b46d28cb9f7fd8956073d9e58f5f18a9ea7402"}]=>[{"thread_id"=>37, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.4-java/lib/manticore/response.rb:50:in `call'"}, {"thread_id"=>38, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/manticore-0.6.4-java/lib/manticore/response.rb:50:in `call'"}]}}
[2021-04-16T09:50:58,976][ERROR][org.logstash.execution.ShutdownWatcherExt] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2021-04-16T09:51:04,004][WARN ][org.logstash.execution.ShutdownWatcherExt] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>39, "name"=>"[s3-pipeline]<s3", "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/logstash-codec-json-3.0.5/lib/logstash/codecs/json.rb:48:in `from_json'"}], ["LogStash::Filters::Grok", {"patterns_dir"=>["/opt/logstash/patterns"], "match"=>{"[@metadata][s3][key]"=>"/(?<s3_file>[^/]+)\\.gz"}, "id"=>"dd3398e8c462a0384b18a7d950b46d28cb9f7fd8956073d9e58f5f18a9ea7402"}]=>[{"thread_id"=>37, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/concurrent-ruby-1.0.5-java/lib/concurrent/map.rb:93:in `[]'"}, {"thread_id"=>38, "name"=>nil, "current_call"=>"[...]/vendor/bundle/jruby/2.3.0/gems/logstash-filter-grok-4.0.4/lib/logstash/filters/grok.rb:339:in `each'"}]}}
[2021-04-16T09:51:06,799][INFO ][logstash.pipeline        ] Pipeline has terminated {:pipeline_id=>"s3-pipeline", :thread=>"#<Thread:0x748701b3 run>"}

I made the modification on top of 3.4.1, since that was the version I was using earlier: v3.4.1...PadaKwaak:v342

Once the issue with 3.6.0 has been fixed that I've raised with #225, I'll see if I can create a pull request where this above modification is enabled/disabled with a setting so that people can choose to use it or not.

The duplication here is caused by an interruption before a file can update the checkpoint/ timestamp in sincedb. The sincedb is updated per file, so when the pipeline restarts it processes the same file again. As it does not cause a missing event, I do not classify this as a bug. A better way to handle it is enhancing sincedb to have a checkpoint per file and record the latest event id. The whole plugin is getting old and needs refactoring.

It is crucial to keep the restart process fast from the system administration point of view. I would avoid stalling the shutdown.