riverqueue/river

Memory leak?

shawnstephens opened this issue · 14 comments

We are trialing River in production. We have a consumer process that consumes a message from Kafka and inserts a River job. We've been running a single River worker process (MaxWorkers = 4) separately from the consumer process, which uses an insert-only client. I've observed memory increasing through relatively low-volume periods, and it increases quickly during high-volume periods of 30+ messages per second. The dip near the end of the chart is a restart. You can see memory increasing even during the relatively low-volume overnight hours.

[charts: process memory usage over time, with a dip at the restart]

I'll paste some of the code below:

Insert-only Client creation:

riverClient, err := river.NewClient(riverpgxv5.New(ds.PgxPool()), &river.Config{})
if err != nil {
	log.Errorf("could not initialize new River client: %s", err.Error())
	panic(err)
}

River job insertion

_, err := riverClient.Insert(ctx, ShipmentMessage{JSONString: string(message.Value)}, nil)
if err != nil {
	log.Errorf("failed to insert job: %s", err.Error())
} else {
	metrics.Count("shipment_message_worker.message_inserted", 1, []string{}, 1)
}

The worker (this process is where the memory may be leaking)

func (ShipmentMessage) Kind() string { return "shipment_message" }

func (rw *ShipmentMessageWorker) Work(ctx context.Context, job *river.Job[ShipmentMessage]) error {
	metrics.Count("shipment_message_worker.message_processed", 1, []string{}, 1)

	return nil
}

Worker initialization:

func InitRiverWorkerAndClient(ctx context.Context, ds *postgres.Datastore, cfg Config) {
	// add river worker
	riverWorkers := river.NewWorkers()
	if err := river.AddWorkerSafely(riverWorkers, &ShipmentMessageWorker{
		Logger: log.New(),
	}); err != nil {
		log.Errorf("could not register new River worker: %s", err.Error())
		panic(err)
	}
	log.Infof("new River worker registered successfully")

	// create new river client
	client, err := river.NewClient(riverpgxv5.New(ds.PgxPool()), &river.Config{
		// cfg.MaxRiverWorkers is 4
		Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: cfg.MaxRiverWorkers}},
		Workers: riverWorkers,
	})
	if err != nil {
		log.Errorf("could not initialize new River client: %s", err.Error())
		panic(err)
	}

	// start the client
	if err := client.Start(ctx); err != nil {
		log.Errorf("failed to start River client: %s", err.Error())
		panic(err)
	}

	// Block until the context is canceled so the worker doesn't exit until
	// it's terminated by a signal; context cancellation stops the worker on
	// shutdown.
	<-ctx.Done()
}

Hi @shawnstephens, thanks for reporting! 🙏 I'm digging into this now. One thing is clear: we don't appear to have any goroutine leaks! 😆 But there do seem to be memory leaks. I have a local demo River app set up to work through many jobs per second, and its memory slowly climbs as it works through tens of thousands of jobs.
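
For reference, the /debug/pprof endpoint used below comes from Go's standard net/http/pprof package; a minimal way to expose it looks roughly like this (the demo app itself isn't shown here):

package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers the /debug/pprof/* handlers on http.DefaultServeMux
)

func main() {
	// Expose the profiling endpoints so `go tool pprof http://localhost:6060/debug/pprof/heap`
	// can fetch heap profiles while the app runs.
	go func() {
		log.Println(http.ListenAndServe("localhost:6060", nil))
	}()

	select {} // in the real demo, the River client and a job-inserting loop run here
}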

First look at a heap profile from many minutes in:

➜  ~ go tool pprof -alloc_space http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /Users/bgentry/pprof/pprof.riverdemo.alloc_objects.alloc_space.inuse_objects.inuse_space.007.pb.gz
File: riverdemo
Type: alloc_space
Time: Feb 28, 2024 at 8:10pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 930.17MB, 63.25% of 1470.67MB total
Dropped 128 nodes (cum <= 7.35MB)
Showing top 10 nodes out of 104
      flat  flat%   sum%        cum   cum%
  417.13MB 28.36% 28.36%   417.13MB 28.36%  math/rand.newSource
  123.20MB  8.38% 36.74%   123.20MB  8.38%  runtime/debug.Stack
   64.51MB  4.39% 41.13%   427.81MB 29.09%  github.com/riverqueue/river/internal/dbsqlc.(*Queries).JobSetStateIfRunning
   56.05MB  3.81% 44.94%   108.10MB  7.35%  github.com/riverqueue/river.(*jobExecutor).reportError
   48.07MB  3.27% 48.21%    48.07MB  3.27%  github.com/jackc/pgx/v5/pgproto3.(*Bind).Encode
   46.01MB  3.13% 51.34%    47.51MB  3.23%  github.com/riverqueue/river/internal/dbsqlc.JobRowFromInternal
   45.05MB  3.06% 54.40%    45.05MB  3.06%  github.com/jackc/pgx/v5/pgtype.encodePlanJSONCodecEitherFormatByteSlice.Encode
   44.05MB  3.00% 57.39%    87.10MB  5.92%  encoding/json.(*decodeState).literalStore
   43.05MB  2.93% 60.32%    43.05MB  2.93%  encoding/json.unquoteBytes
   43.05MB  2.93% 63.25%    46.05MB  3.13%  encoding/json.Marshal
(pprof) 

The obvious standout there is the first entry. I dropped a quick log line into here, and it became clear that we are frequently allocating new rand sources:

func NewCryptoSeededConcurrentSafeRand() *mathrand.Rand {
	return mathrand.New(newCryptoSeededConcurrentSafeSource())
}
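
One common way to avoid allocating a fresh source on every call is to seed a single source up front and guard it with a mutex; roughly something like this (an illustrative sketch of the pattern, not necessarily the exact patch):

package randutil

import (
	cryptorand "crypto/rand"
	"encoding/binary"
	mathrand "math/rand"
	"sync"
)

var (
	sharedMu   sync.Mutex
	sharedRand = mathrand.New(mathrand.NewSource(cryptoSeed()))
)

// cryptoSeed reads a crypto-quality int64 once, at package init, to seed math/rand.
func cryptoSeed() int64 {
	var seed int64
	if err := binary.Read(cryptorand.Reader, binary.BigEndian, &seed); err != nil {
		panic(err)
	}
	return seed
}

// Intn draws from the single shared source; the mutex makes it safe for concurrent use.
func Intn(n int) int {
	sharedMu.Lock()
	defer sharedMu.Unlock()
	return sharedRand.Intn(n)
}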

The fix for that one is in #240. However, that doesn't appear to be the only leak. After fixing it and running again for a while, I'm still seeing memory grow, though more slowly. Another profile:

➜  ~ go tool pprof -alloc_space http://localhost:6060/debug/pprof/heap

Fetching profile over HTTP from http://localhost:6060/debug/pprof/heap
Saved profile in /Users/bgentry/pprof/pprof.riverdemo.alloc_objects.alloc_space.inuse_objects.inuse_space.012.pb.gz
File: riverdemo
Type: alloc_space
Time: Feb 28, 2024 at 8:44pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 344.35MB, 50.90% of 676.54MB total
Dropped 129 nodes (cum <= 3.38MB)
Showing top 10 nodes out of 116
      flat  flat%   sum%        cum   cum%
   67.11MB  9.92%  9.92%    67.11MB  9.92%  runtime/debug.Stack
   41.06MB  6.07% 15.99%    41.06MB  6.07%  github.com/jackc/pgx/v5/pgproto3.(*Bind).Encode
   40.04MB  5.92% 21.91%    69.56MB 10.28%  github.com/riverqueue/river.(*jobExecutor).reportError
   35.01MB  5.17% 27.08%   276.21MB 40.83%  github.com/riverqueue/river/internal/dbsqlc.(*Queries).JobSetStateIfRunning
   31.04MB  4.59% 31.67%    31.04MB  4.59%  github.com/jackc/pgx/v5/pgtype.encodePlanJSONCodecEitherFormatByteSlice.Encode
   30.53MB  4.51% 36.18%    30.53MB  4.51%  encoding/json.unquoteBytes
   27.03MB  3.99% 40.18%    27.03MB  3.99%  github.com/jackc/pgx/v5/pgproto3.(*Frontend).Flush
   24.53MB  3.63% 43.80%    27.03MB  4.00%  encoding/json.Marshal
   24.51MB  3.62% 47.43%    32.01MB  4.73%  github.com/riverqueue/river/internal/dbsqlc.(*Queries).JobGetAvailable
   23.50MB  3.47% 50.90%    28.07MB  4.15%  github.com/riverqueue/river.insertParamsFromArgsAndOptions

In particular, JobSetStateIfRunning and reportError stand out to me in there. I'll look into those now.

Thanks so much @bgentry for diving into this and providing the detailed responses.

Once a new release is ready, I can update our internal application and deploy for testing.

Thanks @shawnstephens!

I've been doing a little more testing this morning. The memory leak that Blake fixed last night is by far the worst offender, but we still have at least one more that I'm tracking down. It leaks much more slowly, but still steadily. Based on what I've been able to glean from pprof so far, I suspect it's a leaked context.

I cut v0.0.23 just so we have something reasonable out there with that fix, but work continues on this one.
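
For context on the leaked-context theory: the usual culprit in Go is a context created with WithCancel or WithTimeout whose cancel function is never called, which keeps the child context and anything it references alive for the parent's lifetime. A generic illustration, not River's actual code:

package ctxleak

import "context"

// handleOne shows the classic pattern: a child context created with WithCancel
// stays registered with its parent until cancel is called or the parent is done,
// so dropping the deferred cancel keeps the child (and whatever it references)
// reachable, which shows up as slow, steady memory growth.
func handleOne(parent context.Context, work func(context.Context) error) error {
	ctx, cancel := context.WithCancel(parent)
	defer cancel() // omitting this line is the leak
	return work(ctx)
}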

@brandur 🥇. Thanks for the additional context. I'll put up a PR to try v0.0.23.

@brandur looks like rivertype needs its own tag now?

➜  go get -u github.com/riverqueue/river
go: downloading github.com/riverqueue/river/rivertype v0.0.23
go: github.com/riverqueue/river imports
        github.com/riverqueue/river/rivertype: reading github.com/riverqueue/river/rivertype/go.mod at revision rivertype/v0.0.23: unknown revision rivertype/v0.0.23

@shawnstephens Quite right! Pushed.

@brandur Thank you! Unfortunately, I had to revert to v0.0.22. I was seeing this error on every single execution:

time=2024-02-29T17:50:21.050Z level=ERROR msg="error establishing connection from pool" notifier.err="connection already established"

CPU usage spiked:
[chart: CPU usage spike]

@shawnstephens Thanks. We found what we think was our last memory leak and pushed v0.0.24 to patch it, although this isn't likely to resolve the connection issue that you mentioned above. I'll take a look at that separately.

@shawnstephens I think I know the cause of the problem: this error occurs when a client's Start is invoked more than once, e.g. (although in real code it's probably not as obvious as this):

	if err := riverClient.Start(ctx); err != nil {
		return err
	}

	if err := riverClient.Start(ctx); err != nil {
		return err
	}

The client should probably have some protection against that, but for now, can you audit your use of Start and see if anything comes up?
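
If the call sites are hard to audit, one stopgap on the application side is to guard Start with a sync.Once; a rough sketch, assuming the pgx driver so the client is a *river.Client[pgx.Tx]:

package worker

import (
	"context"
	"sync"

	"github.com/jackc/pgx/v5"
	"github.com/riverqueue/river"
)

var startOnce sync.Once

// startRiverOnce guarantees Start is invoked at most once, no matter how many
// code paths end up calling it.
func startRiverOnce(ctx context.Context, client *river.Client[pgx.Tx]) error {
	var err error
	startOnce.Do(func() {
		err = client.Start(ctx)
	})
	return err
}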

I've run into the same issue with v0.0.24 and I'm starting the client only once.

In my case, it looks like a DB connection runs into a timeout, resulting in the worker going into a spin loop, repeatedly logging the "connection already established" message (the second one below). The issue appears a few minutes after starting the worker.

The timeout might well be an issue with the environment the worker is running in; nevertheless, I'd want River to be able to recover from this.

The first two relevant log messages (they show an earlier commit version, but I've reproduced the issue with v0.0.24 as well):

{
    "time": "2024-03-01T08:18:22.025949193Z",
    "level": "ERROR",
    "msg": "error closing listener",
    "additional_values": "notifier.err=tls: failed to send closeNotify alert (but connection was closed anyway): write tcp LOCAL_IP:41982->POSTGRES_IP:15433: i/o timeout",
    "source":
    {
        "function": "github.com/riverqueue/river/internal/notifier.(*Notifier).getConnAndRun.func1",
        "file": "/go/pkg/mod/github.com/riverqueue/river@v0.0.23-0.20240226152235-b2cb1422b4b5/internal/notifier/notifier.go",
        "line": 140
    }
}
{
    "time": "2024-03-01T08:18:22.026028811Z",
    "level": "ERROR",
    "msg": "error establishing connection from pool",
    "additional_values": "notifier.err=connection already established",
    "source":
    {
        "function": "github.com/riverqueue/river/internal/notifier.(*Notifier).getConnAndRun",
        "file": "/go/pkg/mod/github.com/riverqueue/river@v0.0.23-0.20240226152235-b2cb1422b4b5/internal/notifier/notifier.go",
        "line": 130
    }
}

I suspect the issue is a combination of riverpgxv5's Listener.Close() not setting l.conn to nil when closing the connection fails, and Listener.Connect() returning the "connection already established" error whenever l.conn is non-nil.

Reading PgConn's Close() docs, it sounds like it's okay to assume the connection is closed even if Close() returns an error. So the solution might be as simple as setting l.conn = nil in the riverpgxv5 Listener regardless of whether pgx returned an error on Close().
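
Something along these lines, as a sketch based purely on my reading above (the Listener type and field here are stand-ins, not the actual riverpgxv5 source):

package riverpgxv5sketch

import (
	"context"

	"github.com/jackc/pgx/v5"
)

// Listener is a stand-in for riverpgxv5's listener; only the piece relevant to
// the fix is shown.
type Listener struct {
	conn *pgx.Conn
}

// Close always drops the connection reference, even when pgx reports an error,
// so a later Connect() can establish a fresh connection instead of failing with
// "connection already established".
func (l *Listener) Close(ctx context.Context) error {
	if l.conn == nil {
		return nil
	}
	err := l.conn.Close(ctx)
	l.conn = nil
	return err
}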

@mfrister Great point. That's very likely it. Opened #246 to patch that problem up.

@brandur First, thanks a bunch for all the memory fixes. I confirmed we are only calling Start once in the background worker: the function InitRiverWorkerAndClient has only one invocation in the application. We do have a separate process with an insert-only client, and it does not call Start. Sounds like #246 should fix it, though. I also see the issue start a few minutes after startup, as @mfrister mentions.

func InitRiverWorkerAndClient(ctx context.Context, ds *postgres.Datastore, cfg Config) {
	// add river worker
	riverWorkers := river.NewWorkers()
	if err := river.AddWorkerSafely(riverWorkers, &ShipmentMessageWorker{
		Logger: log.New(),
	}); err != nil {
		log.Errorf("could not register new River worker: %s", err.Error())
		panic(err)
	}
	log.Infof("new River worker registered successfully")

	// create new river client
	client, err := river.NewClient(riverpgxv5.New(ds.PgxPool()), &river.Config{
		Queues:  map[string]river.QueueConfig{river.QueueDefault: {MaxWorkers: cfg.MaxRiverWorkers}},
		Workers: riverWorkers,
	})
	if err != nil {
		log.Errorf("could not initialize new River client: %s", err.Error())
		panic(err)
	}

	// start the client
	if err := client.Start(ctx); err != nil {
		log.Errorf("failed to start River client: %s", err.Error())
		panic(err)
	}

	// ensures the worker doesn't exit until it's terminated by a signal
	<-ctx.Done()
}

@shawnstephens Ack, and thanks for checking into that. We'll cut a release for #246 soon, and hopefully this'll be the last major hurdle.