mirror of
https://github.com/status-im/consul.git
synced 2025-01-13 07:14:37 +00:00
polish rpc.service.call metric behavior (#12624)
This commit is contained in:
parent
706c844423
commit
1adfd7b94c
@ -32,6 +32,7 @@ import (
|
||||
"github.com/hashicorp/consul/agent/consul"
|
||||
"github.com/hashicorp/consul/agent/consul/authmethod/ssoauth"
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/rpc/middleware"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/ipaddr"
|
||||
@ -640,21 +641,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
|
||||
}
|
||||
|
||||
// Parse the metric filters
|
||||
var telemetryAllowedPrefixes, telemetryBlockedPrefixes []string
|
||||
for _, rule := range c.Telemetry.PrefixFilter {
|
||||
if rule == "" {
|
||||
b.warn("Cannot have empty filter rule in prefix_filter")
|
||||
continue
|
||||
}
|
||||
switch rule[0] {
|
||||
case '+':
|
||||
telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, rule[1:])
|
||||
case '-':
|
||||
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, rule[1:])
|
||||
default:
|
||||
b.warn("Filter rule must begin with either '+' or '-': %q", rule)
|
||||
}
|
||||
}
|
||||
telemetryAllowedPrefixes, telemetryBlockedPrefixes := b.parsePrefixFilter(&c.Telemetry)
|
||||
|
||||
// raft performance scaling
|
||||
performanceRaftMultiplier := intVal(c.Performance.RaftMultiplier)
|
||||
@ -2588,3 +2575,38 @@ func (b *builder) buildTLSConfig(rt RuntimeConfig, t TLS) (tlsutil.Config, error
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (b *builder) parsePrefixFilter(telemetry *Telemetry) ([]string, []string) {
|
||||
var telemetryAllowedPrefixes, telemetryBlockedPrefixes []string
|
||||
|
||||
// TODO(FFMMM): Once one twelve style RPC metrics get out of Beta, don't remove them by default.
|
||||
operatorPassedOneTwelveRPCMetric := false
|
||||
oneTwelveRPCMetric := *telemetry.MetricsPrefix + "." + strings.Join(middleware.OneTwelveRPCSummary[0].Name, ".")
|
||||
|
||||
for _, rule := range telemetry.PrefixFilter {
|
||||
if rule == "" {
|
||||
b.warn("Cannot have empty filter rule in prefix_filter")
|
||||
continue
|
||||
}
|
||||
switch rule[0] {
|
||||
case '+':
|
||||
if rule[1:] == oneTwelveRPCMetric {
|
||||
operatorPassedOneTwelveRPCMetric = true
|
||||
}
|
||||
telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, rule[1:])
|
||||
case '-':
|
||||
if rule[1:] == oneTwelveRPCMetric {
|
||||
operatorPassedOneTwelveRPCMetric = true
|
||||
}
|
||||
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, rule[1:])
|
||||
default:
|
||||
b.warn("Filter rule must begin with either '+' or '-': %q", rule)
|
||||
}
|
||||
}
|
||||
|
||||
if !operatorPassedOneTwelveRPCMetric {
|
||||
telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, oneTwelveRPCMetric)
|
||||
}
|
||||
|
||||
return telemetryAllowedPrefixes, telemetryBlockedPrefixes
|
||||
}
|
||||
|
@ -384,3 +384,54 @@ func TestBuilder_tlsCipherSuites(t *testing.T) {
|
||||
require.Contains(t, b.err.Error(), invalidCipherSuites)
|
||||
require.Contains(t, b.err.Error(), "cipher suites are not configurable")
|
||||
}
|
||||
|
||||
func TestBuilder_parsePrefixFilter(t *testing.T) {
|
||||
t.Run("Check that 1.12 rpc metrics are parsed correctly.", func(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
metricsPrefix string
|
||||
prefixFilter []string
|
||||
expectedAllowedPrefix []string
|
||||
expectedBlockedPrefix []string
|
||||
}
|
||||
|
||||
var testCases = []testCase{
|
||||
{
|
||||
name: "no prefix filter",
|
||||
metricsPrefix: "somePrefix",
|
||||
prefixFilter: []string{},
|
||||
expectedAllowedPrefix: nil,
|
||||
expectedBlockedPrefix: []string{"somePrefix.rpc.server.call"},
|
||||
},
|
||||
{
|
||||
name: "operator enables 1.12 rpc metrics",
|
||||
metricsPrefix: "somePrefix",
|
||||
prefixFilter: []string{"+somePrefix.rpc.server.call"},
|
||||
expectedAllowedPrefix: []string{"somePrefix.rpc.server.call"},
|
||||
expectedBlockedPrefix: nil,
|
||||
},
|
||||
{
|
||||
name: "operator enables 1.12 rpc metrics",
|
||||
metricsPrefix: "somePrefix",
|
||||
prefixFilter: []string{"-somePrefix.rpc.server.call"},
|
||||
expectedAllowedPrefix: nil,
|
||||
expectedBlockedPrefix: []string{"somePrefix.rpc.server.call"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
b := builder{}
|
||||
telemetry := &Telemetry{
|
||||
MetricsPrefix: &tc.metricsPrefix,
|
||||
PrefixFilter: tc.prefixFilter,
|
||||
}
|
||||
|
||||
allowedPrefix, blockedPrefix := b.parsePrefixFilter(telemetry)
|
||||
|
||||
require.Equal(t, tc.expectedAllowedPrefix, allowedPrefix)
|
||||
require.Equal(t, tc.expectedBlockedPrefix, blockedPrefix)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -2326,7 +2326,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) {
|
||||
expected: func(rt *RuntimeConfig) {
|
||||
rt.DataDir = dataDir
|
||||
rt.Telemetry.AllowedPrefixes = []string{"foo"}
|
||||
rt.Telemetry.BlockedPrefixes = []string{"bar"}
|
||||
rt.Telemetry.BlockedPrefixes = []string{"bar", "consul.rpc.server.call"}
|
||||
},
|
||||
expectedWarnings: []string{`Filter rule must begin with either '+' or '-': "nix"`},
|
||||
})
|
||||
@ -6285,7 +6285,7 @@ func TestLoad_FullConfig(t *testing.T) {
|
||||
DogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"},
|
||||
FilterDefault: true,
|
||||
AllowedPrefixes: []string{"oJotS8XJ"},
|
||||
BlockedPrefixes: []string{"cazlEhGn"},
|
||||
BlockedPrefixes: []string{"cazlEhGn", "ftO6DySn.rpc.server.call"},
|
||||
MetricsPrefix: "ftO6DySn",
|
||||
StatsdAddr: "drce87cy",
|
||||
StatsiteAddr: "HpFwKB8R",
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/rpc/middleware"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/tlsutil"
|
||||
@ -43,6 +44,82 @@ func assertMetricExists(t *testing.T, respRec *httptest.ResponseRecorder, metric
|
||||
}
|
||||
}
|
||||
|
||||
// assertMetricExistsWithLabels looks in the prometheus metrics reponse for the metric name and all the labels. eg:
|
||||
// new_rpc_metrics_rpc_server_call{errored="false",method="Status.Ping",request_type="unknown",rpc_type="net/rpc"}
|
||||
func assertMetricExistsWithLabels(t *testing.T, respRec *httptest.ResponseRecorder, metric string, labelNames []string) {
|
||||
if respRec.Body.String() == "" {
|
||||
t.Fatalf("Response body is empty.")
|
||||
}
|
||||
|
||||
if !strings.Contains(respRec.Body.String(), metric) {
|
||||
t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric)
|
||||
}
|
||||
|
||||
foundAllLabels := false
|
||||
metrics := respRec.Body.String()
|
||||
for _, line := range strings.Split(metrics, "\n") {
|
||||
// skip help lines
|
||||
if len(line) < 1 || line[0] == '#' {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.Contains(line, metric) {
|
||||
hasAllLabels := true
|
||||
for _, labelName := range labelNames {
|
||||
if !strings.Contains(line, labelName) {
|
||||
hasAllLabels = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if hasAllLabels {
|
||||
foundAllLabels = true
|
||||
|
||||
// done!
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !foundAllLabels {
|
||||
t.Fatalf("Could not verify that all named labels \"%s\" exist for the metric \"%s\" in the /v1/agent/metrics response", strings.Join(labelNames, ", "), metric)
|
||||
}
|
||||
}
|
||||
|
||||
func assertLabelWithValueForMetricExistsNTime(t *testing.T, respRec *httptest.ResponseRecorder, metric string, label string, labelValue string, occurrences int) {
|
||||
if respRec.Body.String() == "" {
|
||||
t.Fatalf("Response body is empty.")
|
||||
}
|
||||
|
||||
if !strings.Contains(respRec.Body.String(), metric) {
|
||||
t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric)
|
||||
}
|
||||
|
||||
metrics := respRec.Body.String()
|
||||
// don't look at _sum or _count or other aggregates
|
||||
metricTarget := metric + "{"
|
||||
// eg method="Status.Ping"
|
||||
labelWithValueTarget := label + "=" + "\"" + labelValue + "\""
|
||||
|
||||
matchesFound := 0
|
||||
for _, line := range strings.Split(metrics, "\n") {
|
||||
// skip help lines
|
||||
if len(line) < 1 || line[0] == '#' {
|
||||
continue
|
||||
}
|
||||
|
||||
if strings.Contains(line, metricTarget) {
|
||||
if strings.Contains(line, labelWithValueTarget) {
|
||||
matchesFound++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if matchesFound < occurrences {
|
||||
t.Fatalf("Only found metric \"%s\" %d times. Wanted %d times.", metric, matchesFound, occurrences)
|
||||
}
|
||||
}
|
||||
|
||||
func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorder, metric string, value string) {
|
||||
if respRec.Body.String() == "" {
|
||||
t.Fatalf("Response body is empty.")
|
||||
@ -66,13 +143,13 @@ func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, met
|
||||
}
|
||||
}
|
||||
|
||||
// TestAgent_NewRPCMetrics test for the new RPC metrics. These are the labeled metrics coming from
|
||||
// TestAgent_OneTwelveRPCMetrics test for the 1.12 style RPC metrics. These are the labeled metrics coming from
|
||||
// agent.rpc.middleware.interceptors package.
|
||||
func TestAgent_NewRPCMetrics(t *testing.T) {
|
||||
func TestAgent_OneTwelveRPCMetrics(t *testing.T) {
|
||||
skipIfShortTesting(t)
|
||||
// This test cannot use t.Parallel() since we modify global state, ie the global metrics instance
|
||||
|
||||
t.Run("Check new rpc metrics are being emitted", func(t *testing.T) {
|
||||
t.Run("Check that 1.12 rpc metrics are not emitted by default.", func(t *testing.T) {
|
||||
metricsPrefix := "new_rpc_metrics"
|
||||
hcl := fmt.Sprintf(`
|
||||
telemetry = {
|
||||
@ -92,7 +169,39 @@ func TestAgent_NewRPCMetrics(t *testing.T) {
|
||||
respRec := httptest.NewRecorder()
|
||||
recordPromMetrics(t, a, respRec)
|
||||
|
||||
assertMetricExists(t, respRec, metricsPrefix+"_rpc_server_call")
|
||||
assertMetricNotExists(t, respRec, metricsPrefix+"_rpc_server_call")
|
||||
})
|
||||
|
||||
t.Run("Check that 1.12 rpc metrics are emitted when specified by operator.", func(t *testing.T) {
|
||||
metricsPrefix := "new_rpc_metrics_2"
|
||||
allowRPCMetricRule := metricsPrefix + "." + strings.Join(middleware.OneTwelveRPCSummary[0].Name, ".")
|
||||
hcl := fmt.Sprintf(`
|
||||
telemetry = {
|
||||
prometheus_retention_time = "5s"
|
||||
disable_hostname = true
|
||||
metrics_prefix = "%s"
|
||||
prefix_filter = ["+%s"]
|
||||
}
|
||||
`, metricsPrefix, allowRPCMetricRule)
|
||||
|
||||
a := StartTestAgent(t, TestAgent{HCL: hcl})
|
||||
defer a.Shutdown()
|
||||
|
||||
var out struct{}
|
||||
err := a.RPC("Status.Ping", struct{}{}, &out)
|
||||
require.NoError(t, err)
|
||||
err = a.RPC("Status.Ping", struct{}{}, &out)
|
||||
require.NoError(t, err)
|
||||
err = a.RPC("Status.Ping", struct{}{}, &out)
|
||||
require.NoError(t, err)
|
||||
|
||||
respRec := httptest.NewRecorder()
|
||||
recordPromMetrics(t, a, respRec)
|
||||
|
||||
// make sure the labels exist for this metric
|
||||
assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type"})
|
||||
// make sure we see 3 Status.Ping metrics corresponding to the calls we made above
|
||||
assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -3,6 +3,7 @@ package middleware
|
||||
import (
|
||||
"reflect"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
@ -22,27 +23,26 @@ const RPCTypeInternal = "internal"
|
||||
const RPCTypeNetRPC = "net/rpc"
|
||||
|
||||
var metricRPCRequest = []string{"rpc", "server", "call"}
|
||||
var requestLogName = "rpc.server.request"
|
||||
var requestLogName = strings.Join(metricRPCRequest, "_")
|
||||
|
||||
var NewRPCGauges = []prometheus.GaugeDefinition{
|
||||
var OneTwelveRPCSummary = []prometheus.SummaryDefinition{
|
||||
{
|
||||
Name: metricRPCRequest,
|
||||
Help: "Increments when a server makes an RPC service call. The labels on the metric have more information",
|
||||
Help: "Measures the time an RPC service call takes to make in milliseconds. Labels mark which RPC method was called and metadata about the call.",
|
||||
},
|
||||
}
|
||||
|
||||
type RequestRecorder struct {
|
||||
Logger hclog.Logger
|
||||
recorderFunc func(key []string, start time.Time, labels []metrics.Label)
|
||||
recorderFunc func(key []string, val float32, labels []metrics.Label)
|
||||
}
|
||||
|
||||
func NewRequestRecorder(logger hclog.Logger) *RequestRecorder {
|
||||
return &RequestRecorder{Logger: logger, recorderFunc: metrics.MeasureSinceWithLabels}
|
||||
return &RequestRecorder{Logger: logger, recorderFunc: metrics.AddSampleWithLabels}
|
||||
}
|
||||
|
||||
func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) {
|
||||
elapsed := time.Since(start)
|
||||
|
||||
elapsed := time.Since(start).Milliseconds()
|
||||
reqType := requestType(request)
|
||||
|
||||
labels := []metrics.Label{
|
||||
@ -52,9 +52,8 @@ func (r *RequestRecorder) Record(requestName string, rpcType string, start time.
|
||||
{Name: "rpc_type", Value: rpcType},
|
||||
}
|
||||
|
||||
// TODO(FFMMM): it'd be neat if we could actually pass the elapsed observed above
|
||||
r.recorderFunc(metricRPCRequest, start, labels)
|
||||
|
||||
// math.MaxInt64 < math.MaxFloat32 is true so we should be good!
|
||||
r.recorderFunc(metricRPCRequest, float32(elapsed), labels)
|
||||
r.Logger.Debug(requestLogName,
|
||||
"method", requestName,
|
||||
"errored", respErrored,
|
||||
@ -64,10 +63,18 @@ func (r *RequestRecorder) Record(requestName string, rpcType string, start time.
|
||||
}
|
||||
|
||||
func requestType(req interface{}) string {
|
||||
if r, ok := req.(interface{ IsRead() bool }); ok && r.IsRead() {
|
||||
return "read"
|
||||
if r, ok := req.(interface{ IsRead() bool }); ok {
|
||||
if r.IsRead() {
|
||||
return "read"
|
||||
} else {
|
||||
return "write"
|
||||
}
|
||||
}
|
||||
return "write"
|
||||
|
||||
// This logical branch should not happen. If it happens
|
||||
// it means an underlying request is not implementing the interface.
|
||||
// Rather than swallowing it up in a "read" or "write", let's be aware of it.
|
||||
return "unreported"
|
||||
}
|
||||
|
||||
func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor {
|
||||
|
@ -13,9 +13,9 @@ import (
|
||||
|
||||
// obs holds all the things we want to assert on that we recorded correctly in our tests.
|
||||
type obs struct {
|
||||
key []string
|
||||
start time.Time
|
||||
labels []metrics.Label
|
||||
key []string
|
||||
elapsed float32
|
||||
labels []metrics.Label
|
||||
}
|
||||
|
||||
// recorderStore acts as an in-mem mock storage for all the RequestRecorder.Record() recorderFunc calls.
|
||||
@ -41,9 +41,8 @@ func (rs *recorderStore) get(key []string) obs {
|
||||
}
|
||||
|
||||
var store = recorderStore{store: make(map[string]obs)}
|
||||
var simpleRecorderFunc = func(key []string, start time.Time, labels []metrics.Label) {
|
||||
o := obs{key: key, start: start, labels: labels}
|
||||
|
||||
var simpleRecorderFunc = func(key []string, val float32, labels []metrics.Label) {
|
||||
o := obs{key: key, elapsed: val, labels: labels}
|
||||
store.put(key, o)
|
||||
}
|
||||
|
||||
@ -71,13 +70,13 @@ func TestRequestRecorder_SimpleOK(t *testing.T) {
|
||||
expectedLabels := []metrics.Label{
|
||||
{Name: "method", Value: "A.B"},
|
||||
{Name: "errored", Value: "false"},
|
||||
{Name: "request_type", Value: "write"},
|
||||
{Name: "request_type", Value: "unreported"},
|
||||
{Name: "rpc_type", Value: RPCTypeInternal},
|
||||
}
|
||||
|
||||
o := store.get(append(metricRPCRequest, expectedLabels[0].Value))
|
||||
require.Equal(t, o.key, metricRPCRequest)
|
||||
require.Equal(t, o.start, start)
|
||||
require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds()))
|
||||
require.Equal(t, o.labels, expectedLabels)
|
||||
}
|
||||
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/hashicorp/consul/agent/local"
|
||||
"github.com/hashicorp/consul/agent/pool"
|
||||
"github.com/hashicorp/consul/agent/router"
|
||||
"github.com/hashicorp/consul/agent/rpc/middleware"
|
||||
"github.com/hashicorp/consul/agent/submatview"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/agent/xds"
|
||||
@ -215,7 +214,6 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
|
||||
CertExpirationGauges,
|
||||
Gauges,
|
||||
raftGauges,
|
||||
middleware.NewRPCGauges,
|
||||
}
|
||||
|
||||
// TODO(ffmmm): conditionally add only leader specific metrics to gauges, counters, summaries, etc
|
||||
|
Loading…
x
Reference in New Issue
Block a user