mirror of https://github.com/status-im/consul.git
add more labels to RequestRecorder (#12727)
Co-authored-by: Daniel Nephin <dnephin@hashicorp.com> Signed-off-by: FFMMM <FFMMM@users.noreply.github.com>
This commit is contained in:
parent
1006c8a94b
commit
a46bbe892d
|
@ -0,0 +1,4 @@
|
|||
```release-note:improvement
|
||||
telemetry: Add new `leader` label to `consul.rpc.server.call` and optional `target_datacenter`, `locality`,
|
||||
`allow_stale`, and `blocking` optional labels.
|
||||
```
|
|
@ -25,7 +25,7 @@ type Deps struct {
|
|||
// the rpc server.
|
||||
GetNetRPCInterceptorFunc func(recorder *middleware.RequestRecorder) rpc.ServerServiceCallInterceptor
|
||||
// NewRequestRecorderFunc provides a middleware.RequestRecorder for the server to use; it cannot be nil
|
||||
NewRequestRecorderFunc func(logger hclog.Logger) *middleware.RequestRecorder
|
||||
NewRequestRecorderFunc func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder
|
||||
EnterpriseDeps
|
||||
}
|
||||
|
||||
|
|
|
@ -386,24 +386,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
|
|||
serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer)
|
||||
loggers := newLoggerStore(serverLogger)
|
||||
|
||||
var recorder *middleware.RequestRecorder
|
||||
if flat.NewRequestRecorderFunc == nil {
|
||||
return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider")
|
||||
}
|
||||
recorder = flat.NewRequestRecorderFunc(serverLogger)
|
||||
if recorder == nil {
|
||||
return nil, fmt.Errorf("cannot initialize server without a non nil RPC request recorder")
|
||||
}
|
||||
|
||||
var rpcServer, insecureRPCServer *rpc.Server
|
||||
if flat.GetNetRPCInterceptorFunc == nil {
|
||||
rpcServer = rpc.NewServer()
|
||||
insecureRPCServer = rpc.NewServer()
|
||||
} else {
|
||||
rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
|
||||
insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
|
||||
}
|
||||
|
||||
eventPublisher := stream.NewEventPublisher(10 * time.Second)
|
||||
|
||||
fsmDeps := fsm.Deps{
|
||||
|
@ -427,9 +409,6 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
|
|||
leaveCh: make(chan struct{}),
|
||||
reconcileCh: make(chan serf.Member, reconcileChSize),
|
||||
router: flat.Router,
|
||||
rpcRecorder: recorder,
|
||||
rpcServer: rpcServer,
|
||||
insecureRPCServer: insecureRPCServer,
|
||||
tlsConfigurator: flat.TLSConfigurator,
|
||||
publicGRPCServer: publicGRPCServer,
|
||||
reassertLeaderCh: make(chan chan error),
|
||||
|
@ -443,6 +422,26 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve
|
|||
publisher: eventPublisher,
|
||||
}
|
||||
|
||||
var recorder *middleware.RequestRecorder
|
||||
if flat.NewRequestRecorderFunc != nil {
|
||||
recorder = flat.NewRequestRecorderFunc(serverLogger, s.IsLeader, s.config.Datacenter)
|
||||
} else {
|
||||
return nil, fmt.Errorf("cannot initialize server without an RPC request recorder provider")
|
||||
}
|
||||
if recorder == nil {
|
||||
return nil, fmt.Errorf("cannot initialize server with a nil RPC request recorder")
|
||||
}
|
||||
|
||||
if flat.GetNetRPCInterceptorFunc == nil {
|
||||
s.rpcServer = rpc.NewServer()
|
||||
s.insecureRPCServer = rpc.NewServer()
|
||||
} else {
|
||||
s.rpcServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
|
||||
s.insecureRPCServer = rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(flat.GetNetRPCInterceptorFunc(recorder)))
|
||||
}
|
||||
|
||||
s.rpcRecorder = recorder
|
||||
|
||||
go s.publisher.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
if s.config.ConnectMeshGatewayWANFederationEnabled {
|
||||
|
|
|
@ -1172,7 +1172,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
|
|||
// note that there will be "internal" net/rpc calls made
|
||||
// that will still show up; those don't go thru the net/rpc interceptor;
|
||||
// see consul.agent.rpc.middleware.RPCTypeInternal for context
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder {
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
|
||||
// for the purposes of this test, we don't need isLeader or localDC
|
||||
return &middleware.RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
|
@ -1205,7 +1206,8 @@ func TestServer_RPC_MetricsIntercept_Off(t *testing.T) {
|
|||
// note that there will be "internal" net/rpc calls made
|
||||
// that will still show up; those don't go thru the net/rpc interceptor;
|
||||
// see consul.agent.rpc.middleware.RPCTypeInternal for context
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder {
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
|
||||
// for the purposes of this test, we don't need isLeader or localDC
|
||||
return &middleware.RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
|
@ -1265,14 +1267,14 @@ func TestServer_RPC_RequestRecorder(t *testing.T) {
|
|||
t.Run("test nil RequestRecorder", func(t *testing.T) {
|
||||
_, conf := testServerConfig(t)
|
||||
deps := newDefaultDeps(t, conf)
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder {
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
|
||||
return nil
|
||||
}
|
||||
|
||||
s2, err := NewServer(conf, deps, grpc.NewServer())
|
||||
|
||||
require.Error(t, err, "need err when RequestRecorder is nil")
|
||||
require.Equal(t, err.Error(), "cannot initialize server without a non nil RPC request recorder")
|
||||
require.Equal(t, err.Error(), "cannot initialize server with a nil RPC request recorder")
|
||||
|
||||
t.Cleanup(func() {
|
||||
if s2 != nil {
|
||||
|
@ -1308,7 +1310,8 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) {
|
|||
simpleRecorderFunc := func(key []string, val float32, labels []metrics.Label) {
|
||||
storage[keyMakingFunc(key, labels)] = val
|
||||
}
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger) *middleware.RequestRecorder {
|
||||
deps.NewRequestRecorderFunc = func(logger hclog.Logger, isLeader func() bool, localDC string) *middleware.RequestRecorder {
|
||||
// for the purposes of this test, we don't need isLeader or localDC
|
||||
return &middleware.RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
|
@ -1344,11 +1347,13 @@ func TestServer_RPC_MetricsIntercept(t *testing.T) {
|
|||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "read"},
|
||||
{Name: "rpc_type", Value: "test"},
|
||||
{Name: "server_role", Value: "unreported"},
|
||||
}
|
||||
|
||||
key := keyMakingFunc(middleware.OneTwelveRPCSummary[0].Name, expectedLabels)
|
||||
|
||||
if _, ok := storage[key]; !ok {
|
||||
// the compound key will look like: "rpc+server+call+Status.Ping+false+read+test+unreported"
|
||||
t.Fatalf("Did not find key %s in the metrics log, ", key)
|
||||
}
|
||||
})
|
||||
|
|
|
@ -199,7 +199,7 @@ func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
|
|||
recordPromMetrics(t, a, respRec)
|
||||
|
||||
// make sure the labels exist for this metric
|
||||
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type"})
|
||||
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type", "leader"})
|
||||
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
|
||||
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
|
||||
})
|
||||
|
|
|
@ -33,33 +33,79 @@ var OneTwelveRPCSummary = []prometheus.SummaryDefinition{
|
|||
}
|
||||
|
||||
type RequestRecorder struct {
|
||||
Logger hclog.Logger
|
||||
RecorderFunc func(key []string, val float32, labels []metrics.Label)
|
||||
Logger hclog.Logger
|
||||
RecorderFunc func(key []string, val float32, labels []metrics.Label)
|
||||
serverIsLeader func() bool
|
||||
localDC string
|
||||
}
|
||||
|
||||
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
|
||||
return &RequestRecorder{Logger: logger, RecorderFunc: metrics.AddSampleWithLabels}
|
||||
func NewRequestRecorder(logger hclog.Logger, isLeader func() bool, localDC string) *RequestRecorder {
|
||||
return &RequestRecorder{
|
||||
Logger: logger,
|
||||
RecorderFunc: metrics.AddSampleWithLabels,
|
||||
serverIsLeader: isLeader,
|
||||
localDC: localDC,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
|
||||
elapsed := time.Since(start).Milliseconds()
|
||||
reqType := requestType(request)
|
||||
isLeader := r.getServerLeadership()
|
||||
|
||||
labels := []metrics.Label{
|
||||
{Name: "method", Value: requestName},
|
||||
{Name: "errored", Value: strconv.FormatBool(respErrored)},
|
||||
{Name: "request_type", Value: reqType},
|
||||
{Name: "rpc_type", Value: rpcType},
|
||||
{Name: "leader", Value: isLeader},
|
||||
}
|
||||
|
||||
labels = r.addOptionalLabels(request, labels)
|
||||
|
||||
// math.MaxInt64 < math.MaxFloat32 is true so we should be good!
|
||||
r.RecorderFunc(metricRPCRequest, float32(elapsed), labels)
|
||||
r.Logger.Trace(requestLogName,
|
||||
"method", requestName,
|
||||
"errored", respErrored,
|
||||
"request_type", reqType,
|
||||
"rpc_type", rpcType,
|
||||
"elapsed", elapsed)
|
||||
|
||||
labelsArr := flattenLabels(labels)
|
||||
r.Logger.Trace(requestLogName, labelsArr...)
|
||||
|
||||
}
|
||||
|
||||
func flattenLabels(labels []metrics.Label) []interface{} {
|
||||
|
||||
var labelArr []interface{}
|
||||
for _, label := range labels {
|
||||
labelArr = append(labelArr, label.Name, label.Value)
|
||||
}
|
||||
|
||||
return labelArr
|
||||
}
|
||||
|
||||
func (r *RequestRecorder) addOptionalLabels(request interface{}, labels []metrics.Label) []metrics.Label {
|
||||
if rq, ok := request.(readQuery); ok {
|
||||
labels = append(labels,
|
||||
metrics.Label{
|
||||
Name: "allow_stale",
|
||||
Value: strconv.FormatBool(rq.AllowStaleRead()),
|
||||
},
|
||||
metrics.Label{
|
||||
Name: "blocking",
|
||||
Value: strconv.FormatBool(rq.GetMinQueryIndex() > 0),
|
||||
})
|
||||
}
|
||||
|
||||
if td, ok := request.(targetDC); ok {
|
||||
requestDC := td.RequestDatacenter()
|
||||
labels = append(labels, metrics.Label{Name: "target_datacenter", Value: requestDC})
|
||||
|
||||
if r.localDC == requestDC {
|
||||
labels = append(labels, metrics.Label{Name: "locality", Value: "local"})
|
||||
} else {
|
||||
labels = append(labels, metrics.Label{Name: "locality", Value: "forwarded"})
|
||||
}
|
||||
}
|
||||
|
||||
return labels
|
||||
}
|
||||
|
||||
func requestType(req interface{}) string {
|
||||
|
@ -77,6 +123,30 @@ func requestType(req interface{}) string {
|
|||
return "unreported"
|
||||
}
|
||||
|
||||
func (r *RequestRecorder) getServerLeadership() string {
|
||||
if r.serverIsLeader != nil {
|
||||
if r.serverIsLeader() {
|
||||
return "true"
|
||||
} else {
|
||||
return "false"
|
||||
}
|
||||
}
|
||||
|
||||
// This logical branch should not happen. If it happens
|
||||
// it means that we have not plumbed down a way to verify
|
||||
// whether the server handling the request was a leader or not
|
||||
return "unreported"
|
||||
}
|
||||
|
||||
type readQuery interface {
|
||||
GetMinQueryIndex() uint64
|
||||
AllowStaleRead() bool
|
||||
}
|
||||
|
||||
type targetDC interface {
|
||||
RequestDatacenter() string
|
||||
}
|
||||
|
||||
func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor {
|
||||
return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) {
|
||||
reqStart := time.Now()
|
||||
|
|
|
@ -48,6 +48,8 @@ var simpleRecorderFunc = func(key []string, val float32, labels []metrics.Label)
|
|||
|
||||
type readRequest struct{}
|
||||
type writeRequest struct{}
|
||||
type readReqWithTD struct{}
|
||||
type writeReqWithTD struct{}
|
||||
|
||||
func (rr readRequest) IsRead() bool {
|
||||
return true
|
||||
|
@ -57,75 +59,210 @@ func (wr writeRequest) IsRead() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// TestRequestRecorder_SimpleOK tests that the RequestRecorder can record a simple request.
|
||||
func TestRequestRecorder_SimpleOK(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
r := RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
r.Record("A.B", RPCTypeInternal, start, struct{}{}, false)
|
||||
|
||||
expectedLabels := []metrics.Label{
|
||||
{Name: "method", Value: "A.B"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: RPCTypeInternal},
|
||||
}
|
||||
|
||||
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||
require.Equal(t, o.key, metricRPCRequest)
|
||||
require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds()))
|
||||
require.Equal(t, o.labels, expectedLabels)
|
||||
func (r readReqWithTD) IsRead() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// TestRequestRecorder_ReadRequest tests that RequestRecorder can record a read request AND a responseErrored arg.
|
||||
func TestRequestRecorder_ReadRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
r := RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
r.Record("B.A", RPCTypeNetRPC, start, readRequest{}, true)
|
||||
|
||||
expectedLabels := []metrics.Label{
|
||||
{Name: "method", Value: "B.A"},
|
||||
{Name: "errored", Value: "true"},
|
||||
{Name: "request_type", Value: "read"},
|
||||
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||
}
|
||||
|
||||
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||
require.Equal(t, o.labels, expectedLabels)
|
||||
func (r readReqWithTD) RequestDatacenter() string {
|
||||
return "dc3"
|
||||
}
|
||||
|
||||
// TestRequestRecorder_WriteRequest tests that RequestRecorder can record a write request.
|
||||
func TestRequestRecorder_WriteRequest(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
r := RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
|
||||
r.Record("B.C", RPCTypeNetRPC, start, writeRequest{}, true)
|
||||
|
||||
expectedLabels := []metrics.Label{
|
||||
{Name: "method", Value: "B.C"},
|
||||
{Name: "errored", Value: "true"},
|
||||
{Name: "request_type", Value: "write"},
|
||||
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||
}
|
||||
|
||||
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||
require.Equal(t, o.labels, expectedLabels)
|
||||
func (r readReqWithTD) GetMinQueryIndex() uint64 {
|
||||
return 1
|
||||
}
|
||||
func (r readReqWithTD) AllowStaleRead() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (w writeReqWithTD) IsRead() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (w writeReqWithTD) RequestDatacenter() string {
|
||||
return "dc2"
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
// description is meant for human friendliness
|
||||
description string
|
||||
// requestName is encouraged to be unique across tests to
|
||||
// avoid lock contention
|
||||
requestName string
|
||||
requestI interface{}
|
||||
rpcType string
|
||||
errored bool
|
||||
isLeader func() bool
|
||||
dc string
|
||||
// the first element in expectedLabels should be the method name
|
||||
expectedLabels []metrics.Label
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "simple ok",
|
||||
description: "This is a simple happy path test case. We check for pass through and normal request processing",
|
||||
requestName: "A.B",
|
||||
requestI: struct{}{},
|
||||
rpcType: RPCTypeInternal,
|
||||
errored: false,
|
||||
dc: "dc1",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "A.B"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: RPCTypeInternal},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "simple ok errored",
|
||||
description: "Checks that the errored value is populated right.",
|
||||
requestName: "A.C",
|
||||
requestI: struct{}{},
|
||||
rpcType: "test",
|
||||
errored: true,
|
||||
dc: "dc1",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "A.C"},
|
||||
{Name: "errored", Value: "true"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: "test"},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read request, rpc type internal",
|
||||
description: "Checks for read request interface parsing",
|
||||
requestName: "B.C",
|
||||
requestI: readRequest{},
|
||||
rpcType: RPCTypeInternal,
|
||||
errored: false,
|
||||
dc: "dc1",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "B.C"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "read"},
|
||||
{Name: "rpc_type", Value: RPCTypeInternal},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "write request, rpc type net/rpc",
|
||||
description: "Checks for write request interface, different RPC type",
|
||||
requestName: "D.E",
|
||||
requestI: writeRequest{},
|
||||
rpcType: RPCTypeNetRPC,
|
||||
errored: false,
|
||||
dc: "dc1",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "D.E"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "write"},
|
||||
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "read request with blocking stale and target dc",
|
||||
description: "Checks for locality, blocking status and target dc",
|
||||
requestName: "E.F",
|
||||
requestI: readReqWithTD{},
|
||||
rpcType: RPCTypeNetRPC,
|
||||
errored: false,
|
||||
dc: "dc1",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "E.F"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "read"},
|
||||
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
{Name: "allow_stale", Value: "false"},
|
||||
{Name: "blocking", Value: "true"},
|
||||
{Name: "target_datacenter", Value: "dc3"},
|
||||
{Name: "locality", Value: "forwarded"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "write request with TD, locality local",
|
||||
description: "Checks for write request with local forwarding and target dc",
|
||||
requestName: "F.G",
|
||||
requestI: writeReqWithTD{},
|
||||
rpcType: RPCTypeNetRPC,
|
||||
errored: false,
|
||||
dc: "dc2",
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "F.G"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "write"},
|
||||
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||
{Name: "leader", Value: "unreported"},
|
||||
{Name: "target_datacenter", Value: "dc2"},
|
||||
{Name: "locality", Value: "local"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "is leader",
|
||||
description: "checks for is leader",
|
||||
requestName: "G.H",
|
||||
requestI: struct{}{},
|
||||
rpcType: "test",
|
||||
errored: false,
|
||||
isLeader: func() bool {
|
||||
return true
|
||||
},
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "G.H"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: "test"},
|
||||
{Name: "leader", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "is not leader",
|
||||
description: "checks for is not leader",
|
||||
requestName: "H.I",
|
||||
requestI: struct{}{},
|
||||
rpcType: "test",
|
||||
errored: false,
|
||||
isLeader: func() bool {
|
||||
return false
|
||||
},
|
||||
expectedLabels: []metrics.Label{
|
||||
{Name: "method", Value: "H.I"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: "test"},
|
||||
{Name: "leader", Value: "false"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// TestRequestRecorder goes over all the parsing and reporting that RequestRecorder
|
||||
// is expected to perform.
|
||||
func TestRequestRecorder(t *testing.T) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
|
||||
r := RequestRecorder{
|
||||
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||
RecorderFunc: simpleRecorderFunc,
|
||||
serverIsLeader: tc.isLeader,
|
||||
localDC: tc.dc,
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
r.Record(tc.requestName, tc.rpcType, start, tc.requestI, tc.errored)
|
||||
|
||||
key := append(metricRPCRequest, tc.expectedLabels[0].Value)
|
||||
o := store.get(key)
|
||||
|
||||
require.Equal(t, o.key, metricRPCRequest)
|
||||
require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds()))
|
||||
require.Equal(t, o.labels, tc.expectedLabels)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue