logstash-plugins/logstash-input-kafka

INVALID_FETCH_SESSION_EPOCH - Sending LeaveGroup request to coordinator

cdenneen opened this issue · 11 comments

I've posted this on the forum (as have many other people, but those discussions were closed without responses):
https://discuss.elastic.co/t/kafka-invalid-fetch-session-epoch/187441

  • I'd like to fully understand why this keeps happening.
  • Is there a better way to handle this, so that after the consumer "Leaves" it can "Rejoin"?

I've checked the Logstash Monitoring API, but I don't see anything there that indicates the plugin has stopped working after it "Leaves". So I'm stuck arbitrarily searching the log output for the LeaveGroup string to determine that I must restart Logstash.
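
A minimal sketch of that log check, assuming Logstash writes one JSON object per line with the text under `logEvent.message` (as in the excerpt further down). The function name `find_leave_group_events` is hypothetical, and the exact wording of the LeaveGroup message is an assumption, so the search string is a parameter:

```python
import json
from typing import Iterable, Iterator


def find_leave_group_events(
    lines: Iterable[str], needle: str = "LeaveGroup"
) -> Iterator[dict]:
    """Yield parsed JSON log records whose message contains `needle`.

    Assumes one JSON object per line with the text in logEvent.message;
    lines that do not match that shape are skipped.
    """
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # plain-text or truncated line
        if not isinstance(record, dict):
            continue
        event = record.get("logEvent")
        message = event.get("message", "") if isinstance(event, dict) else ""
        if isinstance(message, str) and needle in message:
            yield record
```

Feeding it an open log file (for example `find_leave_group_events(open(path))`, where `path` is wherever the JSON log lives) gives the matching records, whose `timeMillis` field shows when the consumer left the group. It only automates the string search; it does not explain or prevent the leave.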

Even trying this:

    max_poll_records => "200"
    max_poll_interval_ms => "600000"

I'm still getting:

{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872255710,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-1, groupId=devgilogs] Node 3 was unable to process the fetch request with (sessionId=1045052879, epoch=6): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872255845,"thread":"Ruby-0-Thread-65: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=1934164936, epoch=7): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872265350,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 3 was unable to process the fetch request with (sessionId=508411314, epoch=37): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872277017,"thread":"Ruby-0-Thread-67: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=808909764, epoch=27): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872277017,"thread":"Ruby-0-Thread-67: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 3 was unable to process the fetch request with (sessionId=444142735, epoch=11): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872325905,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=1200630398, epoch=33): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872347272,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=301078840, epoch=9): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872371926,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 3 was unable to process the fetch request with (sessionId=1710138438, epoch=68): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872372792,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-2, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=1464246895, epoch=12): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872383628,"thread":"Ruby-0-Thread-65: :1","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=122035194, epoch=120): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872398868,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 1 was unable to process the fetch request with (sessionId=127336610, epoch=62): INVALID_FETCH_SESSION_EPOCH."}}
{"level":"INFO","loggerName":"org.apache.kafka.clients.FetchSessionHandler","timeMillis":1562872398874,"thread":"kafka-coordinator-heartbeat-thread | devgilogs","logEvent":{"message":"[Consumer clientId=rndlogstash1-0, groupId=devgilogs] Node 2 was unable to process the fetch request with (sessionId=229221809, epoch=17): INVALID_FETCH_SESSION_EPOCH."}}
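
To see which consumers and brokers are affected, the lines above can be tallied by client and node. This is a rough sketch, assuming the same one-JSON-object-per-line layout; the name `count_invalid_epochs` and the regular expression are illustrative and only match the message wording shown in this excerpt:

```python
import json
import re
from collections import Counter
from typing import Iterable

# Matches the message wording seen in the excerpt above; other Kafka
# client versions may phrase it differently.
INVALID_EPOCH = re.compile(
    r"clientId=(?P<client>[^,\]]+).*?"
    r"Node (?P<node>\d+) was unable to process the fetch request"
    r".*INVALID_FETCH_SESSION_EPOCH"
)


def count_invalid_epochs(lines: Iterable[str]) -> Counter:
    """Count INVALID_FETCH_SESSION_EPOCH messages per (clientId, node)."""
    counts: Counter = Counter()
    for line in lines:
        try:
            message = json.loads(line)["logEvent"]["message"]
        except (json.JSONDecodeError, KeyError, TypeError):
            continue  # not a JSON log record of the expected shape
        if not isinstance(message, str):
            continue
        match = INVALID_EPOCH.search(message)
        if match:
            counts[(match["client"], int(match["node"]))] += 1
    return counts
```

Calling `count_invalid_epochs(open(path)).most_common()` lists the (client, node) pairs with the most occurrences, which helps show whether the messages cluster on one broker or are spread across all of them.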

I've encountered the same situation.
Logstash cannot consume messages from Kafka at all.
Is there no workaround?

jsvd commented

This seems related to this Kafka bug: https://issues.apache.org/jira/browse/KAFKA-8052
I created #327 to upgrade the client to 2.3.0; we'll publish it soon.

@jsvd is this a client-side bug only, or server-side? I'm guessing it's client-side, so upgrading Logstash to use the 2.3.0 plugin with a 2.1.0 server should fix the issue?

jsvd commented

@cdenneen looking at apache/kafka#6582, it seems to be on the client side, so upgrading should fix it. That said, I'm no Kafka expert; I've just been scouring exception messages and hunting open/closed issues :D

I updated the Logstash plugins today, and I no longer see the INVALID_FETCH_SESSION_EPOCH messages in the logs, so it's looking good to me. But, contrary to what someone mentioned above, I was still able to consume at a high rate before the update, and from what I understand of KAFKA-8052, it would just cause retries.

@voiprodrigo You are right.
I misunderstood. Messages were actually being consumed despite the fetch session log messages.

I am getting the same error.

2019-08-22 14:15:03.458 INFO 3193 --- [pool-1-thread-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=consumer-1, groupId=aaaa2] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 3: org.apache.kafka.common.errors.DisconnectException.
2019-08-22 14:15:33.659 INFO 3193 --- [pool-1-thread-1] o.a.k.c.FetchSessionHandler : [Consumer clientId=consumer-1, groupId=aaaa2] Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: org.apache.kafka.common.errors.DisconnectException.

I upgraded to the latest version of Kafka and it's still not working. I'm not even able to establish an initial connection.

Seeing this. What was the solution?

@ashishbhumireddy, did you solve this issue? I've got the same one.