kafka: data race accessing/updating partition consumers
Closed this issue · 3 comments
axw commented
https://github.com/elastic/apm-queue/actions/runs/5183467517/jobs/9341397490?pr=172
=== RUN TestMultipleConsumers
==================
WARNING: DATA RACE
Write at 0x00c0002cd920 by goroutine 12237:
runtime.mapdelete()
/opt/hostedtoolcache/go/1.20.4/x64/src/runtime/map.go:695 +0x0
github.com/elastic/apm-queue/kafka.(*consumer).lost()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:329 +0x2ca
github.com/elastic/apm-queue/kafka.(*consumer).lost-fm()
<autogenerated>:1 +0x6d
github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:266 +0x54e
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).revoke()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:636 +0x1043
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke.func1()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:696 +0xe4
Previous read at 0x00c0002cd920 by goroutine 4970:
runtime.mapaccess1()
/opt/hostedtoolcache/go/1.20.4/x64/src/runtime/map.go:395 +0x0
github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:238 +0x104
github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x234
github.com/elastic/apm-queue/kafka.(*Consumer).fetch()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f5
github.com/elastic/apm-queue/kafka.(*Consumer).Run()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa4
github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe4
Goroutine 12237 (running) created at:
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:693 +0x11a
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:760 +0x50b
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:315 +0x2c9
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments.func3()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:1713 +0x39
Goroutine 4970 (running) created at:
github.com/elastic/apm-queue/kafka.TestMultipleConsumers()
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6be
testing.tRunner()
/opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1576 +0x216
testing.(*T).Run.func1()
/opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1629 +0x47
==================
==================
WARNING: DATA RACE
Write at 0x00c0004b4430 by goroutine 12237:
runtime.closechan()
/opt/hostedtoolcache/go/1.20.4/x64/src/runtime/chan.go:357 +0x0
github.com/elastic/apm-queue/kafka.(*consumer).lost()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:330 +0x2d7
github.com/elastic/apm-queue/kafka.(*consumer).lost-fm()
<autogenerated>:1 +0x6d
github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:266 +0x54e
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).revoke()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:636 +0x1043
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke.func1()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:696 +0xe4
Previous read at 0x00c0004b4430 by goroutine 4970:
runtime.chansend()
/opt/hostedtoolcache/go/1.20.4/x64/src/runtime/chan.go:160 +0x0
github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:238 +0x1e4
github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x234
github.com/elastic/apm-queue/kafka.(*Consumer).fetch()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f5
github.com/elastic/apm-queue/kafka.(*Consumer).Run()
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa4
github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe4
Goroutine 12237 (running) created at:
github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:693 +0x11a
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:760 +0x50b
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:315 +0x2c9
github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments.func3()
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:1713 +0x39
Goroutine 4970 (running) created at:
github.com/elastic/apm-queue/kafka.TestMultipleConsumers()
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6be
testing.tRunner()
/opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1576 +0x216
testing.(*T).Run.func1()
/opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1629 +0x47
==================
panic: send on closed channel
goroutine 4968 [running]:
github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2({{0xc0002dd04b, 0x5}, {0x1, {0x0, 0x0}, 0x168, 0x168, 0x0, {0xc000318010, 0x1, ...}}})
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:237 +0x225
github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition({0xc0004d35d8, 0x1, 0xc00047a8c0?}, 0xc000243d80)
/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x235
github.com/elastic/apm-queue/kafka.(*Consumer).fetch(0xc00035c360, {0xf2b9c8, 0xc00047a8c0})
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f6
github.com/elastic/apm-queue/kafka.(*Consumer).Run(0xc00035c360, {0xf2b9c8, 0xc00047a8c0})
/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa5
github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe5
created by github.com/elastic/apm-queue/kafka.TestMultipleConsumers
/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6bf
FAIL github.com/elastic/apm-queue/kafka 1.027s
marclop commented
I didn't close this issue with the linked PR, but I think it should be resolved now.
axw commented
👍 I'll close this and we can reopen if we see it again