rabbitmq/amqp091-go

#31 resurfacing (?)

jxsl13 opened this issue · 9 comments

Hi,

I'm currently building a derived library with reconnect handling and my tests fail due to the already closed issue:

#31

In that issue tests were fixed with t.Cleanup but the actual data race was, as far as I can see, never tackled (?)
Where the assignment to allocator does not seem to be threadsafe (goroutinesafe) resulting in data race detections.

Code lines:
https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L555

https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L977

entrypoint for asynchronous memory access (both routines access c.allocator:
https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L268-L269

Do I do anything incorrectly?
(test case, which is highly dependent on the go runtime scheduler thus not always triggering the data race: https://github.com/jxsl13/amqpx/blob/batch-handler/pool/connection_test.go#L82)

Any feedback is appreciated.
Thanks for your time.

==================
WARNING: DATA RACE
Write at 0x00c0003e7450 by goroutine 56:
  github.com/rabbitmq/amqp091-go.(*Connection).openComplete()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:977 +0x150
  github.com/rabbitmq/amqp091-go.(*Connection).openVhost()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:962 +0x1c9
  github.com/rabbitmq/amqp091-go.(*Connection).openTune()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:948 +0xc66
  github.com/rabbitmq/amqp091-go.(*Connection).openStart()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:885 +0x66f
  github.com/rabbitmq/amqp091-go.(*Connection).open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:857 +0xc4
  github.com/rabbitmq/amqp091-go.Open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:269 +0x664
  github.com/rabbitmq/amqp091-go.DialConfig()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:250 +0x904
  github.com/jxsl13/amqpx/pool.(*Connection).connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:188 +0x33a
  github.com/jxsl13/amqpx/pool.(*Connection).Connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:176 +0x90
  github.com/jxsl13/amqpx/pool.NewConnection()
      /home/jxsl13/Development/amqpx/pool/connection.go:98 +0x999
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func1()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:97 +0x26b
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func2()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:115 +0x47

Previous write at 0x00c0003e7450 by goroutine 335:
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:555 +0x4f9
  sync.(*Once).doSlow()
      /home/jxsl13/sdk/go1.19/src/sync/once.go:74 +0x101
  sync.(*Once).Do()
      /home/jxsl13/sdk/go1.19/src/sync/once.go:65 +0x46
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:522 +0x84
  github.com/rabbitmq/amqp091-go.(*Connection).reader()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:662 +0x444
  github.com/rabbitmq/amqp091-go.Open.func1()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:268 +0x58

Goroutine 56 (running) created at:
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:94 +0xee
  testing.tRunner()
      /home/jxsl13/sdk/go1.19/src/testing/testing.go:1446 +0x216
  testing.(*T).Run.func1()
      /home/jxsl13/sdk/go1.19/src/testing/testing.go:1493 +0x47

Goroutine 335 (finished) created at:
  github.com/rabbitmq/amqp091-go.Open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:268 +0x629
  github.com/rabbitmq/amqp091-go.DialConfig()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.1/connection.go:250 +0x904
  github.com/jxsl13/amqpx/pool.(*Connection).connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:188 +0x33a
  github.com/jxsl13/amqpx/pool.(*Connection).Connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:176 +0x90
  github.com/jxsl13/amqpx/pool.NewConnection()
      /home/jxsl13/Development/amqpx/pool/connection.go:98 +0x999
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func1()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:97 +0x26b
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func2()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:115 +0x47

Your test is triggering quite an edge case, where the background frame reader receives an error and returns, before Open even completes. I agree it should not race.

I've pushed a fix in commit c503330, also in branch issue-170. Can you validate this fix before I submit a PR?

I think you will have to also add locking around https://github.com/rabbitmq/amqp091-go/blob/main/connection.go#L555
this one, as the sync.Once is a different mutex than c.m

I don't know the lifetime of the c.allocator object in order to see whether there are any other places where it is accessed across goroutines. In such a case it might be good to have a single or multiple helper methods that lock access to the allocator, do whatever needs to be done on the allocator and then return the result.

The mutex c.m in the destructor is the same mutex as in c.m in openComplete; in both cases, we access a field in the Connection struct, which is the pointer receiver of Connection. The destructor is already locked as shown here:

amqp091-go/connection.go

Lines 522 to 524 in c503330

c.destructor.Do(func() {
c.m.Lock()
defer c.m.Unlock()

I see, my bad. Will test it later (Y)

@Zerpet looks good. I re-ran my test 20 times and could not reproduce.

Edit:
Ran another 20 times and the test failed... will look into it..

==================
WARNING: DATA RACE
Write at 0x00c0003b80c8 by goroutine 112:
  github.com/rabbitmq/amqp091-go.(*Connection).openTune()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:917 +0x6c4
  github.com/rabbitmq/amqp091-go.(*Connection).openStart()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:885 +0x6af
  github.com/rabbitmq/amqp091-go.(*Connection).open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:857 +0xc4
  github.com/rabbitmq/amqp091-go.Open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:269 +0x664
  github.com/rabbitmq/amqp091-go.DialConfig()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:250 +0x904
  github.com/jxsl13/amqpx/pool.(*Connection).connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:188 +0x337
  github.com/jxsl13/amqpx/pool.(*Connection).Connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:176 +0x90
  github.com/jxsl13/amqpx/pool.NewConnection()
      /home/jxsl13/Development/amqpx/pool/connection.go:98 +0x999
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func1()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:97 +0x11b
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func2()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:115 +0x47

Previous read at 0x00c0003b80c8 by goroutine 138:
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown.func1()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:555 +0x413
  sync.(*Once).doSlow()
      /usr/lib/go-1.20/src/sync/once.go:74 +0x101
  sync.(*Once).Do()
      /usr/lib/go-1.20/src/sync/once.go:65 +0x46
  github.com/rabbitmq/amqp091-go.(*Connection).shutdown()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:522 +0x84
  github.com/rabbitmq/amqp091-go.(*Connection).reader()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:662 +0x435
  github.com/rabbitmq/amqp091-go.Open.func1()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:268 +0x58

Goroutine 112 (running) created at:
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:94 +0xe8
  testing.tRunner()
      /usr/lib/go-1.20/src/testing/testing.go:1576 +0x216
  testing.(*T).Run.func1()
      /usr/lib/go-1.20/src/testing/testing.go:1629 +0x47

Goroutine 138 (finished) created at:
  github.com/rabbitmq/amqp091-go.Open()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:268 +0x629
  github.com/rabbitmq/amqp091-go.DialConfig()
      /home/jxsl13/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.6.2-0.20230207111221-c503330cfb10/connection.go:250 +0x904
  github.com/jxsl13/amqpx/pool.(*Connection).connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:188 +0x337
  github.com/jxsl13/amqpx/pool.(*Connection).Connect()
      /home/jxsl13/Development/amqpx/pool/connection.go:176 +0x90
  github.com/jxsl13/amqpx/pool.NewConnection()
      /home/jxsl13/Development/amqpx/pool/connection.go:98 +0x999
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func1()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:97 +0x11b
  github.com/jxsl13/amqpx/pool_test.TestNewConnectionDisconnect.func2()
      /home/jxsl13/Development/amqpx/pool/connection_test.go:115 +0x47
==================
    toxiproxy_client_test.go:125: time=2023-02-07 18:31:32.470490147 +0100 CET m=+1.375569022 level=debug, msg=enabled rabbitmq connection, 
    testing.go:1446: race detected during execution of test
--- FAIL: TestNewConnectionDisconnect (34.14s)
=== NAME  
    testing.go:1446: race detected during execution of test

In order to reproduce:

git clone https://github.com/jxsl13/amqpx.git
cd amqpx
git checkout -b batch-handler

# requires docker & docker-compose (starts a rabbitmq & toxy-prtoxy container)
make environment

# optionally you may update your branch dependency
go get github.com/rabbitmq/amqp091-go@issue-170

go test -timeout 900s -run ^TestNewConnectionDisconnect$ github.com/jxsl13/amqpx/pool -v -race -count=40

I've run the tests 3 times with -count=40 and I can't reproduce the issue. I've pushed a small fix 0ecb414 to the branch, adding a critical section in tune to protect the access to ChannelMax. Can you try out this new patch?

I also cannot reproduce it with the new version.
I guess that should fix this issue.
Thanks for the fast respone.