diff --git a/agent/consul/acl_replication_types.go b/agent/consul/acl_replication_types.go index 6516a5c719..0d330ff66b 100644 --- a/agent/consul/acl_replication_types.go +++ b/agent/consul/acl_replication_types.go @@ -86,7 +86,7 @@ func (r *aclTokenReplicator) DeleteLocalBatch(srv *Server, batch []string) error TokenIDs: batch, } - _, err := srv.raftApply(structs.ACLTokenDeleteRequestType, &req) + _, err := srv.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, &req) return err } @@ -110,7 +110,7 @@ func (r *aclTokenReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, FromReplication: true, } - _, err := srv.raftApply(structs.ACLTokenSetRequestType, &req) + _, err := srv.leaderRaftApply("ACL.TokenSet", structs.ACLTokenSetRequestType, &req) return err } @@ -186,7 +186,7 @@ func (r *aclPolicyReplicator) DeleteLocalBatch(srv *Server, batch []string) erro PolicyIDs: batch, } - _, err := srv.raftApply(structs.ACLPolicyDeleteRequestType, &req) + _, err := srv.leaderRaftApply("ACL.PolicyDelete", structs.ACLPolicyDeleteRequestType, &req) return err } @@ -207,7 +207,7 @@ func (r *aclPolicyReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, Policies: r.updated[start:end], } - _, err := srv.raftApply(structs.ACLPolicySetRequestType, &req) + _, err := srv.leaderRaftApply("ACL.PolicySet", structs.ACLPolicySetRequestType, &req) return err } @@ -307,7 +307,7 @@ func (r *aclRoleReplicator) DeleteLocalBatch(srv *Server, batch []string) error RoleIDs: batch, } - _, err := srv.raftApply(structs.ACLRoleDeleteRequestType, &req) + _, err := srv.leaderRaftApply("ACL.RoleDelete", structs.ACLRoleDeleteRequestType, &req) return err } @@ -329,6 +329,7 @@ func (r *aclRoleReplicator) UpdateLocalBatch(ctx context.Context, srv *Server, s AllowMissingLinks: true, } - _, err := srv.raftApply(structs.ACLRoleSetRequestType, &req) + _, err := srv.leaderRaftApply("ACL.RoleSet", structs.ACLRoleSetRequestType, &req) + return err } diff --git a/agent/consul/acl_token_exp.go b/agent/consul/acl_token_exp.go index 370d140ef7..fac2017e7e 100644 --- a/agent/consul/acl_token_exp.go +++ b/agent/consul/acl_token_exp.go @@ -100,7 +100,8 @@ func (s *Server) reapExpiredACLTokens(local, global bool) (int, error) { "amount", len(req.TokenIDs), "locality", locality, ) - _, err = s.raftApply(structs.ACLTokenDeleteRequestType, &req) + + _, err = s.leaderRaftApply("ACL.TokenDelete", structs.ACLTokenDeleteRequestType, &req) if err != nil { return 0, fmt.Errorf("Failed to apply token expiration deletions: %v", err) } diff --git a/agent/consul/config_replication.go b/agent/consul/config_replication.go index ac43efa966..8b1a2273a8 100644 --- a/agent/consul/config_replication.go +++ b/agent/consul/config_replication.go @@ -92,6 +92,11 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con ticker := time.NewTicker(time.Second / time.Duration(s.config.ConfigReplicationApplyLimit)) defer ticker.Stop() + rpcServiceMethod := "ConfigEntry.Apply" + if op == structs.ConfigEntryDelete || op == structs.ConfigEntryDeleteCAS { + rpcServiceMethod = "ConfigEntry.Delete" + } + var merr error for i, entry := range configs { // Exported services only apply to the primary datacenter. @@ -104,7 +109,7 @@ func (s *Server) reconcileLocalConfig(ctx context.Context, configs []structs.Con Entry: entry, } - _, err := s.raftApply(structs.ConfigEntryRequestType, &req) + _, err := s.leaderRaftApply(rpcServiceMethod, structs.ConfigEntryRequestType, &req) if err != nil { merr = multierror.Append(merr, fmt.Errorf("Failed to apply config entry %s: %w", op, err)) } diff --git a/agent/consul/federation_state_replication.go b/agent/consul/federation_state_replication.go index 5379a1e284..bfb433085a 100644 --- a/agent/consul/federation_state_replication.go +++ b/agent/consul/federation_state_replication.go @@ -154,7 +154,7 @@ func (r *FederationStateReplicator) PerformDeletions(ctx context.Context, deleti State: state, } - _, err := r.srv.raftApply(structs.FederationStateRequestType, &req) + _, err := r.srv.leaderRaftApply("FederationState.Delete", structs.FederationStateRequestType, &req) if err != nil { return false, err } @@ -195,7 +195,7 @@ func (r *FederationStateReplicator) PerformUpdates(ctx context.Context, updatesR State: state2, } - _, err := r.srv.raftApply(structs.FederationStateRequestType, &req) + _, err := r.srv.leaderRaftApply("FederationState.Apply", structs.FederationStateRequestType, &req) if err != nil { return false, err } diff --git a/agent/consul/leader.go b/agent/consul/leader.go index 8f612ed54c..b6291ee565 100644 --- a/agent/consul/leader.go +++ b/agent/consul/leader.go @@ -796,7 +796,7 @@ func (s *Server) getOrCreateAutopilotConfig() *structs.AutopilotConfig { config = s.config.AutopilotConfig req := structs.AutopilotSetConfigRequest{Config: *config} - if _, err = s.raftApply(structs.AutopilotRequestType, req); err != nil { + if _, err = s.leaderRaftApply("AutopilotRequest.Apply", structs.AutopilotRequestType, req); err != nil { logger.Error("failed to initialize config", "error", err) return nil } @@ -871,7 +871,7 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error { Entry: entry, } - _, err := s.raftApply(structs.ConfigEntryRequestType, &req) + _, err := s.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &req) if err != nil { return fmt.Errorf("Failed to apply configuration entry %q / %q: %v", entry.GetKind(), entry.GetName(), err) } diff --git a/agent/consul/leader_federation_state_ae.go b/agent/consul/leader_federation_state_ae.go index c4d8dc126e..ef6f6378f6 100644 --- a/agent/consul/leader_federation_state_ae.go +++ b/agent/consul/leader_federation_state_ae.go @@ -118,7 +118,7 @@ func (s *Server) updateOurFederationState(curr *structs.FederationState) error { if s.config.Datacenter == s.config.PrimaryDatacenter { // We are the primary, so we can't do an RPC as we don't have a replication token. - _, err := s.raftApply(structs.FederationStateRequestType, args) + _, err := s.leaderRaftApply("FederationState.Apply", structs.FederationStateRequestType, args) if err != nil { return err } @@ -223,7 +223,8 @@ func (s *Server) pruneStaleFederationStates() error { Datacenter: dc, }, } - _, err := s.raftApply(structs.FederationStateRequestType, &req) + _, err := s.leaderRaftApply("FederationState.Delete", structs.FederationStateRequestType, &req) + if err != nil { return fmt.Errorf("Failed to delete federation state %s: %v", dc, err) } diff --git a/agent/consul/leader_intentions.go b/agent/consul/leader_intentions.go index b4afc9c594..9adc26795d 100644 --- a/agent/consul/leader_intentions.go +++ b/agent/consul/leader_intentions.go @@ -170,7 +170,8 @@ func (s *Server) legacyIntentionsMigrationCleanupPhase(quiet bool) error { req := structs.IntentionRequest{ Op: structs.IntentionOpDeleteAll, } - if _, err := s.raftApply(structs.IntentionRequestType, req); err != nil { + + if _, err := s.leaderRaftApply("Intentions.DeleteAll", structs.IntentionRequestType, req); err != nil { return err } @@ -410,7 +411,9 @@ func (s *Server) replicateLegacyIntentionsOnce(ctx context.Context, lastFetchInd for _, ops := range txnOpSets { txnReq := structs.TxnRequest{Ops: ops} - resp, err := s.raftApply(structs.TxnRequestType, &txnReq) + // TODO(rpc-metrics-improv) -- verify labels + resp, err := s.leaderRaftApply("Txn.Apply", structs.TxnRequestType, &txnReq) + if err != nil { return 0, false, err } diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 581a75ac0f..d6fa9fae49 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/agent/consul/wanfed" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" + "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" @@ -842,6 +843,17 @@ func (s *Server) keyringRPCs(method string, args interface{}, dcs []string) (*st type raftEncoder func(structs.MessageType, interface{}) ([]byte, error) +// leaderRaftApply is used by the leader to persist data to Raft for internal cluster management activities. +// This method MUST not be called from RPC endpoints, since it would result in duplicated RPC metrics. +func (s *Server) leaderRaftApply(method string, t structs.MessageType, msg interface{}) (interface{}, error) { + start := time.Now() + + resp, err := s.raftApplyMsgpack(t, msg) + s.rpcRecorder.Record(method, middleware.RPCTypeInternal, start, &msg, err != nil) + + return resp, err +} + // raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See // raftApplyWithEncoder. // Deprecated: use raftApplyMsgpack diff --git a/agent/consul/server.go b/agent/consul/server.go index b9ad053ee4..2c48f55576 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -16,6 +16,8 @@ import ( "sync/atomic" "time" + "github.com/hashicorp/consul/agent/rpc/middleware" + "github.com/hashicorp/go-version" "go.etcd.io/bbolt" @@ -255,6 +257,9 @@ type Server struct { // Endpoint that is available at the time of writing. insecureRPCServer *rpc.Server + // rpcRecorder is a middleware component that can emit RPC request metrics. + rpcRecorder *middleware.RequestRecorder + // tlsConfigurator holds the agent configuration relevant to TLS and // configures everything related to it. tlsConfigurator *tlsutil.Configurator @@ -363,21 +368,24 @@ func NewServer(config *Config, flat Deps) (*Server, error) { serverLogger := flat.Logger.NamedIntercept(logging.ConsulServer) loggers := newLoggerStore(serverLogger) + recorder := middleware.NewRequestRecorder(serverLogger) // Create server. s := &Server{ - config: config, - tokens: flat.Tokens, - connPool: flat.ConnPool, - grpcConnPool: flat.GRPCConnPool, - eventChLAN: make(chan serf.Event, serfEventChSize), - eventChWAN: make(chan serf.Event, serfEventChSize), - logger: serverLogger, - loggers: loggers, - leaveCh: make(chan struct{}), - reconcileCh: make(chan serf.Member, reconcileChSize), - router: flat.Router, - rpcServer: rpc.NewServer(), - insecureRPCServer: rpc.NewServer(), + config: config, + tokens: flat.Tokens, + connPool: flat.ConnPool, + grpcConnPool: flat.GRPCConnPool, + eventChLAN: make(chan serf.Event, serfEventChSize), + eventChWAN: make(chan serf.Event, serfEventChSize), + logger: serverLogger, + loggers: loggers, + leaveCh: make(chan struct{}), + reconcileCh: make(chan serf.Member, reconcileChSize), + router: flat.Router, + rpcRecorder: recorder, + // TODO(rpc-metrics-improv): consider pulling out the interceptor from config in order to isolate testing + rpcServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))), + insecureRPCServer: rpc.NewServerWithOpts(rpc.WithServerServiceCallInterceptor(middleware.GetNetRPCInterceptor(recorder))), tlsConfigurator: flat.TLSConfigurator, reassertLeaderCh: make(chan chan error), sessionTimers: NewSessionTimers(), diff --git a/agent/consul/session_ttl.go b/agent/consul/session_ttl.go index 0bb1cb3f1e..7052d5db1b 100644 --- a/agent/consul/session_ttl.go +++ b/agent/consul/session_ttl.go @@ -115,7 +115,8 @@ func (s *Server) invalidateSession(id string, entMeta *structs.EnterpriseMeta) { // Retry with exponential backoff to invalidate the session for attempt := uint(0); attempt < maxInvalidateAttempts; attempt++ { - _, err := s.raftApply(structs.SessionRequestType, args) + // TODO(rpc-metrics-improv): Double check request name here + _, err := s.leaderRaftApply("Session.Check", structs.SessionRequestType, args) if err == nil { s.logger.Debug("Session TTL expired", "session", id) return diff --git a/agent/consul/system_metadata.go b/agent/consul/system_metadata.go index 87832b81c1..f160322573 100644 --- a/agent/consul/system_metadata.go +++ b/agent/consul/system_metadata.go @@ -22,7 +22,9 @@ func (s *Server) setSystemMetadataKey(key, val string) error { Entry: &structs.SystemMetadataEntry{Key: key, Value: val}, } - _, err := s.raftApply(structs.SystemMetadataRequestType, args) + // TODO(rpc-metrics-improv): Double check request name here + _, err := s.leaderRaftApply("SystemMetadata.Upsert", structs.SystemMetadataRequestType, args) + return err } @@ -32,6 +34,8 @@ func (s *Server) deleteSystemMetadataKey(key string) error { Entry: &structs.SystemMetadataEntry{Key: key}, } - _, err := s.raftApply(structs.SystemMetadataRequestType, args) + // TODO(rpc-metrics-improv): Double check request name here + _, err := s.leaderRaftApply("SystemMetadata.Delete", structs.SystemMetadataRequestType, args) + return err } diff --git a/agent/metrics_test.go b/agent/metrics_test.go index bbfe2430bf..f544f79e2c 100644 --- a/agent/metrics_test.go +++ b/agent/metrics_test.go @@ -33,6 +33,16 @@ func recordPromMetrics(t *testing.T, a *TestAgent, respRec *httptest.ResponseRec } +func assertMetricExists(t *testing.T, respRec *httptest.ResponseRecorder, metric string) { + if respRec.Body.String() == "" { + t.Fatalf("Response body is empty.") + } + + if !strings.Contains(respRec.Body.String(), metric) { + t.Fatalf("Could not find the metric \"%s\" in the /v1/agent/metrics response", metric) + } +} + func assertMetricExistsWithValue(t *testing.T, respRec *httptest.ResponseRecorder, metric string, value string) { if respRec.Body.String() == "" { t.Fatalf("Response body is empty.") @@ -56,6 +66,36 @@ func assertMetricNotExists(t *testing.T, respRec *httptest.ResponseRecorder, met } } +// TestAgent_NewRPCMetrics test for the new RPC metrics presence. These are the labeled metrics coming from +// agent.rpc.middleware.interceptors package. +func TestAgent_NewRPCMetrics(t *testing.T) { + skipIfShortTesting(t) + // This test cannot use t.Parallel() since we modify global state, ie the global metrics instance + + t.Run("Check new rpc metrics are being emitted", func(t *testing.T) { + metricsPrefix := "new_rpc_metrics" + hcl := fmt.Sprintf(` + telemetry = { + prometheus_retention_time = "5s" + disable_hostname = true + metrics_prefix = "%s" + } + `, metricsPrefix) + + a := StartTestAgent(t, TestAgent{HCL: hcl}) + defer a.Shutdown() + + var out struct{} + err := a.RPC("Status.Ping", struct{}{}, &out) + require.NoError(t, err) + + respRec := httptest.NewRecorder() + recordPromMetrics(t, a, respRec) + + assertMetricExists(t, respRec, metricsPrefix+"_rpc_server_request") + }) +} + // TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus adds testing around // the published autopilot metrics on https://www.consul.io/docs/agent/telemetry#autopilot func TestHTTPHandlers_AgentMetrics_ConsulAutopilot_Prometheus(t *testing.T) { diff --git a/agent/rpc/middleware/interceptors.go b/agent/rpc/middleware/interceptors.go new file mode 100644 index 0000000000..0ae7159670 --- /dev/null +++ b/agent/rpc/middleware/interceptors.go @@ -0,0 +1,77 @@ +package middleware + +import ( + "reflect" + "strconv" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul-net-rpc/net/rpc" + "github.com/hashicorp/go-hclog" +) + +// RPCTypeInternal identifies the "RPC" request as coming from some internal +// operation that runs on the cluster leader. Technically this is not an RPC +// request, but these raft.Apply operations have the same impact on blocking +// queries, and streaming subscriptions, so need to be tracked by the same metric +// and logs. +// Really what we are measuring here is a "cluster operation". The term we have +// used for this historically is "RPC", so we continue to use that here. +const RPCTypeInternal = "internal" + +var metricRPCRequest = []string{"rpc", "server", "request"} +var requestLogName = "rpc.server.request" + +type RequestRecorder struct { + Logger hclog.Logger +} + +func NewRequestRecorder(logger hclog.Logger) *RequestRecorder { + return &RequestRecorder{Logger: logger} +} + +func (r *RequestRecorder) Record(requestName string, rpcType string, start time.Time, request interface{}, respErrored bool) { + elapsed := time.Since(start) + + reqType := requestType(request) + + labels := []metrics.Label{ + {Name: "method", Value: requestName}, + {Name: "errored", Value: strconv.FormatBool(respErrored)}, + {Name: "request_type", Value: reqType}, + {Name: "rpc_type", Value: rpcType}, + } + + // TODO(rpc-metrics-improv): consider using Telemetry API call here + // It'd be neat if we could actually pass the elapsed observed above + metrics.MeasureSinceWithLabels(metricRPCRequest, start, labels) + + r.Logger.Debug(requestLogName, + "method", requestName, + "errored", respErrored, + "request_type", reqType, + "rpc_type", rpcType, + "elapsed", elapsed) +} + +func requestType(req interface{}) string { + if r, ok := req.(interface{ IsRead() bool }); ok && r.IsRead() { + return "read" + } + return "write" +} + +func GetNetRPCInterceptor(recorder *RequestRecorder) rpc.ServerServiceCallInterceptor { + return func(reqServiceMethod string, argv, replyv reflect.Value, handler func() error) { + reqStart := time.Now() + + err := handler() + + responseErr := false + if err != nil { + responseErr = true + } + + recorder.Record(reqServiceMethod, "net/rpc", reqStart, argv.Interface(), responseErr) + } +} diff --git a/go.mod b/go.mod index cf8a79ff5f..17fe9d28da 100644 --- a/go.mod +++ b/go.mod @@ -31,7 +31,7 @@ require ( github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22 github.com/google/tcpproxy v0.0.0-20180808230851-dfa16c61dad2 github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 - github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29 + github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4 github.com/hashicorp/consul/api v1.11.0 github.com/hashicorp/consul/sdk v0.8.0 github.com/hashicorp/go-bexpr v0.1.2 diff --git a/go.sum b/go.sum index a027680b2d..0856849753 100644 --- a/go.sum +++ b/go.sum @@ -222,8 +222,8 @@ github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmo github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= -github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29 h1:0BbXmAgzy5vx2rjixiO1FLJBdYJfEvSixcjWOli2w+Q= -github.com/hashicorp/consul-net-rpc v0.0.0-20220207223504-4cffceffcd29/go.mod h1:vWEAHAeAqfOwB3pSgHMQpIu8VH1jL+Ltg54Tw0wt/NI= +github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4 h1:Com/5n/omNSBusX11zdyIYtidiqewLIanchbm//McZA= +github.com/hashicorp/consul-net-rpc v0.0.0-20220307172752-3602954411b4/go.mod h1:vWEAHAeAqfOwB3pSgHMQpIu8VH1jL+Ltg54Tw0wt/NI= github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-bexpr v0.1.2 h1:ijMXI4qERbzxbCnkxmfUtwMyjrrk3y+Vt0MxojNCbBs=