mirror of https://github.com/status-im/consul.git
factor out recording func, add unit tests (#12585)
Signed-off-by: FFMMM <FFMMM@users.noreply.github.com>
This commit is contained in:
parent
7298967070
commit
a7e5ee005a
|
@ -19,6 +19,7 @@ import (
|
||||||
// Really what we are measuring here is a "cluster operation". The term we have
|
// Really what we are measuring here is a "cluster operation". The term we have
|
||||||
// used for this historically is "RPC", so we continue to use that here.
|
// used for this historically is "RPC", so we continue to use that here.
|
||||||
const RPCTypeInternal = "internal"
|
const RPCTypeInternal = "internal"
|
||||||
|
const RPCTypeNetRPC = "net/rpc"
|
||||||
|
|
||||||
var metricRPCRequest = []string{"rpc", "server", "call"}
|
var metricRPCRequest = []string{"rpc", "server", "call"}
|
||||||
var requestLogName = "rpc.server.request"
|
var requestLogName = "rpc.server.request"
|
||||||
|
@ -31,11 +32,12 @@ var NewRPCCounters = []prometheus.CounterDefinition{
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestRecorder struct {
|
type RequestRecorder struct {
|
||||||
Logger hclog.Logger
|
Logger hclog.Logger
|
||||||
|
recorderFunc func(key []string, start time.Time, labels []metrics.Label)
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
|
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
|
||||||
return &RequestRecorder{Logger: logger}
|
return &RequestRecorder{Logger: logger, recorderFunc: metrics.MeasureSinceWithLabels}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
|
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
|
||||||
|
@ -50,9 +52,8 @@ func (r *RequestRecorder) Record(requestName string, rpcType string, start time.
|
||||||
{Name: "rpc_type", Value: rpcType},
|
{Name: "rpc_type", Value: rpcType},
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(rpc-metrics-improv): consider using Telemetry API call here
|
// TODO(FFMMM): it'd be neat if we could actually pass the elapsed observed above
|
||||||
// It'd be neat if we could actually pass the elapsed observed above
|
r.recorderFunc(metricRPCRequest, start, labels)
|
||||||
metrics.MeasureSinceWithLabels(metricRPCRequest, start, labels)
|
|
||||||
|
|
||||||
r.Logger.Debug(requestLogName,
|
r.Logger.Debug(requestLogName,
|
||||||
"method", requestName,
|
"method", requestName,
|
||||||
|
@ -75,11 +76,6 @@ func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterc
|
||||||
|
|
||||||
err := handler()
|
err := handler()
|
||||||
|
|
||||||
responseErr := false
|
recorder.Record(reqServiceMethod, RPCTypeNetRPC, reqStart, argv.Interface(), err != nil)
|
||||||
if err != nil {
|
|
||||||
responseErr = true
|
|
||||||
}
|
|
||||||
|
|
||||||
recorder.Record(reqServiceMethod, "net/rpc", reqStart, argv.Interface(), responseErr)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,126 @@
|
||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/armon/go-metrics"
|
||||||
|
"github.com/hashicorp/go-hclog"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
// obs holds all the things we want to assert on that we recorded correctly in our tests.
|
||||||
|
type obs struct {
|
||||||
|
key []string
|
||||||
|
start time.Time
|
||||||
|
labels []metrics.Label
|
||||||
|
}
|
||||||
|
|
||||||
|
// recorderStore acts as an in-mem mock storage for all the RequestRecorder.Record() recorderFunc calls.
|
||||||
|
type recorderStore struct {
|
||||||
|
lock sync.Mutex
|
||||||
|
store map[string]obs
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *recorderStore) put(key []string, o obs) {
|
||||||
|
rs.lock.Lock()
|
||||||
|
defer rs.lock.Unlock()
|
||||||
|
|
||||||
|
actualKey := strings.Join(append(key, o.labels[0].Value), "")
|
||||||
|
rs.store[actualKey] = o
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rs *recorderStore) get(key []string) obs {
|
||||||
|
rs.lock.Lock()
|
||||||
|
defer rs.lock.Unlock()
|
||||||
|
|
||||||
|
actualKey := strings.Join(key, "")
|
||||||
|
return rs.store[actualKey]
|
||||||
|
}
|
||||||
|
|
||||||
|
var store = recorderStore{store: make(map[string]obs)}
|
||||||
|
var simpleRecorderFunc = func(key []string, start time.Time, labels []metrics.Label) {
|
||||||
|
o := obs{key: key, start: start, labels: labels}
|
||||||
|
|
||||||
|
store.put(key, o)
|
||||||
|
}
|
||||||
|
|
||||||
|
type readRequest struct{}
|
||||||
|
type writeRequest struct{}
|
||||||
|
|
||||||
|
func (rr readRequest) IsRead() bool {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (wr writeRequest) IsRead() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestRecorder_SimpleOK tests that the RequestRecorder can record a simple request.
|
||||||
|
func TestRequestRecorder_SimpleOK(t *testing.T) {
|
||||||
|
r := RequestRecorder{
|
||||||
|
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||||
|
recorderFunc: simpleRecorderFunc,
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
r.Record("A.B", RPCTypeInternal, start, struct{}{}, false)
|
||||||
|
|
||||||
|
expectedLabels := []metrics.Label{
|
||||||
|
{Name: "method", Value: "A.B"},
|
||||||
|
{Name: "errored", Value: "false"},
|
||||||
|
{Name: "request_type", Value: "write"},
|
||||||
|
{Name: "rpc_type", Value: RPCTypeInternal},
|
||||||
|
}
|
||||||
|
|
||||||
|
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||||
|
require.Equal(t, o.key, metricRPCRequest)
|
||||||
|
require.Equal(t, o.start, start)
|
||||||
|
require.Equal(t, o.labels, expectedLabels)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestRecorder_ReadRequest tests that RequestRecorder can record a read request AND a responseErrored arg.
|
||||||
|
func TestRequestRecorder_ReadRequest(t *testing.T) {
|
||||||
|
r := RequestRecorder{
|
||||||
|
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||||
|
recorderFunc: simpleRecorderFunc,
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
r.Record("B.A", RPCTypeNetRPC, start, readRequest{}, true)
|
||||||
|
|
||||||
|
expectedLabels := []metrics.Label{
|
||||||
|
{Name: "method", Value: "B.A"},
|
||||||
|
{Name: "errored", Value: "true"},
|
||||||
|
{Name: "request_type", Value: "read"},
|
||||||
|
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||||
|
}
|
||||||
|
|
||||||
|
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||||
|
require.Equal(t, o.labels, expectedLabels)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestRequestRecorder_WriteRequest tests that RequestRecorder can record a write request.
|
||||||
|
func TestRequestRecorder_WriteRequest(t *testing.T) {
|
||||||
|
r := RequestRecorder{
|
||||||
|
Logger: hclog.NewInterceptLogger(&hclog.LoggerOptions{}),
|
||||||
|
recorderFunc: simpleRecorderFunc,
|
||||||
|
}
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
r.Record("B.C", RPCTypeNetRPC, start, writeRequest{}, true)
|
||||||
|
|
||||||
|
expectedLabels := []metrics.Label{
|
||||||
|
{Name: "method", Value: "B.C"},
|
||||||
|
{Name: "errored", Value: "true"},
|
||||||
|
{Name: "request_type", Value: "write"},
|
||||||
|
{Name: "rpc_type", Value: RPCTypeNetRPC},
|
||||||
|
}
|
||||||
|
|
||||||
|
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||||
|
require.Equal(t, o.labels, expectedLabels)
|
||||||
|
}
|
Loading…
Reference in New Issue