elastic/apm-queue

kafka: data race accessing/updating partition consumers

Closed issue · 3 comments

axw commented

https://github.com/elastic/apm-queue/actions/runs/5183467517/jobs/9341397490?pr=172

=== RUN   TestMultipleConsumers
==================
WARNING: DATA RACE
Write at 0x00c0002cd920 by goroutine 12237:
  runtime.mapdelete()
      /opt/hostedtoolcache/go/1.20.4/x64/src/runtime/map.go:695 +0x0
  github.com/elastic/apm-queue/kafka.(*consumer).lost()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:329 +0x2ca
  github.com/elastic/apm-queue/kafka.(*consumer).lost-fm()
      <autogenerated>:1 +0x6d
  github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:266 +0x54e
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).revoke()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:636 +0x1043
  github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke.func1()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:696 +0xe4

Previous read at 0x00c0002cd920 by goroutine 4970:
  runtime.mapaccess1()
      /opt/hostedtoolcache/go/1.20.4/x64/src/runtime/map.go:395 +0x0
  github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:238 +0x104
  github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x234
  github.com/elastic/apm-queue/kafka.(*Consumer).fetch()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f5
  github.com/elastic/apm-queue/kafka.(*Consumer).Run()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa4
  github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe4

Goroutine 12237 (running) created at:
  github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:693 +0x11a
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:760 +0x50b
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:315 +0x2c9
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments.func3()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:1713 +0x39

Goroutine 4970 (running) created at:
  github.com/elastic/apm-queue/kafka.TestMultipleConsumers()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6be
  testing.tRunner()
      /opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1576 +0x216
  testing.(*T).Run.func1()
      /opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1629 +0x47
==================
==================
WARNING: DATA RACE
Write at 0x00c0004b4430 by goroutine 12237:
  runtime.closechan()
      /opt/hostedtoolcache/go/1.20.4/x64/src/runtime/chan.go:357 +0x0
  github.com/elastic/apm-queue/kafka.(*consumer).lost()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:330 +0x2d7
  github.com/elastic/apm-queue/kafka.(*consumer).lost-fm()
      <autogenerated>:1 +0x6d
  github.com/twmb/franz-go/pkg/kgo.(*consumer).initGroup.func2()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:266 +0x54e
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).revoke()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:636 +0x1043
  github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke.func1()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:696 +0xe4

Previous read at 0x00c0004b4430 by goroutine 4970:
  runtime.chansend()
      /opt/hostedtoolcache/go/1.20.4/x64/src/runtime/chan.go:160 +0x0
  github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:238 +0x1e4
  github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x234
  github.com/elastic/apm-queue/kafka.(*Consumer).fetch()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f5
  github.com/elastic/apm-queue/kafka.(*Consumer).Run()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa4
  github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe4

Goroutine 12237 (running) created at:
  github.com/twmb/franz-go/pkg/kgo.(*assignRevokeSession).prerevoke()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:693 +0x11a
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).setupAssignedAndHeartbeat()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:760 +0x50b
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).manage()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:315 +0x2c9
  github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).findNewAssignments.func3()
      /home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/consumer_group.go:1713 +0x39

Goroutine 4970 (running) created at:
  github.com/elastic/apm-queue/kafka.TestMultipleConsumers()
      /home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6be
  testing.tRunner()
      /opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1576 +0x216
  testing.(*T).Run.func1()
      /opt/hostedtoolcache/go/1.20.4/x64/src/testing/testing.go:1629 +0x47
==================
panic: send on closed channel

goroutine 4968 [running]:
github.com/elastic/apm-queue/kafka.(*Consumer).fetch.func2({{0xc0002dd04b, 0x5}, {0x1, {0x0, 0x0}, 0x168, 0x168, 0x0, {0xc000318010, 0x1, ...}}})
	/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:237 +0x225
github.com/twmb/franz-go/pkg/kgo.Fetches.EachPartition({0xc0004d35d8, 0x1, 0xc00047a8c0?}, 0xc000243d80)
	/home/runner/go/pkg/mod/github.com/twmb/franz-go@v1.13.5/pkg/kgo/record_and_fetch.go:520 +0x235
github.com/elastic/apm-queue/kafka.(*Consumer).fetch(0xc00035c360, {0xf2b9c8, 0xc00047a8c0})
	/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:232 +0x5f6
github.com/elastic/apm-queue/kafka.(*Consumer).Run(0xc00035c360, {0xf2b9c8, 0xc00047a8c0})
	/home/runner/work/apm-queue/apm-queue/kafka/consumer.go:185 +0xa5
github.com/elastic/apm-queue/kafka.TestMultipleConsumers.func2()
	/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:543 +0xe5
created by github.com/elastic/apm-queue/kafka.TestMultipleConsumers
	/home/runner/work/apm-queue/apm-queue/kafka/consumer_test.go:541 +0x6bf
FAIL	github.com/elastic/apm-queue/kafka	1.027s

axw commented

I think this may be fixed by #178

I didn't close this issue with the linked PR, but I think it should be resolved now.

axw commented

👍 I'll close this and we can reopen if we see it again