A consumer joining the consumer group takes 45s to get the first message
persona94 opened this issue · 7 comments
Description
The consumer subscribes to a topic in the following fashion:
    from confluent_kafka import Consumer

    def setup_consumer(topic_list):
        try:
            consumer = Consumer(
                {
                    # <options for broker>,
                    'auto.offset.reset': 'earliest',
                    'fetch.min.bytes': '200000',
                }
            )
            consumer.subscribe(topic_list)
        except Exception as e:
            app.logger.error(
                f'Exception during creation of active calls consumer - {e}'
            )
            return
        start_consumer(consumer)
Inside start_consumer(), the main while loop looks like this:
    while True:
        message = consumer.poll(0.1)
        if message is None:
            print('Waiting for message')
            gevent.sleep(0.1)
            continue
        elif message.error():
            print(f'Error in loop {message.error()}')
            break
        else:
            print('Message received')
            ....
I see the consumer takes 45s from "Waiting for message" to move to "Message received".
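To put a number on the gap described above, one option is to time the wait explicitly rather than reading it off the prints. The following is a minimal sketch, not part of the original code: `wait_for_first_message` is a hypothetical helper that accepts any `poll`-like callable (for the consumer above, that would be `consumer.poll`), and the `clock` parameter is only there so the timing can be tested.

```python
import time


def wait_for_first_message(poll, timeout_s=60.0, poll_timeout_s=0.1, clock=time.monotonic):
    """Call poll() until it returns a message; return (message, seconds waited).

    Hypothetical helper: `poll` is any callable that takes a timeout in seconds
    and returns None when nothing arrived, e.g. consumer.poll.
    """
    start = clock()
    while clock() - start < timeout_s:
        message = poll(poll_timeout_s)
        if message is not None:
            return message, clock() - start
    # Timed out without receiving anything.
    return None, clock() - start


# Stand-in for consumer.poll: nothing on the first two calls, then a message.
replies = iter([None, None, "first message"])
message, waited = wait_for_first_message(lambda timeout: next(replies))
print(f"got {message!r} after {waited:.3f}s")
```

With a real consumer the call would look like `wait_for_first_message(consumer.poll)`, and the returned duration covers the group join as well as the first fetch.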
I turned up debug logging in Kafka and saw this:
"APIVERSION [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Enabling feature SaslAuthReq"}
"FEATURE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Updated enabled protocol features to MsgVer1,ApiVersion,BrokerBalancedConsumer,ThrottleTime,Sasl,SaslHandshake,BrokerGroupCoordinator,LZ4,OffsetTime,MsgVer2,IdempotentProducer,ZSTD,SaslAuthReq"}
"STATE [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator: Broker changed state APIVERSION_QUERY -> UP"}
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Skipping metadata refresh of 1 topic(s): connected: already being requested"}
"CGRPSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed state wait-broker-transport -> up (join-state init)"}
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 0 subscribed topic(s)"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Hinted cache of 1/1 topic(s) being queried"}
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription only available for 0/1 topics (-1ms old)"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: Request metadata for 1 topic(s): consumer join"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": postponing join until up-to-date metadata is available"}
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-metadata (state up)"}
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Sent MetadataRequest (v12, 88 bytes @ 0, CorrId 4)"}
"DUMP [my-kafka-consumer-ac#consumer-2] [thrd:main]: Assignment dump (started_cnt=0, wait_stop_cnt=0)"}
"DUMP_ALL [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}
"DUMP_PND [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}
"DUMP_QRY [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}
"DUMP_REM [my-kafka-consumer-ac#consumer-2] [thrd:main]: List with 0 partition(s):"}
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:kafka-controller-headless:9092/bootstrap]: kafka-controller-headless:9092/bootstrap: Received MetadataResponse (v12, 196 bytes, CorrId 4, rtt 0.19ms)"}
"ASSIGNDONE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": assignment operations done in join-state wait-metadata (rebalance rejoin=false)"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ===== Received metadata (for 1 requested topics): consumer join ====="}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId: XJWpYEvfk0PCISvF51RubA, ControllerId: 0"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1 brokers, 1 topics"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: Broker #0/1: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092 NodeId 0"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: Topic kafka_test_topic with 1 partitions"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: 1/1 requested topic(s) seen in metadata (lookup by name)"}
"CLUSTERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ClusterId update \"\" -> \"XJWpYEvfk0PCISvF51RubA\""}
"CONTROLLERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-headless:9092/bootstrap: ControllerId update -1 -> 0"}
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:main]: Broadcasting state change"}
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": effective subscription list changed from 0 to 1 topic(s):"}
"SUBSCRIPTION [my-kafka-consumer-ac#consumer-2] [thrd:main]: Topic kafka_test_topic with 1 partition(s)"}
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": subscription updated from metadata change: rejoining group in state wait-metadata"}
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: group (re)join"}
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" (re)joining in join-state wait-metadata with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}
"REBALANCE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" initiating rebalance (NONE) in state up (join-state wait-metadata) with 0 assigned partition(s): Metadata for subscribed topic(s) has changed"}
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: Metadata for subscribed topic(s) has changed"}
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-metadata -> init (state up)"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (0ms old)"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.kube-system.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"\""}
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 212 bytes @ 0, CorrId 2)"}
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 84 bytes, CorrId 2, rtt 1.20ms)"}
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId -1, Protocol , LeaderId , my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 0: Broker: Group member needs a valid member ID"}
"REQERR [my-kafka-consumer-ac#consumer-2] [thrd:main]: GroupCoordinator/0: JoinGroupRequest failed: Broker: Group member needs a valid member ID: explicit actions Ignore"}
"MEMBERID [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": updating member id \"\" -> \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}
"REJOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": Rejoining group without an assignment: JoinGroup error: Broker: Group member needs a valid member ID"}
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state wait-join -> init (state up)"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": join with 1 subscribed topic(s)"}
"CGRPMETADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: consumer join: metadata for subscription is up to date (2ms old)"}
"JOIN [my-kafka-consumer-ac#consumer-2] [thrd:main]: kafka-controller-0.kafka-controller-headless.x.svc.cluster.local:9092/0: Joining group \"my-kafka-consumer\" with 1 subscribed topic(s) and member id \"my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc\""}
"CGRPJOINSTATE [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\" changed join state init -> wait-join (state up)"}
"SEND [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Sent JoinGroupRequest (v5, 276 bytes @ 0, CorrId 3)"}
"BROADCAST [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: Broadcasting state change"}
"METADATA [my-kafka-consumer-ac#consumer-2] [thrd:main]: Expired 1 entries from metadata cache (0 entries remain)"}
"Waiting for message"
"Waiting for message"
"Waiting for message"
"Waiting for message"
<snip><42s of repeated prints></snip>
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: JoinGroup response: GenerationId 14, Protocol range, LeaderId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc (me), my MemberId my-kafka-consumer-ac-fda83935-3fda-4922-b7b6-8c6ff89e02dc, member metadata count 1: (no error)"}
"JOINGROUP [my-kafka-consumer-ac#consumer-2] [thrd:main]: I am elected leader for group \"my-kafka-consumer\" with 1 member(s)"}
"GRPLEADER [my-kafka-consumer-ac#consumer-2] [thrd:main]: Group \"my-kafka-consumer\": resetting group leader info: JoinGroup response clean-up"}
How to reproduce
Checklist
Please provide the following information:
- confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): v2.4.0
- Apache Kafka broker version: 3.7.0
- Client configuration: 'auto.offset.reset':'earliest', 'client.id': 'my-kafka-consumer-ac', 'fetch.min.bytes': '200000', 'metadata.max.age.ms': '1000',
- Operating system:
- Provide client logs (with 'debug': '..' as necessary)
- Provide broker log excerpts
- Critical issue
Is there any other information I can provide? @pranavrth
Can you also provide time in the logs? Better to show human readable logs. Your app logger is not logging time.
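One way to get timestamps into the application log is a formatter that includes `%(asctime)s`. The sketch below uses only the standard `logging` module; the helper name `make_timestamped_logger` and the logger name are illustrative assumptions, not something from the original application.

```python
import logging
import sys


def make_timestamped_logger(name, stream=None):
    """Return a DEBUG-level logger whose records start with a timestamp.

    Hypothetical helper; `stream` defaults to stderr.
    """
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler(stream or sys.stderr)
    # The default asctime format looks like "2024-06-28 13:39:21,384".
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
    logger.propagate = False  # avoid duplicate lines via the root logger
    return logger


log = make_timestamped_logger("kafka-consumer-demo")
log.debug("Waiting for message")
```

As far as I know, confluent-kafka-python accepts such a logger through the `logger` argument of `Consumer(...)` and forwards librdkafka's debug lines to it when `poll()` is called. Treat that as an assumption to check against the client documentation for your version.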
JoinGroupRequest is taking around 42 seconds.
"RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}
How many consumers is this coordinator serving? Is there some issue in the coordinator? I don't know what is happening on the coordinator side that makes it take 42s to respond.
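The slow request was spotted from the `rtt` field that the RECV debug lines carry. In a long log, the same check can be done mechanically. This is a rough sketch that assumes the line layout seen in the excerpts in this issue; the regular expression, the function name `slow_responses`, and the 1000 ms threshold are illustrative choices.

```python
import re

# Matches the tail of RECV lines such as:
#   Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)
RTT_RE = re.compile(r"Received (\w+) \(v\d+, \d+ bytes, CorrId (\d+), rtt ([\d.]+)ms\)")


def slow_responses(lines, threshold_ms=1000.0):
    """Return (response name, CorrId, rtt in ms) for responses at or above the threshold."""
    slow = []
    for line in lines:
        match = RTT_RE.search(line)
        if match and float(match.group(3)) >= threshold_ms:
            slow.append((match.group(1), int(match.group(2)), float(match.group(3))))
    return slow


# Tails of three RECV lines copied from the log excerpt in this issue.
sample = [
    "Received MetadataResponse (v12, 196 bytes, CorrId 4, rtt 0.19ms)",
    "Received JoinGroupResponse (v5, 84 bytes, CorrId 2, rtt 1.20ms)",
    "Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)",
]
print(slow_responses(sample))  # [('JoinGroupResponse', 3, 42039.28)]
```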
> Can you also provide time in the logs? Better to show human readable logs. Your app logger is not logging time.
Sure. I had trimmed the logs because there were a lot of them. I've attached the unmodified file from this morning:
book.log
> JoinGroupRequest is taking around 42 seconds.
> "RECV [my-kafka-consumer-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 42039.28ms)"}
> How many consumers is this coordinator serving? Is there some issue in the coordinator? I don't know what is happening at the coordinator side which is taking 42s to respond.
This coordinator serves 2 consumers, each in its own consumer group. I run this consumer on 3 separate clusters (i.e. all different, independent Kafka instances) and all of them seem to have the same issue. From what I can tell, this is not an issue for the other consumer, which uses a C++ driver. Also, if I use kafka-python as my Kafka driver I don't see this delay; I connect instantly.
{"asctime": "2024-06-28 13:39:21,384", "levelname": "DEBUG", "thread": 140737300446912, "filename": "kafka.py", "lineno": 141, "message": "METADATA [kube-system.my-app-ac#consumer-2] [thrd:main]: Expired 1 entries from metadata cache (0 entries remain)"}
{"asctime": "2024-06-28 13:39:58,577", "levelname": "DEBUG", "thread": 140737300446912, "filename": "kafka.py", "lineno": 141, "message": "RECV [kube-system.my-app-ac#consumer-2] [thrd:GroupCoordinator]: GroupCoordinator/0: Received JoinGroupResponse (v5, 271 bytes, CorrId 3, rtt 38114.32ms)"}
The two consecutive log lines above are about 37s apart, which correlates with the 38s RTT of the JoinGroupRequest. I suspect something is wrong on the coordinator side. Can you check the coordinator logs?
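The gap between the two `asctime` values quoted above can be checked directly. A small sketch, assuming the timestamps use the default Python logging format (comma before the milliseconds); `gap_seconds` is an illustrative helper name.

```python
from datetime import datetime

# Default logging asctime layout, e.g. "2024-06-28 13:39:21,384".
ASCTIME_FORMAT = "%Y-%m-%d %H:%M:%S,%f"


def gap_seconds(earlier, later):
    """Seconds elapsed between two asctime strings."""
    delta = datetime.strptime(later, ASCTIME_FORMAT) - datetime.strptime(earlier, ASCTIME_FORMAT)
    return delta.total_seconds()


gap = gap_seconds("2024-06-28 13:39:21,384", "2024-06-28 13:39:58,577")
print(f"{gap:.3f}s")  # 37.193s
```

The result, 37.193s, is a little under the reported rtt of 38114.32ms. One plausible reason is that the request was sent shortly before the first of the two lines was logged.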
@pranavrth sorry for not getting back to you sooner. I don't have access to the coordinator requests, but I did a lot of digging into when this happens. It comes down to a specific set of circumstances: when the process is killed, what the Kubernetes cluster thinks of that kill, and how my test harness affected all of these things. Anyway, I think the coordinator is taking that long to recognize my process reconnecting. I'm closing this, thanks for the help!