dgraph-io/dgraph

When Dgraph uses OpenCensus to trace gRPC streaming messages such as pb.Zero.Oracle and pb.Zero.StreamMembership, Dgraph leaks memory.


If you suspect this could be a bug, follow the template.

  • What version of Dgraph are you using?
    v1.0.11

  • Have you tried reproducing the issue with the latest release?
    v1.0.11 is the latest release.

  • What is the hardware spec (RAM, OS)?
    Distributor ID: Ubuntu
    Description: Ubuntu 16.04.5 LTS
    Release: 16.04
    Codename: xenial
    Memory: 256 GB
    Cores: 80

  • Steps to reproduce the issue (command/config used to run Dgraph).
    Start 3 Zero nodes as a cluster.
    Start 3 Alpha nodes as a cluster.
    Run for two weeks.

  • Expected behaviour and actual result.

==============
alpha
/dgraph # /usr/local/go/bin/go tool pprof http://localhost:1140/debug/pprof/heap
...
flat flat% sum% cum cum%
188MB 77.50% 77.50% 188MB 77.50% go.opencensus.io/trace.(*Span).AddMessageReceiveEvent
20.81MB 8.58% 86.08% 20.81MB 8.58% github.com/dgraph-io/dgraph/vendor/github.com/dgraph-io/badger/skl.NewSkiplist
6.54MB 2.70% 88.78% 6.54MB 2.70% github.com/dgraph-io/dgraph/vendor/github.com/dgraph-io/badger/table.(*Table).loadToRAM
6MB 2.47% 91.25% 6MB 2.47% github.com/dgraph-io/dgraph/posting.NewPosting
2MB 0.82% 92.08% 2MB 0.82% runtime.malg
2MB 0.82% 92.90% 2MB 0.82% go.opencensus.io/trace.copyAttributes
2MB 0.82% 93.73% 2MB 0.82% github.com/dgraph-io/dgraph/posting.(*List).updateMutationLayer
1.72MB 0.71% 94.43% 1.72MB 0.71% bytes.makeSlice
1.60MB 0.66% 95.09% 1.60MB 0.66% google.golang.org/grpc/internal/transport.newFramer
1.50MB 0.62% 95.71% 2MB 0.82% github.com/dgraph-io/dgraph/posting.getNew

Call tree (attachments): alpha.svg.gz (complete), alpha (part)

==================
zero
/home/chujunlin/dgraph # /usr/local/go/bin/go tool pprof http://localhost:1138/debug/pprof/heap
...
Showing top 10 nodes out of 25
flat flat% sum% cum cum%
564MB 85.63% 85.63% 564MB 85.63% go.opencensus.io/trace.(*Span).AddMessageSendEvent
83.20MB 12.63% 98.26% 83.20MB 12.63% github.com/dgraph-io/dgraph/vendor/github.com/dgraph-io/badger/skl.NewSkiplist
0 0% 98.26% 84.82MB 12.88% github.com/dgraph-io/dgraph/dgraph/cmd.Execute
0 0% 98.26% 282MB 42.82% github.com/dgraph-io/dgraph/dgraph/cmd/zero.(*Server).Oracle
0 0% 98.26% 282MB 42.82% github.com/dgraph-io/dgraph/dgraph/cmd/zero.(*Server).StreamMembership
0 0% 98.26% 84.82MB 12.88% github.com/dgraph-io/dgraph/dgraph/cmd/zero.init.0.func1
0 0% 98.26% 84.82MB 12.88% github.com/dgraph-io/dgraph/dgraph/cmd/zero.run
0 0% 98.26% 282MB 42.82% github.com/dgraph-io/dgraph/protos/pb.(*zeroOracleServer).Send
0 0% 98.26% 282MB 42.82% github.com/dgraph-io/dgraph/protos/pb.(*zeroStreamMembershipServer).Send
0 0% 98.26% 282MB 42.82% github.com/dgraph-io/dgraph/protos/pb._Zero_Oracle_Handler

Call tree (attachments): zero.svg.gz (complete), zero (part)

OpenCensus uses the ocgrpc plugin to trace gRPC messages. For streaming RPCs, ocgrpc ends the span only when the stream itself ends; until then, every message sent or received on the stream adds an event record to the span.

src/go.opencensus.io/plugin/ocgrpc/trace_common.go

func traceHandleRPC(ctx context.Context, rs stats.RPCStats) {
	span := trace.FromContext(ctx)
	// TODO: compressed and uncompressed sizes are not populated in every message.
	switch rs := rs.(type) {
	case *stats.Begin:
		span.AddAttributes(
			trace.BoolAttribute("Client", rs.Client),
			trace.BoolAttribute("FailFast", rs.FailFast))
	case *stats.InPayload:
		span.AddMessageReceiveEvent(0 /* TODO: messageID */, int64(rs.Length), int64(rs.WireLength))
	case *stats.OutPayload:
		span.AddMessageSendEvent(0, int64(rs.Length), int64(rs.WireLength))
	case *stats.End:
		if rs.Error != nil {
			s, ok := status.FromError(rs.Error)
			if ok {
				span.SetStatus(trace.Status{Code: int32(s.Code()), Message: s.Message()})
			} else {
				span.SetStatus(trace.Status{Code: int32(codes.Internal), Message: rs.Error.Error()})
			}
		}
		span.End()
	}
}
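
To see why a long-lived stream grows memory here, note that AddMessageReceiveEvent/AddMessageSendEvent keep appending events to the span, and that memory is only released when span.End() runs at the end of the stream. The following toy sketch is not the actual OpenCensus internals, just an illustration of the pattern for a stream that effectively never ends:

package main

import "fmt"

// toySpan mimics the relevant behaviour: every message event is buffered
// in memory and only released when End() is eventually called.
type toySpan struct {
	messageEvents []string
}

func (s *toySpan) AddMessageReceiveEvent(id, length, wireLength int64) {
	s.messageEvents = append(s.messageEvents,
		fmt.Sprintf("recv id=%d len=%d wire=%d", id, length, wireLength))
}

func (s *toySpan) End() {
	s.messageEvents = nil // memory is only reclaimed here
}

func main() {
	// pb.Zero.Oracle and pb.Zero.StreamMembership are long-lived streams,
	// so End() is effectively never reached while the cluster is running.
	span := &toySpan{}
	for i := 0; i < 1000000; i++ {
		span.AddMessageReceiveEvent(0, 100, 120)
	}
	fmt.Println("buffered message events:", len(span.messageEvents))
}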

As a temporary workaround, we can filter out tracing and stats for these long-lived streaming messages.

Add a file:

src/github.com/dgraph-io/dgraph/trace/trace.go

package trace

import (
	"context"
	"strings"

	"github.com/golang/glog"
	"go.opencensus.io/plugin/ocgrpc"
	"google.golang.org/grpc/stats"
)

// StreamingMessage lists the long-lived streaming RPCs whose tracing and
// stats should be discarded.
var StreamingMessage = []string{"pb.Zero.StreamMembership", "pb.Zero.Oracle"}

// ClientHandler wraps ocgrpc.ClientHandler and drops tracing/stats for the
// RPCs listed in StreamingMessage.
type ClientHandler struct {
	Client           ocgrpc.ClientHandler
	StreamingMessage []string
}

func (c *ClientHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
	c.Client.HandleConn(ctx, cs)
}

func (c *ClientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
	return c.Client.TagConn(ctx, cti)
}

// HandleRPC forwards stats to ocgrpc only if TagRPC did not mark this RPC as discarded.
func (c *ClientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
	if message := ctx.Value(discardKey); message == nil {
		c.Client.HandleRPC(ctx, rs)
	} else {
		// glog.Infof("Discard flag for %s\n", message.(string))
	}
}

// Discard reports whether the RPC is one of the streaming methods to skip.
func (c *ClientHandler) Discard(ctx context.Context, rti *stats.RPCTagInfo) bool {
	name := strings.TrimPrefix(rti.FullMethodName, "/")
	name = strings.Replace(name, "/", ".", -1)

	for _, message := range c.StreamingMessage {
		if name == message {
			return true
		}
	}
	return false
}

// TagRPC either delegates to ocgrpc or marks the context so that HandleRPC
// ignores all subsequent stats for this RPC.
func (c *ClientHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
	name := strings.TrimPrefix(rti.FullMethodName, "/")
	name = strings.Replace(name, "/", ".", -1)

	if !c.Discard(ctx, rti) {
		ctx = c.Client.TagRPC(ctx, rti)
	} else {
		ctx = context.WithValue(ctx, discardKey, name)
		glog.Infof("Set Discard flag for %s\n", name)
	}
	return ctx
}

// DiscardKey is the context key under which the discarded RPC name is stored.
type DiscardKey struct{}

var discardKey = DiscardKey{}

// ServerHandler mirrors ClientHandler for the server side.
type ServerHandler struct {
	Server           ocgrpc.ServerHandler
	StreamingMessage []string
}

func (s *ServerHandler) HandleConn(ctx context.Context, cs stats.ConnStats) {
	s.Server.HandleConn(ctx, cs)
}

func (s *ServerHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context {
	return s.Server.TagConn(ctx, cti)
}

func (s *ServerHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
	if message := ctx.Value(discardKey); message == nil {
		s.Server.HandleRPC(ctx, rs)
	} else {
		// glog.Infof("Discard flag for %s\n", message.(string))
	}
}

func (s *ServerHandler) Discard(ctx context.Context, rti *stats.RPCTagInfo) bool {
	name := strings.TrimPrefix(rti.FullMethodName, "/")
	name = strings.Replace(name, "/", ".", -1)

	for _, message := range s.StreamingMessage {
		if name == message {
			return true
		}
	}
	return false
}

func (s *ServerHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context {
	name := strings.TrimPrefix(rti.FullMethodName, "/")
	name = strings.Replace(name, "/", ".", -1)

	if !s.Discard(ctx, rti) {
		ctx = s.Server.TagRPC(ctx, rti)
	} else {
		ctx = context.WithValue(ctx, discardKey, name)
		glog.Infof("Set Discard flag for %s\n", name)
	}
	return ctx
}
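
As a quick sanity check of the name matching used above (assuming gRPC reports full method names in the form "/pb.Zero/Oracle"; api.Dgraph.Query below is just an illustrative non-streaming method), the normalization maps method names onto the entries in StreamingMessage like this:

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Full method names as they appear in stats.RPCTagInfo.FullMethodName.
	methods := []string{"/pb.Zero/Oracle", "/pb.Zero/StreamMembership", "/api.Dgraph/Query"}
	streaming := []string{"pb.Zero.StreamMembership", "pb.Zero.Oracle"}

	for _, m := range methods {
		// Same normalization as Discard/TagRPC above.
		name := strings.TrimPrefix(m, "/")
		name = strings.Replace(name, "/", ".", -1)

		discard := false
		for _, s := range streaming {
			if name == s {
				discard = true
			}
		}
		fmt.Printf("%s -> %s discard=%v\n", m, name, discard)
	}
	// Output:
	// /pb.Zero/Oracle -> pb.Zero.Oracle discard=true
	// /pb.Zero/StreamMembership -> pb.Zero.StreamMembership discard=true
	// /api.Dgraph/Query -> api.Dgraph.Query discard=false
}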

src/github.com/dgraph-io/dgraph/dgraph/cmd/alpha/run.go

import (
....
	dtrace "github.com/dgraph-io/dgraph/trace"
....
)

func serveGRPC(l net.Listener, tlsCfg *tls.Config, wg *sync.WaitGroup) {
....

	opt := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
		grpc.MaxSendMsgSize(x.GrpcMaxSize),
		grpc.MaxConcurrentStreams(1000),
		grpc.StatsHandler(&dtrace.ServerHandler{
			Server:           ocgrpc.ServerHandler{},
			StreamingMessage: dtrace.StreamingMessage,
		}),
	}
....
}

src/github.com/dgraph-io/dgraph/dgraph/cmd/zero/run.go

import (
....
	dtrace "github.com/dgraph-io/dgraph/trace"
....
)
func (st *state) serveGRPC(l net.Listener, wg *sync.WaitGroup, store *raftwal.DiskStorage) {
....

	s := grpc.NewServer(
		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
		grpc.MaxSendMsgSize(x.GrpcMaxSize),
		grpc.MaxConcurrentStreams(1000),
		grpc.StatsHandler(&dtrace.ServerHandler{
			Server:           ocgrpc.ServerHandler{},
			StreamingMessage: dtrace.StreamingMessage,
		}))
....

src/github.com/dgraph-io/dgraph/conn/pool.go

import (
....
	dtrace "github.com/dgraph-io/dgraph/trace"
....
)

func NewPool(addr string) (*Pool, error) {
	conn, err := grpc.Dial(addr,
		grpc.WithStatsHandler(&dtrace.ClientHandler{
			Client:           ocgrpc.ClientHandler{},
			StreamingMessage: dtrace.StreamingMessage,
		}),
....

Thanks for the thorough explanation of the problem. I can see the potential for leaked memory if the stream doesn't end cleanly. One question, though: could we perhaps limit the sampling to achieve the same thing?
e.g.,

	opt := []grpc.ServerOption{
		grpc.MaxRecvMsgSize(x.GrpcMaxSize),
		grpc.MaxSendMsgSize(x.GrpcMaxSize),
		grpc.MaxConcurrentStreams(1000),
		grpc.StatsHandler(&ocgrpc.ServerHandler{
			StartOptions: otrace.StartOptions{
				Sampler: otrace.ProbabilitySampler(0.0),
			},
		}),
	}

We can already turn off sampling for all spans with the "--trace 0" flag. With your suggested change and the trace parameter set to 0, no spans are reported to Jaeger, but event records still accumulate in memory until the span ends. For ocgrpc, the sampling strategy seems to control only whether the span is reported after it ends.
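
For reference, here is a minimal sketch of what disabling sampling globally looks like in OpenCensus. It assumes (my reading, not confirmed from Dgraph's source) that --trace is wired into the default sampler roughly like this:

package main

import (
	octrace "go.opencensus.io/trace"
)

func main() {
	// Assumption: "--trace 0" roughly corresponds to this global sampler
	// configuration. Per the observation above, the sampler only controls
	// whether a finished span is exported to a backend such as Jaeger; a
	// span on an open stream still buffers its message events in memory
	// until End() is called.
	octrace.ApplyConfig(octrace.Config{
		DefaultSampler: octrace.ProbabilitySampler(0),
	})
}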

OpenCensus fixed this problem on January 26.
As long as we build Dgraph against an OpenCensus version that includes that fix, this problem should not occur.

census-instrumentation/opencensus-go@4baeb63
census-instrumentation/opencensus-go#1015

Thanks, I will update our deps.

This has been fixed as of v1.0.12 (released 2019-03-05) and in the current master. OpenCensus v0.21.0, released on 2019-04-25, is vendored in master (vendor.json).

I'll close this issue since we're past the version of OpenCensus from January 26 that had the leaks. Feel free to re-open if you're still seeing these memory leaks in the latest version of Dgraph.