logstash-plugins/logstash-input-kafka

Automatic offset commit on consumer.close without adding the event to the queue

lepek opened this issue · 11 comments

lepek commented

Versions:

  • logstash-input-kafka 8.0.4
  • logstash 6.2.3

Configuration:

input {
    kafka {
        bootstrap_servers => "XXXXX:9092"
        topics => ["logs"]
        consumer_threads => 1
        group_id => "xxx_group"
        heartbeat_interval_ms => "10000"
        session_timeout_ms => "30000"
        codec => "json"
        client_id => "xxx-id"
        max_poll_records => "100"
        auto_offset_reset => "earliest"
    }
}

I am wondering if this scenario is possible with the offset auto commit ON (default):

  1. Poll from Kafka
  2. Some exception is triggered BEFORE adding the event(s) to the Logstash queue
  3. The consumer is closed and the offset is committed (default behaviour)

I am trying to debug an issue where the Logstash instance was restarted several times by an external tool/script due to an exception. When the issue was finally solved, one day later, Logstash started consuming from the end of the partitions, which means the offset was committed during that time even though no message was processed (I am sure of this since we have the metrics pack enabled and no messages flowed through; of course, no messages reached the output in that time either).
If this scenario is possible, I think I can move to manual commit, which should prevent the offset commit in consumer.close.

jsvd commented
  1. Poll from Kafka
  2. Some exception is triggered BEFORE adding the event(s) to the Logstash queue
  3. The consumer is closed and the offset is committed (default behaviour)

With auto commit enabled (default behaviour), the consumer offset is advanced at each poll operation and then committed periodically according to auto_commit_interval_ms, which defaults to 5 seconds.

Also, during the normal shutdown procedure in logstash, when this input shuts down it calls close on the consumer, which will commit the offset.

So your scenario is totally possible: an exception stops the pipeline from processing the events it polled from Kafka, auto_commit_interval_ms kicks in, and the offset is committed.
In hindsight, I think auto commit should not be the default behaviour.

OTOH, as long as there are no errors and as long as logstash shuts down gracefully (no kill -9), no data should be lost when restarting.
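For reference, a minimal sketch of the Java client settings behind this behaviour, assuming the defaults described above (the class name, deserializers and poll timeout are illustration only, not the plugin's code):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AutoCommitDefaultsSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXX:9092");
        props.put("group.id", "xxx_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "true");       // default: auto commit is ON
        props.put("auto.commit.interval.ms", "5000");  // default: commit every 5 seconds

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("logs"));
            // Each poll() advances the consumer's in-memory position; once
            // auto.commit.interval.ms has elapsed, a later poll() (or close())
            // commits that position, regardless of whether downstream processing
            // of the earlier records has finished.
            ConsumerRecords<String, String> records = consumer.poll(100);
        }
    }
}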

jsvd commented

@original-brownbear can I get your validation here? thanks!

lepek commented

It is good to confirm it, as I can't assume Logstash will always be shut down gracefully in my case.

original-brownbear commented

@jsvd +1 on this issue, I agree with the description and that this is a problem.

One fix I could suggest would be to call org.apache.kafka.clients.consumer.KafkaConsumer#close(long, java.util.concurrent.TimeUnit) instead of the no-arg close on the consumer, with a really low timeout (just 0 should work fine), so it doesn't wait for an offset commit during close. That should prevent this from happening imo.
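As a rough sketch against the Java client (the wrapper method is hypothetical; the actual plugin code is Ruby calling this same API):

import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class ShutdownSketch {
    // Hypothetical shutdown helper, for illustration only.
    static void shutdown(KafkaConsumer<String, String> consumer) {
        // The no-arg close() blocks (up to 30 s by default) and, with auto commit
        // enabled, tries to commit the current position one last time.
        // A zero timeout gives close no time to perform that final commit:
        consumer.close(0, TimeUnit.MILLISECONDS);
    }
}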

jsvd commented

@original-brownbear but the issue seems to be that the pipeline at some point gets blocked downstream due to an error or an output failing to send data; after 5 seconds of backpressure the offset is committed. If the backpressure isn't resolved and logstash is shut down abruptly, data is lost (consumer.close is never called).

original-brownbear commented

@jsvd oh sorry for misreading that; the way I understood it was that close waiting its default time on the commit was what caused trouble here.

Unfortunately I agree again though, the scenario where the pipeline stalls for longer than 5s is a lot more likely. This has been reported in different forms before (thinking of the related issue of auto commit + PQ commit being decoupled).
I don't really see a way out of this when using auto commit in general. Conceptually, auto commit is a periodic action and the inputs run in their own threads => we can't really prevent auto-committing if the worker threads stall.
=> Auto commit isn't so great for us here; it would obviously be nicer to only commit once events have passed through the pipeline ... which is what E2E acks were supposed to deliver, API-wise, I guess. With the current APIs LS core provides, it's not really possible to check whether or not some data has passed the queue yet.

jsvd commented

Thanks for the sanity check Armin.

I think the path forward is clear: we should document the current behaviour more clearly and have auto commit off by default in logstash 7.x.

lepek commented

While I agree about disabling auto commit (and with most of what you guys are saying here), if you do that right now the logic will commit the offset synchronously after each poll. Although I have no benchmarks, I think that would hurt performance.
What I will do in my project is check, after adding the event to the queue, whether I have to commit the offset, using the auto_commit_interval_ms setting (last_commit_timestamp + auto_commit_interval_ms < current_timestamp), and then commit the offset asynchronously.
I think that way I will avoid losing data if something goes wrong and close is triggered, and I should also more or less match the performance of the auto commit mode.
What do you think about that?
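Roughly, as a sketch against the plain Java client, assuming a 5 second interval (the class name and the enqueue step are placeholders for pushing to the Logstash queue; this is not the plugin's actual code):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ManualIntervalCommitSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "XXXXX:9092");
        props.put("group.id", "xxx_group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");  // take over commit responsibility
        long commitIntervalMs = 5000;              // same cadence as auto_commit_interval_ms

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("logs"));
            long lastCommitMs = System.currentTimeMillis();
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    enqueue(record);  // events go into the queue before any commit
                }
                // Commit asynchronously, and only once per interval, so throughput
                // stays close to auto commit while never committing offsets for
                // events that did not reach the queue.
                if (lastCommitMs + commitIntervalMs < System.currentTimeMillis()) {
                    consumer.commitAsync();
                    lastCommitMs = System.currentTimeMillis();
                }
            }
        }
    }

    // Placeholder for handing the decoded event to the pipeline queue.
    static void enqueue(ConsumerRecord<String, String> record) { /* no-op in this sketch */ }
}

Since the commit always happens after the queue write, a crash or abrupt shutdown can only cause re-delivery of some events, not silent loss.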

jsvd commented

@lepek I'm +1 on adding a manual periodic commit, it could be called manual_commit_interval_ms, where (if enable_auto_commit is false):

  • = 0 (default): commits on every poll
  • > 0: commits offsets at every interval (approximately)

This check would be done before the poll call (or after writing to the queue), so the interplay between poll_timeout_ms, fetch_max_wait_ms, request_timeout_ms and this new manual_commit_interval_ms would have to be sanity checked to ensure users set sane values.

jsvd commented

I created #270 to track this enhancement, so I think we can close this issue @lepek ?

lepek commented

Yes, I think it can be closed. Thanks to both of you for the feedback, it helped a lot.