From a46bbe892d56e42a3d58251785cbc3055389d2a9 Mon Sep 17 00:00:00 2001 From: FFMMM Date: Tue, 12 Apr 2022 10:50:25 -0700 Subject: [PATCH] add more labels to RequestRecorder (#12727) Co-authored-by: Daniel Nephin Signed-off-by: FFMMM --- .changelog/12727.txt | 4 + agent/consul/options.go | 2 +- agent/consul/server.go | 41 ++-- agent/consul/server_test.go | 15 +- agent/metrics_test.go | 2 +- agent/rpc/middleware/interceptors.go | 90 ++++++- agent/rpc/middleware/interceptors_test.go | 271 ++++++++++++++++------ 7 files changed, 320 insertions(+), 105 deletions(-) create mode 100644 .changelog/12727.txt diff --git a/.changelog/12727.txt b/.changelog/12727.txt new file mode 100644 index 0000000000..9ec5da4577 --- /dev/null +++ b/.changelog/12727.txt @@ -0,0 +1,4 @@ +```release-note:improvement +telemetry: Add new `leader` label to `consul.rpc.server.call` and optional `target_datacenter`, `locality`, +`allow_stale`, and `blocking` optional labels. +``` \ No newline at end of file diff --git a/agent/consul/options.go b/agent/consul/options.go index e253864a56..a4cd299e32 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -25,7 +25,7 @@ type Deps struct { // the rpc server. GetNetRPCInterceptorFunc func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor // NewRequestRecorderFunc provides a middleware.RequestRecorder for the server to use; it cannot be nil - NewRequestRecorderFunc func(logger hclog.Logger) *middleware.RequestRecorder + NewRequestRecorderFunc func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder EnterpriseDeps } diff --git a/agent/consul/server.go b/agent/consul/server.go index 401954d853..eaf974032b 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -386,24 +386,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer) loggers := newLoggerStore(serverLogger) - var recorder *middleware.RequestRecorder - if flat.NewRequestRecorderFunc == nil { - return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider") - } - recorder = flat.NewRequestRecorderFunc(serverLogger) - if recorder == nil { - return nil, fmt.Errorf("cannot initialize server without a non nil RPC request recorder") - } - - var rpcServer, insecureRPCServer *rpc.Server - if flat.GetNetRPCInterceptorFunc == nil { - rpcServer = rpc.NewServer() - insecureRPCServer = rpc.NewServer() - } else { - rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder))) - insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder))) - } - eventPublisher := stream.NewEventPublisher(10 * time.Second) fsmDeps := fsm.Deps{ @@ -427,9 +409,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve leaveCh: make(chan struct{}), reconcileCh: make(chan serf.Member, reconcileChSize), router: flat.Router, - rpcRecorder: recorder, - rpcServer: rpcServer, - insecureRPCServer: insecureRPCServer, tlsConfigurator: flat.TLSConfigurator, publicGRPCServer: publicGRPCServer, reassertLeaderCh: make(chan chan error), @@ -443,6 +422,26 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve publisher: eventPublisher, } + var recorder *middleware.RequestRecorder + if flat.NewRequestRecorderFunc != nil { + recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter) + } else { + return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider") + } + if recorder == nil { + return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder") + } + + if flat.GetNetRPCInterceptorFunc == nil { + s.rpcServer = rpc.NewServer() + s.insecureRPCServer = rpc.NewServer() + } else { + s.rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder))) + s.insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder))) + } + + s.rpcRecorder = recorder + go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) if s.config.ConnectMeshGatewayWANFederationEnabled { diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 5c06fb4d96..0d6a4925b6 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1172,7 +1172,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) { // note that there will be "internal" net/rpc calls made // that will still show up; those don't go thru the net/rpc interceptor; // see consul.agent.rpc.middleware.RPCTypeInternal for context - deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { + deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder { + // for the purposes of this test, we don't need isLeader or localDC return &middleware.RequestRecorder{ Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), RecorderFunc: simpleRecorderFunc, @@ -1205,7 +1206,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) { // note that there will be "internal" net/rpc calls made // that will still show up; those don't go thru the net/rpc interceptor; // see consul.agent.rpc.middleware.RPCTypeInternal for context - deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { + deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder { + // for the purposes of this test, we don't need isLeader or localDC return &middleware.RequestRecorder{ Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), RecorderFunc: simpleRecorderFunc, @@ -1265,14 +1267,14 @@ func TestServer_RPC_RequestRecorder(t *testing.T) { t.Run("test nil RequestRecorder", func(t *testing.T) { _, conf := testServerConfig(t) deps := newDefaultDeps(t, conf) - deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { + deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder { return nil } s2, err := NewServer(conf, deps, grpc.NewServer()) require.Error(t, err, "need err when RequestRecorder is nil") - require.Equal(t, err.Error(), "cannot initialize server without a non nil RPC request recorder") + require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder") t.Cleanup(func() { if s2 != nil { @@ -1308,7 +1310,8 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) { simpleRecorderFunc := func(key []string, val float32, labels []metrics.Label) { storage[keyMakingFunc(key, labels)] = val } - deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder { + deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder { + // for the purposes of this test, we don't need isLeader or localDC return &middleware.RequestRecorder{ Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), RecorderFunc: simpleRecorderFunc, @@ -1344,11 +1347,13 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) { {Name: "errored", Value: "false"}, {Name: "request_type", Value: "read"}, {Name: "rpc_type", Value: "test"}, + {Name: "server_role", Value: "unreported"}, } key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, expectedLabels) if _, ok := storage[key]; !ok { + // the compound key will look like: "rpc+server+call+Status.Ping+false+read+test+unreported" t.Fatalf("Did not find key %s in the metrics log, ", key) } }) diff --git a/agent/metrics_test.go b/agent/metrics_test.go index 448694e3e9..76b3fff287 100644 --- a/agent/metrics_test.go +++ b/agent/metrics_test.go @@ -199,7 +199,7 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) { recordPromMetrics(t, a, respRec) // make sure the labels exist for this metric - assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type"}) + assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"}) // make sure we see 3 Status.Ping metrics corresponding to the calls we made above assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3) }) diff --git a/agent/rpc/middleware/interceptors.go b/agent/rpc/middleware/interceptors.go index ba6747c3a7..049283ac20 100644 --- a/agent/rpc/middleware/interceptors.go +++ b/agent/rpc/middleware/interceptors.go @@ -33,33 +33,79 @@ var OneTwelveRPCSummary = []prometheus.SummaryDefinition{ } type RequestRecorder struct { - Logger hclog.Logger - RecorderFunc func(key []string, val float32, labels []metrics.Label) + Logger hclog.Logger + RecorderFunc func(key []string, val float32, labels []metrics.Label) + serverIsLeader func() bool + localDC string } -func NewRequestRecorder(logger hclog.Logger) *RequestRecorder { - return &RequestRecorder{Logger: logger, RecorderFunc: metrics.AddSampleWithLabels} +func NewRequestRecorder(logger hclog.Logger, isLeader func() bool, localDC string) *RequestRecorder { + return &RequestRecorder{ + Logger: logger, + RecorderFunc: metrics.AddSampleWithLabels, + serverIsLeader: isLeader, + localDC: localDC, + } } func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) { elapsed := time.Since(start).Milliseconds() reqType := requestType(request) + isLeader := r.getServerLeadership() labels := []metrics.Label{ {Name: "method", Value: requestName}, {Name: "errored", Value: strconv.FormatBool(respErrored)}, {Name: "request_type", Value: reqType}, {Name: "rpc_type", Value: rpcType}, + {Name: "leader", Value: isLeader}, } + labels = r.addOptionalLabels(request, labels) + // math.MaxInt64 < math.MaxFloat32 is true so we should be good! r.RecorderFunc(metricRPCRequest, float32(elapsed), labels) - r.Logger.Trace(requestLogName, - "method", requestName, - "errored", respErrored, - "request_type", reqType, - "rpc_type", rpcType, - "elapsed", elapsed) + + labelsArr := flattenLabels(labels) + r.Logger.Trace(requestLogName, labelsArr...) + +} + +func flattenLabels(labels []metrics.Label) []interface{} { + + var labelArr []interface{} + for _, label := range labels { + labelArr = append(labelArr, label.Name, label.Value) + } + + return labelArr +} + +func (r *RequestRecorder) addOptionalLabels(request interface{}, labels []metrics.Label) []metrics.Label { + if rq, ok := request.(readQuery); ok { + labels = append(labels, + metrics.Label{ + Name: "allow_stale", + Value: strconv.FormatBool(rq.AllowStaleRead()), + }, + metrics.Label{ + Name: "blocking", + Value: strconv.FormatBool(rq.GetMinQueryIndex() > 0), + }) + } + + if td, ok := request.(targetDC); ok { + requestDC := td.RequestDatacenter() + labels = append(labels, metrics.Label{Name: "target_datacenter", Value: requestDC}) + + if r.localDC == requestDC { + labels = append(labels, metrics.Label{Name: "locality", Value: "local"}) + } else { + labels = append(labels, metrics.Label{Name: "locality", Value: "forwarded"}) + } + } + + return labels } func requestType(req interface{}) string { @@ -77,6 +123,30 @@ func requestType(req interface{}) string { return "unreported" } +func (r *RequestRecorder) getServerLeadership() string { + if r.serverIsLeader != nil { + if r.serverIsLeader() { + return "true" + } else { + return "false" + } + } + + // This logical branch should not happen. If it happens + // it means that we have not plumbed down a way to verify + // whether the server handling the request was a leader or not + return "unreported" +} + +type readQuery interface { + GetMinQueryIndex() uint64 + AllowStaleRead() bool +} + +type targetDC interface { + RequestDatacenter() string +} + func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor { return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) { reqStart := time.Now() diff --git a/agent/rpc/middleware/interceptors_test.go b/agent/rpc/middleware/interceptors_test.go index 63fbefecb3..d9676846bc 100644 --- a/agent/rpc/middleware/interceptors_test.go +++ b/agent/rpc/middleware/interceptors_test.go @@ -48,6 +48,8 @@ var simpleRecorderFunc = func(key []string, val float32, labels []metrics.Label) type readRequest struct{} type writeRequest struct{} +type readReqWithTD struct{} +type writeReqWithTD struct{} func (rr readRequest) IsRead() bool { return true @@ -57,75 +59,210 @@ func (wr writeRequest) IsRead() bool { return false } -// TestRequestRecorder_SimpleOK tests that the RequestRecorder can record a simple request. -func TestRequestRecorder_SimpleOK(t *testing.T) { - t.Parallel() - - r := RequestRecorder{ - Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), - RecorderFunc: simpleRecorderFunc, - } - - start := time.Now() - r.Record("A.B", RPCTypeInternal, start, struct{}{}, false) - - expectedLabels := []metrics.Label{ - {Name: "method", Value: "A.B"}, - {Name: "errored", Value: "false"}, - {Name: "request_type", Value: "unreported"}, - {Name: "rpc_type", Value: RPCTypeInternal}, - } - - o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) - require.Equal(t, o.key, metricRPCRequest) - require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds())) - require.Equal(t, o.labels, expectedLabels) +func (r readReqWithTD) IsRead() bool { + return true } -// TestRequestRecorder_ReadRequest tests that RequestRecorder can record a read request AND a responseErrored arg. -func TestRequestRecorder_ReadRequest(t *testing.T) { - t.Parallel() - - r := RequestRecorder{ - Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), - RecorderFunc: simpleRecorderFunc, - } - - start := time.Now() - - r.Record("B.A", RPCTypeNetRPC, start, readRequest{}, true) - - expectedLabels := []metrics.Label{ - {Name: "method", Value: "B.A"}, - {Name: "errored", Value: "true"}, - {Name: "request_type", Value: "read"}, - {Name: "rpc_type", Value: RPCTypeNetRPC}, - } - - o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) - require.Equal(t, o.labels, expectedLabels) +func (r readReqWithTD) RequestDatacenter() string { + return "dc3" } -// TestRequestRecorder_WriteRequest tests that RequestRecorder can record a write request. -func TestRequestRecorder_WriteRequest(t *testing.T) { - t.Parallel() - - r := RequestRecorder{ - Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), - RecorderFunc: simpleRecorderFunc, - } - - start := time.Now() - - r.Record("B.C", RPCTypeNetRPC, start, writeRequest{}, true) - - expectedLabels := []metrics.Label{ - {Name: "method", Value: "B.C"}, - {Name: "errored", Value: "true"}, - {Name: "request_type", Value: "write"}, - {Name: "rpc_type", Value: RPCTypeNetRPC}, - } - - o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) - require.Equal(t, o.labels, expectedLabels) +func (r readReqWithTD) GetMinQueryIndex() uint64 { + return 1 +} +func (r readReqWithTD) AllowStaleRead() bool { + return false +} + +func (w writeReqWithTD) IsRead() bool { + return false +} + +func (w writeReqWithTD) RequestDatacenter() string { + return "dc2" +} + +type testCase struct { + name string + // description is meant for human friendliness + description string + // requestName is encouraged to be unique across tests to + // avoid lock contention + requestName string + requestI interface{} + rpcType string + errored bool + isLeader func() bool + dc string + // the first element in expectedLabels should be the method name + expectedLabels []metrics.Label +} + +var testCases = []testCase{ + { + name: "simple ok", + description: "This is a simple happy path test case. We check for pass through and normal request processing", + requestName: "A.B", + requestI: struct{}{}, + rpcType: RPCTypeInternal, + errored: false, + dc: "dc1", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "A.B"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "unreported"}, + {Name: "rpc_type", Value: RPCTypeInternal}, + {Name: "leader", Value: "unreported"}, + }, + }, + { + name: "simple ok errored", + description: "Checks that the errored value is populated right.", + requestName: "A.C", + requestI: struct{}{}, + rpcType: "test", + errored: true, + dc: "dc1", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "A.C"}, + {Name: "errored", Value: "true"}, + {Name: "request_type", Value: "unreported"}, + {Name: "rpc_type", Value: "test"}, + {Name: "leader", Value: "unreported"}, + }, + }, + { + name: "read request, rpc type internal", + description: "Checks for read request interface parsing", + requestName: "B.C", + requestI: readRequest{}, + rpcType: RPCTypeInternal, + errored: false, + dc: "dc1", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "B.C"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "read"}, + {Name: "rpc_type", Value: RPCTypeInternal}, + {Name: "leader", Value: "unreported"}, + }, + }, + { + name: "write request, rpc type net/rpc", + description: "Checks for write request interface, different RPC type", + requestName: "D.E", + requestI: writeRequest{}, + rpcType: RPCTypeNetRPC, + errored: false, + dc: "dc1", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "D.E"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "write"}, + {Name: "rpc_type", Value: RPCTypeNetRPC}, + {Name: "leader", Value: "unreported"}, + }, + }, + { + name: "read request with blocking stale and target dc", + description: "Checks for locality, blocking status and target dc", + requestName: "E.F", + requestI: readReqWithTD{}, + rpcType: RPCTypeNetRPC, + errored: false, + dc: "dc1", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "E.F"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "read"}, + {Name: "rpc_type", Value: RPCTypeNetRPC}, + {Name: "leader", Value: "unreported"}, + {Name: "allow_stale", Value: "false"}, + {Name: "blocking", Value: "true"}, + {Name: "target_datacenter", Value: "dc3"}, + {Name: "locality", Value: "forwarded"}, + }, + }, + { + name: "write request with TD, locality local", + description: "Checks for write request with local forwarding and target dc", + requestName: "F.G", + requestI: writeReqWithTD{}, + rpcType: RPCTypeNetRPC, + errored: false, + dc: "dc2", + expectedLabels: []metrics.Label{ + {Name: "method", Value: "F.G"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "write"}, + {Name: "rpc_type", Value: RPCTypeNetRPC}, + {Name: "leader", Value: "unreported"}, + {Name: "target_datacenter", Value: "dc2"}, + {Name: "locality", Value: "local"}, + }, + }, + { + name: "is leader", + description: "checks for is leader", + requestName: "G.H", + requestI: struct{}{}, + rpcType: "test", + errored: false, + isLeader: func() bool { + return true + }, + expectedLabels: []metrics.Label{ + {Name: "method", Value: "G.H"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "unreported"}, + {Name: "rpc_type", Value: "test"}, + {Name: "leader", Value: "true"}, + }, + }, + { + name: "is not leader", + description: "checks for is not leader", + requestName: "H.I", + requestI: struct{}{}, + rpcType: "test", + errored: false, + isLeader: func() bool { + return false + }, + expectedLabels: []metrics.Label{ + {Name: "method", Value: "H.I"}, + {Name: "errored", Value: "false"}, + {Name: "request_type", Value: "unreported"}, + {Name: "rpc_type", Value: "test"}, + {Name: "leader", Value: "false"}, + }, + }, +} + +// TestRequestRecorder goes over all the parsing and reporting that RequestRecorder +// is expected to perform. +func TestRequestRecorder(t *testing.T) { + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + r := RequestRecorder{ + Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}), + RecorderFunc: simpleRecorderFunc, + serverIsLeader: tc.isLeader, + localDC: tc.dc, + } + + start := time.Now() + r.Record(tc.requestName, tc.rpcType, start, tc.requestI, tc.errored) + + key := append(metricRPCRequest, tc.expectedLabels[0].Value) + o := store.get(key) + + require.Equal(t, o.key, metricRPCRequest) + require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds())) + require.Equal(t, o.labels, tc.expectedLabels) + + }) + } }