PollFetches takes a long time for the first fetch
secmask opened this issue · 2 comments
secmask commented
I'm testing with a development Kafka node locally, and PollFetches always seems to take a few seconds to return the first time.
Is this normal? Is there a config option I'm missing?
Here's my code:
package main

import (
	"context"
	"log/slog"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	l := slog.Default()
	topic := "foo"
	seeds := []string{"localhost:9093"}
	cl, err := kgo.NewClient(
		kgo.SeedBrokers(seeds...),
		kgo.ConsumerGroup("my-group-identifier"),
		kgo.ConsumeTopics(topic),
		kgo.DisableAutoCommit(),
		kgo.FetchMaxBytes(200),
		kgo.FetchMaxWait(time.Second),
	)
	if err != nil {
		panic(err)
	}
	defer cl.Close()
	for {
		l.Info("fetching...")
		fetches := cl.PollFetches(context.Background())
		if errs := fetches.Errors(); len(errs) > 0 {
			l.Error("got errs", "errs", errs)
			panic(errs)
		}
		l.Info("fetch complete", "count", len(fetches.Records()))
		for _, record := range fetches.Records() {
			l.Info("record", "data", string(record.Value))
		}
	}
}
Result:
time=2024-05-21T23:43:53.217+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.304+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=record data="message: 1"
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=record data="message: 2"
time=2024-05-21T23:43:56.304+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.314+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=record data="message: 3"
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=record data="message: 4"
time=2024-05-21T23:43:56.314+07:00 level=INFO msg=fetching...
time=2024-05-21T23:43:56.324+07:00 level=INFO msg="fetch complete" count=2
time=2024-05-21T23:43:56.324+07:00 level=INFO msg=record data="message: 5"
time=2024-05-21T23:43:56.324+07:00 level=INFO msg=record data="message: 6"
...
You can see the first fetch takes 3-4 seconds.
secmask commented
I added a debug logger; most of the time is spent waiting on the JoinGroup read:
time=2024-05-23T10:45:17.230+07:00 level=DEBUG msg="read FindCoordinator v4" broker=1 bytes_read=57 read_wait=38.879µs time_to_read=4.379832ms err=<nil>
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="opening connection to broker" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="connection opened to broker" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="connection initialized successfully" addr=localhost:9093 broker=1
time=2024-05-23T10:45:17.231+07:00 level=DEBUG msg="wrote JoinGroup v9" broker=1 bytes_written=137 write_wait=587.209µs time_to_write=13.092µs err=<nil>
time=2024-05-23T10:45:17.240+07:00 level=DEBUG msg="read JoinGroup v9" broker=1 bytes_read=66 read_wait=38.105µs time_to_read=8.385049ms err=<nil>
time=2024-05-23T10:45:17.240+07:00 level=INFO msg="join returned MemberIDRequired, rejoining with response's MemberID" group=my-group-identifier member_id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a
time=2024-05-23T10:45:17.240+07:00 level=DEBUG msg="wrote JoinGroup v9" broker=1 bytes_written=177 write_wait=26.809µs time_to_write=10.857µs err=<nil>
time=2024-05-23T10:45:52.167+07:00 level=DEBUG msg="read JoinGroup v9" broker=1 bytes_read=193 read_wait=30.731µs time_to_read=34.926236626s err=<nil>
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="joined, balancing group" group=my-group-identifier member_id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a instance_id=<nil> generation=2 balance_protocol=sticky leader=true
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="balancing group as leader"
time=2024-05-23T10:45:52.167+07:00 level=INFO msg="balance group member" id=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a interests="interested topics: [foo], previously owned: "
time=2024-05-23T10:45:52.167+07:00 level=INFO msg=balanced plan=kgo-2d3e0e05-dd52-4cb6-bb09-fcc2969a1d7a{foo[0]}
time=2024-05-23T10:45:52.167+07:00 level=INFO msg=syncing group=my-group-identifier protocol_type=consumer protocol=sticky
twmb commented
There are two potential culprits:
- On any new group, Kafka internally forces a 3s wait; see the broker config value group.initial.rebalance.delay.ms for more information.
- If you are restarting your consumer with the same group, your new consumer gets a new member ID in the existing group. Kafka triggers a join group, and only after the previous consumer "dies" (misses the session timeout) does Kafka allow the rebalance to continue. In your code above, I don't see any room for a ctrl+c to be handled, so there is never a proper LeaveGroup. The deferred cl.Close internally issues a LeaveGroup, but your for loop is infinite, so the close never runs and the group is never properly left.
Given the join wait in the logs is ~35s, I suspect it's (2).
Let me know if you think it's something else.