uber-go/cadence-client

Goroutine leak after Worker.Stop()

asido opened this issue · 2 comments

asido commented

Describe the bug
Worker.Stop() doesn't wait for its goroutines to stop, which goleak reports as a goroutine leak.

To Reproduce
Is the issue reproducible?

  • Yes

Steps to reproduce the behavior:

go.mod:

module cadence-goleak

go 1.16

require (
	go.uber.org/cadence v0.17.0
	go.uber.org/fx v1.14.2
	go.uber.org/goleak v1.1.10
	go.uber.org/yarpc v1.57.1
)

main.go:

package main

import (
	"context"

	"go.uber.org/cadence/.gen/go/cadence/workflowserviceclient"
	"go.uber.org/cadence/worker"
	"go.uber.org/cadence/workflow"
	"go.uber.org/fx"
	"go.uber.org/yarpc/api/transport"
	"go.uber.org/yarpc/yarpctest"
)

var opts = fx.Options(
	fx.Provide(newCadenceWorker),
	fx.Invoke(registerCadenceWorker),
)

func main() {
	fx.New(opts).Run()
}

func newCadenceWorker() worker.Worker {
	serviceClient := workflowserviceclient.New(&transport.OutboundConfig{
		CallerName: "caller-name",
		Outbounds: transport.Outbounds{
			ServiceName: "service-name",
			Unary:       yarpctest.NewFakeTransport().NewOutbound(yarpctest.NewFakePeerList()),
		},
	})

	w := worker.New(serviceClient, "cadence-domain", "task-list", worker.Options{})
	w.RegisterWorkflowWithOptions(noopWorkflow, workflow.RegisterOptions{Name: "noop-workflow"})
	return w
}

func registerCadenceWorker(worker worker.Worker, lifecycle fx.Lifecycle) {
	lifecycle.Append(fx.Hook{
		OnStart: func(context.Context) error {
			return worker.Start()
		},
		OnStop: func(context.Context) error {
			worker.Stop()
			return nil
		},
	})
}

func noopWorkflow(ctx workflow.Context) error {
	return nil
}

main_test.go:

package main

import (
	"testing"
	"time"

	"go.uber.org/fx/fxtest"
	"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}

func Test_app_runs(t *testing.T) {
	app := fxtest.New(t, opts)
	app.RequireStart()
	// Give time for worker polling to begin.
	time.Sleep(time.Second)
	app.RequireStop()
}
$ go test .
goleak: Errors on successful test run: found unexpected goroutines:
[Goroutine 24 in state sleep, with time.Sleep on top of the stack:
goroutine 24 [sleep]:
time.Sleep(0x2035a97a4)
	/usr/local/Cellar/go/1.16.6/libexec/src/runtime/time.go:193 +0xd2
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).throttleInternal(0xc000160aa0, 0xc00007cd20)
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/backoff/retry.go:62 +0x7e
go.uber.org/cadence/internal/common/backoff.(*ConcurrentRetrier).Throttle(...)
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/backoff/retry.go:48
go.uber.org/cadence/internal.(*baseWorker).pollTask(0xc0003581c0)
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:261 +0x4a
go.uber.org/cadence/internal.(*baseWorker).runPoller(0xc0003581c0)
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:227 +0xb5
created by go.uber.org/cadence/internal.(*baseWorker).Start
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/internal_worker_base.go:190 +0xbb

 Goroutine 13 in state semacquire, with sync.runtime_Semacquire on top of the stack:
goroutine 13 [semacquire]:
sync.runtime_Semacquire(0xc000358238)
	/usr/local/Cellar/go/1.16.6/libexec/src/runtime/sema.go:56 +0x45
sync.(*WaitGroup).Wait(0xc000358230)
	/usr/local/Cellar/go/1.16.6/libexec/src/sync/waitgroup.go:130 +0x65
go.uber.org/cadence/internal/common/util.AwaitWaitGroup.func1(0xc000358230, 0xc0000b2360)
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/util/util.go:52 +0x2b
created by go.uber.org/cadence/internal/common/util.AwaitWaitGroup
	/Users/asido/go/pkg/mod/go.uber.org/cadence@v0.17.0/internal/common/util/util.go:51 +0x6e
]
FAIL	cadence-goleak	10.567s

Expected behavior

$ go test .
ok  	cadence-goleak

Additional Context
This uses cadence-client v0.17.0 because, with the program above, fetching v0.18.2 fails to build. The reported problem exists in both releases, though.

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
	have (thrift.TStruct)
	want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
	have ()
	want (context.Context)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
	have ()
	want (context.Context)
Groxx commented

Yeah, AFAICT the shutdown process just closes a channel and returns :| so there's no way to wait for a clean shutdown.

There are a few other shutdown leaks throughout the client; unfortunately, they're definitely not all simple fixes.
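
For illustration, the shutdown shape the comment is describing would pair the closed channel with a WaitGroup, so Stop() blocks until every poller has returned. A minimal sketch (not cadence's actual internals; the worker type and poller loop here are invented for the example):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// worker sketches a blocking shutdown: Stop() doesn't just close a channel,
// it also waits for every poller goroutine to return.
type worker struct {
	quit chan struct{}
	wg   sync.WaitGroup
}

func (w *worker) Start(pollers int) {
	w.quit = make(chan struct{})
	for i := 0; i < pollers; i++ {
		w.wg.Add(1)
		go func() {
			defer w.wg.Done()
			for {
				select {
				case <-w.quit:
					return
				default:
					time.Sleep(5 * time.Millisecond) // stand-in for pollTask
				}
			}
		}()
	}
}

// Stop signals shutdown and blocks until all pollers have exited,
// so a goleak check run afterwards sees no leftover goroutines.
func (w *worker) Stop() {
	close(w.quit)
	w.wg.Wait()
}

func main() {
	w := &worker{}
	w.Start(3)
	w.Stop()
	fmt.Println("all pollers exited")
}
```

With this shape, Worker.Stop() returning would be a guarantee that the pollers are gone, which is exactly what goleak needs.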


As for this error:

$ go get .
# go.uber.org/cadence/internal/common
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:31:38: not enough arguments in call to thrift.NewTSerializer().Write
	have (thrift.TStruct)
	want (context.Context, thrift.TStruct)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:53:27: not enough arguments in call to t.Protocol.Flush
	have ()
	want (context.Context)
../../../go/pkg/mod/go.uber.org/cadence@v0.18.2/internal/common/thrift_util.go:57:28: not enough arguments in call to t.Transport.Flush
	have ()
	want (context.Context)

^ That means you already have a version of thrift that's too new for cadence. Thrift made a breaking change without bumping to a new major version, and Go modules don't allow specifying upper version limits, so unfortunately you need to downgrade thrift to something compatible. A "clean" go get works because it downloads the minimum versions specified in cadence's go.mod file.
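
Concretely, the downgrade can be done by pinning thrift back to whatever cadence's own go.mod requires. The version below is deliberately a placeholder, not a real value; look it up in go.uber.org/cadence's go.mod before running this:

```shell
# Pin thrift to the version cadence's go.mod specifies
# (<version-from-cadence-go.mod> is a placeholder, not a real tag).
go get github.com/apache/thrift@<version-from-cadence-go.mod>
go mod tidy
```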