diff --git a/agent/config/builder.go b/agent/config/builder.go index d9686254b4..d5d3bbfb4b 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -32,6 +32,7 @@ import ( "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/dns" + "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/ipaddr" @@ -640,21 +641,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { } // Parse the metric filters - var telemetryAllowedPrefixes, telemetryBlockedPrefixes []string - for _, rule := range c.Telemetry.PrefixFilter { - if rule == "" { - b.warn("Cannot have empty filter rule in prefix_filter") - continue - } - switch rule[0] { - case '+': - telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, rule[1:]) - case '-': - telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, rule[1:]) - default: - b.warn("Filter rule must begin with either '+' or '-': %q", rule) - } - } + telemetryAllowedPrefixes, telemetryBlockedPrefixes := b.parsePrefixFilter(&c.Telemetry) // raft performance scaling performanceRaftMultiplier := intVal(c.Performance.RaftMultiplier) @@ -2588,3 +2575,38 @@ func (b *builder) buildTLSConfig(rt RuntimeConfig, t TLS) (tlsutil.Config, error return c, nil } + +func (b *builder) parsePrefixFilter(telemetry *Telemetry) ([]string, []string) { + var telemetryAllowedPrefixes, telemetryBlockedPrefixes []string + + // TODO(FFMMM): Once one twelve style RPC metrics get out of Beta, don't remove them by default. + operatorPassedOneTwelveRPCMetric := false + oneTwelveRPCMetric := *telemetry.MetricsPrefix + "." 
+ strings.Join(middleware.OneTwelveRPCSummary[0].Name, ".") + + for _, rule := range telemetry.PrefixFilter { + if rule == "" { + b.warn("Cannot have empty filter rule in prefix_filter") + continue + } + switch rule[0] { + case '+': + if rule[1:] == oneTwelveRPCMetric { + operatorPassedOneTwelveRPCMetric = true + } + telemetryAllowedPrefixes = append(telemetryAllowedPrefixes, rule[1:]) + case '-': + if rule[1:] == oneTwelveRPCMetric { + operatorPassedOneTwelveRPCMetric = true + } + telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, rule[1:]) + default: + b.warn("Filter rule must begin with either '+' or '-': %q", rule) + } + } + + if !operatorPassedOneTwelveRPCMetric { + telemetryBlockedPrefixes = append(telemetryBlockedPrefixes, oneTwelveRPCMetric) + } + + return telemetryAllowedPrefixes, telemetryBlockedPrefixes +} diff --git a/agent/config/builder_test.go b/agent/config/builder_test.go index 58fd922fc2..1bd6d8653f 100644 --- a/agent/config/builder_test.go +++ b/agent/config/builder_test.go @@ -384,3 +384,54 @@ func TestBuilder_tlsCipherSuites(t *testing.T) { require.Contains(t, b.err.Error(), invalidCipherSuites) require.Contains(t, b.err.Error(), "cipher suites are not configurable") } + +func TestBuilder_parsePrefixFilter(t *testing.T) { + t.Run("Check that 1.12 rpc metrics are parsed correctly.", func(t *testing.T) { + type testCase struct { + name string + metricsPrefix string + prefixFilter []string + expectedAllowedPrefix []string + expectedBlockedPrefix []string + } + + var testCases = []testCase{ + { + name: "no prefix filter", + metricsPrefix: "somePrefix", + prefixFilter: []string{}, + expectedAllowedPrefix: nil, + expectedBlockedPrefix: []string{"somePrefix.rpc.server.call"}, + }, + { + name: "operator enables 1.12 rpc metrics", + metricsPrefix: "somePrefix", + prefixFilter: []string{"+somePrefix.rpc.server.call"}, + expectedAllowedPrefix: []string{"somePrefix.rpc.server.call"}, + expectedBlockedPrefix: nil, + }, + { + name: "operator enables 
1.12 rpc metrics", + metricsPrefix: "somePrefix", + prefixFilter: []string{"-somePrefix.rpc.server.call"}, + expectedAllowedPrefix: nil, + expectedBlockedPrefix: []string{"somePrefix.rpc.server.call"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + b := builder{} + telemetry := &Telemetry{ + MetricsPrefix: &tc.metricsPrefix, + PrefixFilter: tc.prefixFilter, + } + + allowedPrefix, blockedPrefix := b.parsePrefixFilter(telemetry) + + require.Equal(t, tc.expectedAllowedPrefix, allowedPrefix) + require.Equal(t, tc.expectedBlockedPrefix, blockedPrefix) + }) + } + }) +} diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index d74650c07e..ab0798342b 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -2326,7 +2326,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { expected: func(rt *RuntimeConfig) { rt.DataDir = dataDir rt.Telemetry.AllowedPrefixes = []string{"foo"} - rt.Telemetry.BlockedPrefixes = []string{"bar"} + rt.Telemetry.BlockedPrefixes = []string{"bar", "consul.rpc.server.call"} }, expectedWarnings: []string{`Filter rule must begin with either '+' or '-': "nix"`}, }) @@ -6285,7 +6285,7 @@ func TestLoad_FullConfig(t *testing.T) { DogstatsdTags: []string{"3N81zSUB", "Xtj8AnXZ"}, FilterDefault: true, AllowedPrefixes: []string{"oJotS8XJ"}, - BlockedPrefixes: []string{"cazlEhGn"}, + BlockedPrefixes: []string{"cazlEhGn", "ftO6DySn.rpc.server.call"}, MetricsPrefix: "ftO6DySn", StatsdAddr: "drce87cy", StatsiteAddr: "HpFwKB8R", diff --git a/agent/metrics_test.go b/agent/metrics_test.go index b530eda250..5bbc7de23c 100644 --- a/agent/metrics_test.go +++ b/agent/metrics_test.go @@ -10,6 +10,7 @@ import ( "strings" "testing" + "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" @@ -43,6 +44,82 @@ func assertMetricExists(t *testing.T, respRec *httptest.ResponseRecorder, 
metric } } +// assertMetricExistsWithLabels looks in the prometheus metrics response for the metric name and all the labels. eg: +// new_rpc_metrics_rpc_server_call{errored="false",method="Status.Ping",request_type="unknown",rpc_type="net/rpc"} +func assertMetricExistsWithLabels(t *testing.T, respRec *httptest.ResponseRecorder, metric string, labelNames []string) { + if respRec.Body.String() == "" { + t.Fatalf("Response body is empty.") + } + + if !strings.Contains(respRec.Body.String(), metric) { + t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric) + } + + foundAllLabels := false + metrics := respRec.Body.String() + for _, line := range strings.Split(metrics, "\n") { + // skip help lines + if len(line) < 1 || line[0] == '#' { + continue + } + + if strings.Contains(line, metric) { + hasAllLabels := true + for _, labelName := range labelNames { + if !strings.Contains(line, labelName) { + hasAllLabels = false + break + } + } + + if hasAllLabels { + foundAllLabels = true + + // done! 
+ break + } + } + } + + if !foundAllLabels { + t.Fatalf("Could not verify that all named labels \"%s\" exist for the metric \"%s\" in the /v1/agent/metrics response", strings.Join(labelNames, ", "), metric) + } +} + +func assertLabelWithValueForMetricExistsNTime(t *testing.T, respRec *httptest.ResponseRecorder, metric string, label string, labelValue string, occurrences int) { + if respRec.Body.String() == "" { + t.Fatalf("Response body is empty.") + } + + if !strings.Contains(respRec.Body.String(), metric) { + t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric) + } + + metrics := respRec.Body.String() + // don't look at _sum or _count or other aggregates + metricTarget := metric + "{" + // eg method="Status.Ping" + labelWithValueTarget := label + "=" + "\"" + labelValue + "\"" + + matchesFound := 0 + for _, line := range strings.Split(metrics, "\n") { + // skip help lines + if len(line) < 1 || line[0] == '#' { + continue + } + + if strings.Contains(line, metricTarget) { + if strings.Contains(line, labelWithValueTarget) { + matchesFound++ + } + } + } + + if matchesFound < occurrences { + t.Fatalf("Only found metric \"%s\" %d times. Wanted %d times.", metric, matchesFound, occurrences) + } +} + func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorder, metric string, value string) { if respRec.Body.String() == "" { t.Fatalf("Response body is empty.") @@ -66,13 +143,13 @@ func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, met } } -// TestAgent_NewRPCMetrics test for the new RPC metrics. These are the labeled metrics coming from +// TestAgent_OneTwelveRPCMetrics test for the 1.12 style RPC metrics. These are the labeled metrics coming from // agent.rpc.middleware.interceptors package. 
-func TestAgent_NewRPCMetrics(t *testing.T) { +func TestAgent_OneTwelveRPCMetrics(t *testing.T) { skipIfShortTesting(t) // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance - t.Run("Check new rpc metrics are being emitted", func(t *testing.T) { + t.Run("Check that 1.12 rpc metrics are not emitted by default.", func(t *testing.T) { metricsPrefix := "new_rpc_metrics" hcl := fmt.Sprintf(` telemetry = { @@ -92,7 +169,39 @@ func TestAgent_NewRPCMetrics(t *testing.T) { respRec := httptest.NewRecorder() recordPromMetrics(t, a, respRec) - assertMetricExists(t, respRec, metricsPrefix+"_rpc_server_call") + assertMetricNotExists(t, respRec, metricsPrefix+"_rpc_server_call") + }) + + t.Run("Check that 1.12 rpc metrics are emitted when specified by operator.", func(t *testing.T) { + metricsPrefix := "new_rpc_metrics_2" + allowRPCMetricRule := metricsPrefix + "." + strings.Join(middleware.OneTwelveRPCSummary[0].Name, ".") + hcl := fmt.Sprintf(` + telemetry = { + prometheus_retention_time = "5s" + disable_hostname = true + metrics_prefix = "%s" + prefix_filter = ["+%s"] + } + `, metricsPrefix, allowRPCMetricRule) + + a := StartTestAgent(t, TestAgent{HCL: hcl}) + defer a.Shutdown() + + var out struct{} + err := a.RPC("Status.Ping", struct{}{}, &out) + require.NoError(t, err) + err = a.RPC("Status.Ping", struct{}{}, &out) + require.NoError(t, err) + err = a.RPC("Status.Ping", struct{}{}, &out) + require.NoError(t, err) + + respRec := httptest.NewRecorder() + recordPromMetrics(t, a, respRec) + + // make sure the labels exist for this metric + assertMetricExistsWithLabels(t, respRec, metricsPrefix+"_rpc_server_call", []string{"errored", "method", "request_type", "rpc_type"}) + // make sure we see 3 Status.Ping metrics corresponding to the calls we made above + assertLabelWithValueForMetricExistsNTime(t, respRec, metricsPrefix+"_rpc_server_call", "method", "Status.Ping", 3) }) } diff --git a/agent/rpc/middleware/interceptors.go 
b/agent/rpc/middleware/interceptors.go index a5ee26f4e7..c7ac72f353 100644 --- a/agent/rpc/middleware/interceptors.go +++ b/agent/rpc/middleware/interceptors.go @@ -3,6 +3,7 @@ package middleware import ( "reflect" "strconv" + "strings" "time" "github.com/armon/go-metrics" @@ -22,27 +23,26 @@ const RPCTypeInternal = "internal" const RPCTypeNetRPC = "net/rpc" var metricRPCRequest = []string{"rpc", "server", "call"} -var requestLogName = "rpc.server.request" +var requestLogName = strings.Join(metricRPCRequest, "_") -var NewRPCGauges = []prometheus.GaugeDefinition{ +var OneTwelveRPCSummary = []prometheus.SummaryDefinition{ { Name: metricRPCRequest, - Help: "Increments when a server makes an RPC service call. The labels on the metric have more information", + Help: "Measures the time an RPC service call takes to make in milliseconds. Labels mark which RPC method was called and metadata about the call.", }, } type RequestRecorder struct { Logger hclog.Logger - recorderFunc func(key []string, start time.Time, labels []metrics.Label) + recorderFunc func(key []string, val float32, labels []metrics.Label) } func NewRequestRecorder(logger hclog.Logger) *RequestRecorder { - return &RequestRecorder{Logger: logger, recorderFunc: metrics.MeasureSinceWithLabels} + return &RequestRecorder{Logger: logger, recorderFunc: metrics.AddSampleWithLabels} } func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) { - elapsed := time.Since(start) - + elapsed := time.Since(start).Milliseconds() reqType := requestType(request) labels := []metrics.Label{ @@ -52,9 +52,8 @@ func (r *RequestRecorder) Record(requestName string, rpcType string, start time. {Name: "rpc_type", Value: rpcType}, } - // TODO(FFMMM): it'd be neat if we could actually pass the elapsed observed above - r.recorderFunc(metricRPCRequest, start, labels) - + // math.MaxInt64 < math.MaxFloat32 is true so we should be good! 
+ r.recorderFunc(metricRPCRequest, float32(elapsed), labels) r.Logger.Debug(requestLogName, "method", requestName, "errored", respErrored, @@ -64,10 +63,18 @@ func (r *RequestRecorder) Record(requestName string, rpcType string, start time. } func requestType(req interface{}) string { - if r, ok := req.(interface{ IsRead() bool }); ok && r.IsRead() { - return "read" + if r, ok := req.(interface{ IsRead() bool }); ok { + if r.IsRead() { + return "read" + } else { + return "write" + } } - return "write" + + // This logical branch should not happen. If it happens + // it means an underlying request is not implementing the interface. + // Rather than swallowing it up in a "read" or "write", let's be aware of it. + return "unreported" } func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor { diff --git a/agent/rpc/middleware/interceptors_test.go b/agent/rpc/middleware/interceptors_test.go index e6743a4a62..23d7649622 100644 --- a/agent/rpc/middleware/interceptors_test.go +++ b/agent/rpc/middleware/interceptors_test.go @@ -13,9 +13,9 @@ import ( // obs holds all the things we want to assert on that we recorded correctly in our tests. type obs struct { - key []string - start time.Time - labels []metrics.Label + key []string + elapsed float32 + labels []metrics.Label } // recorderStore acts as an in-mem mock storage for all the RequestRecorder.Record() recorderFunc calls. 
@@ -41,9 +41,8 @@ func (rs *recorderStore) get(key []string) obs { } var store = recorderStore{store: make(map[string]obs)} -var simpleRecorderFunc = func(key []string, start time.Time, labels []metrics.Label) { - o := obs{key: key, start: start, labels: labels} - +var simpleRecorderFunc = func(key []string, val float32, labels []metrics.Label) { + o := obs{key: key, elapsed: val, labels: labels} store.put(key, o) } @@ -71,13 +70,13 @@ func TestRequestRecorder_SimpleOK(t *testing.T) { expectedLabels := []metrics.Label{ {Name: "method", Value: "A.B"}, {Name: "errored", Value: "false"}, - {Name: "request_type", Value: "write"}, + {Name: "request_type", Value: "unreported"}, {Name: "rpc_type", Value: RPCTypeInternal}, } o := store.get(append(metricRPCRequest, expectedLabels[0].Value)) require.Equal(t, o.key, metricRPCRequest) - require.Equal(t, o.start, start) + require.LessOrEqual(t, o.elapsed, float32(start.Sub(time.Now()).Milliseconds())) require.Equal(t, o.labels, expectedLabels) } diff --git a/agent/setup.go b/agent/setup.go index 4921a42d89..bf67c0360f 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -25,7 +25,6 @@ import ( "github.com/hashicorp/consul/agent/local" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" - "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/submatview" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/agent/xds" @@ -215,7 +214,6 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau CertExpirationGauges, Gauges, raftGauges, - middleware.NewRPCGauges, } // TODO(ffmmm): conditionally add only leader specific metrics to gauges, counters, summaries, etc