logstash-plugins/logstash-output-elasticsearch

Some bulk requests are retried for ever even when we are trying to stop a pipeline

pfcoperez opened this issue · 1 comments

Logstash information:

  • Logstash version: >=8.5.2, maybe previous versions too
  • Installation source: Any
  • How is it being run? As part of a managed Docker container by internal Elastic Cloud orchestration system.
  • The plugin is included in the default LS distribution.

JVM (e.g. java -version):

Bundled

OS version (uname -a if on a Unix-like system):

N/A

Description of the problem including expected versus actual behavior:

At Elastic cloud we are deploying some Logstash instances as managed services, that is, their configurations get updated automatically by an external service reacting to some cloud configuration changes.

Once scenario we contemplate is the possibility of evicting certain pipelines when their target ES cluster become unhealthy, unresponsive or unavailable. That eviction action isn't much more than a dynamic reconfiguration of the pipelines which boils down to stopping the affected pipeline.

For some of the problems we do tests this works nicely as the for-ever-retry loop ares stopped when the pipeline is stopping but, for others, as bad authentication response code (401) the pipeline never stops as it gets trapped in the loop of retries for ever.

By reading the plugin source code we found that both connectivity errors and unexpected errors have a retry safe-guard stopping the retry loop when the pool/pipeline is stopping. e.g: Last line in the following snippet.

rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
# If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
# and let the user sort it out from there
@logger.error(
"Attempted to send a bulk request but Elasticsearch appears to be unreachable or down",
message: e.message, exception: e.class, will_retry_in_seconds: sleep_interval
)
@logger.debug? && @logger.debug("Failed actions for last bad bulk request", :actions => actions)
# We retry until there are no errors! Errors should all go to the retry queue
sleep_interval = sleep_for_interval(sleep_interval)
@bulk_request_metrics.increment(:failures)
retry unless @stopping.true?

However, other response codes (like 401) don't:

rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e
@bulk_request_metrics.increment(:failures)
log_hash = {:code => e.response_code, :url => e.url.sanitized.to_s,
:content_length => e.request_body.bytesize, :body => e.response_body}
message = "Encountered a retryable error (will retry with exponential backoff)"
# We treat 429s as a special case because these really aren't errors, but
# rather just ES telling us to back off a bit, which we do.
# The other retryable code is 503, which are true errors
# Even though we retry the user should be made aware of these
if e.response_code == 429
logger.debug(message, log_hash)
else
logger.error(message, log_hash)
end
sleep_interval = sleep_for_interval(sleep_interval)
retry

Our reasoning is that this latter branch should have the same safeguard because in this use case as well as others it might be that the pipeline was configured with parameters that are not longer valid (e.g: expired API keys) being the best solution to the management problem to just reconfigure the pipeline without restarting the whole LS instance.

We are filling a PR replacing retry with retry unless @stopping.true?in the branch dealing with known retry-able events.

Steps to reproduce:

Please include a minimal but complete recreation of the problem,
including (e.g.) pipeline definition(s), settings, locale, etc. The easier
you make for us to reproduce it, the more likely that somebody will take the
time to look at it.

Provide logs (if relevant):

The problem is not limited to just conditioning the retry attempt on @stopping.true? on all the bulk request branches with retry unless @stopping.true?.

We found that @stopping.true? never becomes true. The reason for this, as discussed with the Logstash team, is that close on the output code is not invoked until all the batches are completed:

https://github.com/elastic/logstash/blob/0600ff98bbd54918c8d18d2e4372f96c71dc235c/logstash-core/lib/logstash/java_pipeline.rb#L474-L484

  def shutdown_workers
    @shutdownRequested.set(true)


    @worker_threads.each do |t|
      @logger.debug("Shutdown waiting for worker thread" , default_logging_keys(:thread => t.inspect))
      t.join
    end


    filters.each(&:do_close)
    outputs.each(&:do_close)
  end

The logic is as follows:

  1. The shutdown flag is set thus triggering the shut-down process.
  2. When all workers have finished shutting down, then they get their close method invoked:
  3. This close method is what sets the @stopping flag:
    def close
    @stopping.make_true if @stopping
    stop_after_successful_connection_thread
    @client.close if @client
    end
  4. Which is, in turn, the signal to stop retrying in
    rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError => e
    # If we can't even connect to the server let's just print out the URL (:hosts is actually a URL)
    # and let the user sort it out from there
    @logger.error(
    "Attempted to send a bulk request but Elasticsearch appears to be unreachable or down",
    message: e.message, exception: e.class, will_retry_in_seconds: sleep_interval
    )
    @logger.debug? && @logger.debug("Failed actions for last bad bulk request", :actions => actions)
    # We retry until there are no errors! Errors should all go to the retry queue
    sleep_interval = sleep_for_interval(sleep_interval)
    @bulk_request_metrics.increment(:failures)
    retry unless @stopping.true?

So the condition to stop retrying can only be fulfilled when the plugin has already stopped retrying leading to a deadlock.
The incoming fix PR will cover for this too by replacing checks on @stopping with the following condition encapsulated in a helper method:

    def shutting_down?
      @stopping.true? || (execution_context.pipeline.shutdown_requested? && !execution_context.pipeline.worker_threads_draining?)
    end

Where:

  • execution_context.pipeline.shutdown_requested? is set right in the first line of the body of shutdown_workers.
  • !execution_context.pipeline.worker_threads_draining? Makes sure we keep retrying when the pipeline is configured to drain the events before being stopped.