concurrent dispatch data race
LucaWolf opened this issue · 3 comments
Is your feature request related to a problem? Please describe.
Could you please elaborate a bit on the issue noted in the comment at https://github.com/rabbitmq/amqp091-go/blob/69013f16824c2227d9d10d174713d565d92f13ea/connection.go#L672C64-L672C64?
I seem to have stumbled onto this case.
Describe the solution you'd like
Please suggest how applications can avoid this case.
Describe alternatives you've considered
No response
Additional context
Triggered by the ./consumer_batch example of the https://github.com/LucaWolf/grabbit wrapper library, when using the "isolate_notifiers" branch.
2023/10/10 21:58:31 notification: {Channel chan.publisher.example CannotEstablish Exception (504) Reason: "channel/connection is not open"}
2023/10/10 21:58:31 notification: {Connection conn.main Up no error}
2023/10/10 21:58:39 notification: {Channel chan.publisher.example Up no error}
2023/10/10 21:58:39 notification: {Channel chan.publisher.example workload DefineTopology no error}
WARNING: DATA RACE
Write at 0x00c00014a430 by goroutine 9:
github.com/rabbitmq/amqp091-go.(*Channel).Confirm()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:1744 +0xe4
github.com/LucaWolf/grabbit.chanNotifiersRefresh()
/home/lucian/projects/go/grabbit/channel_notifiers.go:38 +0x2dd
github.com/LucaWolf/grabbit.(*Channel).manager()
/home/lucian/projects/go/grabbit/channel.go:112 +0x4a
github.com/LucaWolf/grabbit.NewChannel.func1()
/home/lucian/projects/go/grabbit/channel.go:70 +0x44
Previous read at 0x00c00014a430 by goroutine 14:
github.com/rabbitmq/amqp091-go.(*Channel).dispatch()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:351 +0x846
github.com/rabbitmq/amqp091-go.(*Channel).recvMethod()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/channel.go:396 +0x1c9
github.com/rabbitmq/amqp091-go.(*Connection).dispatchN()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:675 +0x1cb
github.com/rabbitmq/amqp091-go.(*Connection).demux()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:618 +0x6e
github.com/rabbitmq/amqp091-go.(*Connection).reader()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:733 +0x2e5
github.com/rabbitmq/amqp091-go.Open.func1()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x58
Goroutine 9 (running) created at:
github.com/LucaWolf/grabbit.NewChannel()
/home/lucian/projects/go/grabbit/channel.go:66 +0x5e4
github.com/LucaWolf/grabbit.NewPublisher()
/home/lucian/projects/go/grabbit/publisher.go:88 +0x204
main.main()
/home/lucian/projects/go/grabbit/examples/consumer_batch/main.go:154 +0x729
Goroutine 14 (running) created at:
github.com/rabbitmq/amqp091-go.Open()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:271 +0x6a9
github.com/rabbitmq/amqp091-go.DialConfig()
/home/lucian/go/pkg/mod/github.com/rabbitmq/amqp091-go@v1.9.0/connection.go:252 +0x904
github.com/LucaWolf/grabbit.(*Connection).dial()
/home/lucian/projects/go/grabbit/connection.go:227 +0x117
github.com/LucaWolf/grabbit.(*Connection).reconnectLoop()
/home/lucian/projects/go/grabbit/connection.go:254 +0x137
github.com/LucaWolf/grabbit.NewConnection.func1()
/home/lucian/projects/go/grabbit/connection.go:62 +0xc8
So it turns out my code has two issues:
- the publish notifier channel has less capacity than the number of messages published. There is a consumer goroutine, but most probably it was still overflowing. Interestingly, this led to the reported race (see the sketch after this list).
- the notifiers refresh wrongly happens after the notification has been processed.
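For reference, a minimal sketch of the remedy for the first issue, per the NotifyPublish documentation (the receiving channel's capacity must cover all outstanding publishings). `maxInFlight` and `drainConfirms` are illustrative names on my side, not grabbit code:

```go
package example

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

// maxInFlight is a hypothetical application-level bound on how many
// publishes can be awaiting confirmation at once.
const maxInFlight = 128

// drainConfirms registers a publish-confirmation notifier whose buffer
// covers the publish window and drains it from a dedicated goroutine.
func drainConfirms(ch *amqp.Channel) {
	confirms := ch.NotifyPublish(make(chan amqp.Confirmation, maxInFlight))
	go func() {
		// amqp091-go closes confirms when the channel shuts down,
		// which ends this loop.
		for c := range confirms {
			if !c.Ack {
				log.Printf("publish nacked, tag=%d", c.DeliveryTag)
			}
		}
	}()
}
```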
Please ignore this post for now until I fix my side. Nonetheless, it would be interesting to get some feedback on the reasoning behind the original code comment:
"// Note: this could result in concurrent dispatch depending on
// how channels are managed in an application"
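My reading of that comment (an assumption on my part, not confirmed by the maintainers): the connection's reader goroutine dispatches incoming frames to a *Channel concurrently with application goroutines, so mutating a live channel races with dispatch. Roughly:

```go
package example

import (
	amqp "github.com/rabbitmq/amqp091-go"
)

// refreshNotifiers sketches the unsafe pattern from my side: reconfiguring
// a channel that the library's reader goroutine may still be dispatching
// frames to. (*Channel).Confirm writes confirm-mode state that
// (*Channel).dispatch reads on the reader goroutine, hence the race above.
func refreshNotifiers(ch *amqp.Channel) error {
	// Racy if invoked while frames for ch are still in flight, e.g. from
	// a loop reacting to notifications that were already dispatched.
	if err := ch.Confirm(false); err != nil {
		return err
	}
	_ = ch.NotifyPublish(make(chan amqp.Confirmation, 1))
	return nil
}
```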
Fixed on the application side by proper handling of the NotifyPublish call (i.e. calling it only once after each channel established/recovered stage).
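Something along these lines (`onChannelUp` is an illustrative name and the buffer size is arbitrary, not grabbit's actual code):

```go
package example

import (
	amqp "github.com/rabbitmq/amqp091-go"
)

// onChannelUp runs exactly once each time the channel is (re)established,
// before any publisher goroutine is handed ch, so Confirm can no longer
// overlap with the reader goroutine dispatching on a previous registration.
func onChannelUp(ch *amqp.Channel) (<-chan amqp.Confirmation, error) {
	if err := ch.Confirm(false); err != nil {
		return nil, err
	}
	// Register once per channel lifetime; the previous channel's notifier
	// was closed by the library when that channel went away.
	return ch.NotifyPublish(make(chan amqp.Confirmation, 128)), nil
}
```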
Sorry for the noise.
@LucaWolf thank you for following up. If I have time I will look into the source code comment you mention.