From e530fbfb338fdd7379fcf0e4b2ebfba66e984e7f Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Tue, 22 Mar 2022 16:58:41 -0700 Subject: [PATCH] oss: Add overview UI internal endpoint --- agent/consul/internal_endpoint.go | 22 +++ agent/consul/internal_endpoint_test.go | 91 +++++++++++++ agent/consul/server.go | 7 + agent/consul/server_overview.go | 182 +++++++++++++++++++++++++ agent/consul/server_overview_test.go | 166 ++++++++++++++++++++++ agent/consul/state/catalog.go | 36 +++++ agent/http_register.go | 1 + agent/structs/catalog.go | 36 +++++ agent/structs/structs_oss.go | 4 + agent/ui_endpoint.go | 18 +++ 10 files changed, 563 insertions(+) create mode 100644 agent/consul/server_overview.go create mode 100644 agent/consul/server_overview_test.go diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 62fb667b1c..9c2f2c75db 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -147,6 +147,28 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. }) } +func (m *Internal) CatalogOverview(args *structs.DCSpecificRequest, reply *structs.CatalogSummary) error { + if done, err := m.srv.ForwardRPC("Internal.CatalogOverview", args, reply); done { + return err + } + + authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil) + if err != nil { + return err + } + + if authz.OperatorRead(nil) != acl.Allow { + return acl.PermissionDeniedByACLUnnamed(authz, nil, acl.ResourceOperator, acl.AccessRead) + } + + summary := m.srv.overviewManager.GetCurrentSummary() + if summary != nil { + *reply = *summary + } + + return nil +} + func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error { if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, reply); done { return err diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index 601eb7cc49..25c9c75f40 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -6,6 +6,7 @@ import ( "os" "strings" "testing" + "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" @@ -2477,3 +2478,93 @@ service_prefix "mongo" { policy = "read" } }) }) } + +func TestInternal_CatalogOverview(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.MetricsReportingInterval = 100 * time.Millisecond + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + retry.Run(t, func(r *retry.R) { + var out structs.CatalogSummary + if err := msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out); err != nil { + r.Fatalf("err: %v", err) + } + + expected := structs.CatalogSummary{ + Nodes: []structs.HealthSummary{ + { + Total: 1, + Passing: 1, + EnterpriseMeta: *structs.NodeEnterpriseMetaInDefaultPartition(), + }, + }, + Services: []structs.HealthSummary{ + { + Name: "consul", + Total: 1, + Passing: 1, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + Checks: []structs.HealthSummary{ + { + Name: "Serf Health Status", + Total: 1, + Passing: 1, + EnterpriseMeta: *structs.DefaultEnterpriseMetaInDefaultPartition(), + }, + }, + } + require.Equal(r, expected, out) + }) +} + +func TestInternal_CatalogOverview_ACLDeny(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.PrimaryDatacenter = "dc1" + c.ACLsEnabled = true + c.ACLInitialManagementToken = TestDefaultInitialManagementToken + c.ACLResolverSettings.ACLDefaultPolicy = "deny" + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + codec := rpcClient(t, s1) + defer codec.Close() + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + var out structs.CatalogSummary + err := msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out) + require.True(t, acl.IsErrPermissionDenied(err)) + + opReadToken, err := upsertTestTokenWithPolicyRules( + codec, TestDefaultInitialManagementToken, "dc1", `operator = "read"`) + require.NoError(t, err) + + arg.Token = opReadToken.SecretID + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out)) +} diff --git a/agent/consul/server.go b/agent/consul/server.go index ba77c45171..da821ebc8b 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -308,6 +308,10 @@ type Server struct { // Consul router. statsFetcher *StatsFetcher + // overviewManager is used to periodically update the cluster overview + // and emit node/service/check health metrics. + overviewManager *OverviewManager + // reassertLeaderCh is used to signal the leader loop should re-run // leadership actions after a snapshot restore. reassertLeaderCh chan chan error @@ -613,6 +617,9 @@ func NewServer(config *Config, flat Deps, publicGRPCServer *grpc.Server) (*Serve } go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval) + go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) s.grpcLeaderForwarder = flat.LeaderForwarder go s.trackLeaderChanges() diff --git a/agent/consul/server_overview.go b/agent/consul/server_overview.go new file mode 100644 index 0000000000..149743d3f2 --- /dev/null +++ b/agent/consul/server_overview.go @@ -0,0 +1,182 @@ +package consul + +import ( + "context" + "fmt" + "sort" + "sync" + "time" + + "github.com/hashicorp/consul/agent/consul/usagemetrics" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/hashicorp/go-hclog" +) + +type OverviewManager struct { + stateProvider usagemetrics.StateProvider + logger hclog.Logger + interval time.Duration + + currentSummary *structs.CatalogSummary + sync.RWMutex +} + +func NewOverviewManager(logger hclog.Logger, sp usagemetrics.StateProvider, interval time.Duration) *OverviewManager { + return &OverviewManager{ + stateProvider: sp, + logger: logger.Named("catalog-overview"), + interval: interval, + currentSummary: &structs.CatalogSummary{}, + } +} + +func (m *OverviewManager) GetCurrentSummary() *structs.CatalogSummary { + m.RLock() + defer m.RUnlock() + return m.currentSummary +} + +func (m *OverviewManager) Run(ctx context.Context) { + ticker := time.NewTicker(m.interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + state := m.stateProvider.State() + catalog, err := state.CatalogDump() + if err != nil { + m.logger.Error("failed to update overview", "error", err) + continue + } + + summary := getCatalogOverview(catalog) + m.Lock() + m.currentSummary = summary + m.Unlock() + } + } +} + +// getCatalogOverview returns a breakdown of the number of nodes, services, and checks +// in the passing/warning/critical states. In Enterprise, it will also return this +// breakdown for each partition and namespace. +func getCatalogOverview(catalog *structs.CatalogContents) *structs.CatalogSummary { + nodeChecks := make(map[string][]*structs.HealthCheck) + serviceInstanceChecks := make(map[string][]*structs.HealthCheck) + checkSummaries := make(map[string]structs.HealthSummary) + + // Compute the health check summaries by taking the pass/warn/fail counts + // of each unique part/ns/checkname combo and storing them. Also store the + // per-node and per-service instance checks for their respective summaries below. + for _, check := range catalog.Checks { + checkID := fmt.Sprintf("%s/%s", check.EnterpriseMeta.String(), check.Name) + summary, ok := checkSummaries[checkID] + if !ok { + summary = structs.HealthSummary{ + Name: check.Name, + EnterpriseMeta: check.EnterpriseMeta, + } + } + + summary.Add(check.Status) + checkSummaries[checkID] = summary + + if check.ServiceID != "" { + serviceInstanceID := fmt.Sprintf("%s/%s/%s", check.EnterpriseMeta.String(), check.Node, check.ServiceID) + serviceInstanceChecks[serviceInstanceID] = append(serviceInstanceChecks[serviceInstanceID], check) + } else { + nodeMeta := check.NodeIdentity().EnterpriseMeta + nodeID := fmt.Sprintf("%s/%s", nodeMeta.String(), check.Node) + nodeChecks[nodeID] = append(nodeChecks[nodeID], check) + } + } + + // Compute the service instance summaries by taking the unhealthiest check for + // a given service instance as its health status and totaling the counts for each + // partition/ns/service combination. + serviceSummaries := make(map[string]structs.HealthSummary) + for _, svc := range catalog.Services { + sid := structs.NewServiceID(svc.ServiceName, &svc.EnterpriseMeta) + summary, ok := serviceSummaries[sid.String()] + if !ok { + summary = structs.HealthSummary{ + Name: svc.ServiceName, + EnterpriseMeta: svc.EnterpriseMeta, + } + } + + // Compute whether this service instance is healthy based on its associated checks. + serviceInstanceID := fmt.Sprintf("%s/%s/%s", svc.EnterpriseMeta.String(), svc.Node, svc.ServiceID) + status := api.HealthPassing + for _, checks := range serviceInstanceChecks[serviceInstanceID] { + if checks.Status == api.HealthWarning && status == api.HealthPassing { + status = api.HealthWarning + } + if checks.Status == api.HealthCritical { + status = api.HealthCritical + } + } + + summary.Add(status) + serviceSummaries[sid.String()] = summary + } + + // Compute the node summaries by taking the unhealthiest check for each node + // as its health status and totaling the passing/warning/critical counts for + // each partition. + nodeSummaries := make(map[string]structs.HealthSummary) + for _, node := range catalog.Nodes { + nodeMeta := structs.NodeEnterpriseMetaInPartition(node.Partition) + summary, ok := nodeSummaries[nodeMeta.String()] + if !ok { + summary = structs.HealthSummary{ + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(node.Partition), + } + } + + // Compute whether this node is healthy based on its associated checks. + status := api.HealthPassing + nodeID := fmt.Sprintf("%s/%s", nodeMeta.String(), node.Node) + for _, checks := range nodeChecks[nodeID] { + if checks.Status == api.HealthWarning && status == api.HealthPassing { + status = api.HealthWarning + } + if checks.Status == api.HealthCritical { + status = api.HealthCritical + } + } + + summary.Add(status) + nodeSummaries[nodeMeta.String()] = summary + } + + // Construct the summary. + summary := &structs.CatalogSummary{} + for _, healthSummary := range nodeSummaries { + summary.Nodes = append(summary.Nodes, healthSummary) + } + for _, healthSummary := range serviceSummaries { + summary.Services = append(summary.Services, healthSummary) + } + for _, healthSummary := range checkSummaries { + summary.Checks = append(summary.Checks, healthSummary) + } + + summarySort := func(slice []structs.HealthSummary) func(int, int) bool { + return func(i, j int) bool { + if slice[i].Name < slice[j].Name { + return true + } + return slice[i].EnterpriseMeta.String() < slice[j].EnterpriseMeta.String() + } + } + sort.Slice(summary.Nodes, summarySort(summary.Nodes)) + sort.Slice(summary.Services, summarySort(summary.Services)) + sort.Slice(summary.Checks, summarySort(summary.Checks)) + + return summary +} diff --git a/agent/consul/server_overview_test.go b/agent/consul/server_overview_test.go new file mode 100644 index 0000000000..dc2d439e06 --- /dev/null +++ b/agent/consul/server_overview_test.go @@ -0,0 +1,166 @@ +package consul + +import ( + "testing" + + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" + "github.com/stretchr/testify/require" +) + +func TestCatalogOverview(t *testing.T) { + cases := []struct { + name string + nodes []*structs.Node + services []*structs.ServiceNode + checks []*structs.HealthCheck + expected structs.CatalogSummary + }{ + { + name: "empty", + expected: structs.CatalogSummary{}, + }, + { + name: "one node with no checks", + nodes: []*structs.Node{ + {Node: "node1"}, + }, + expected: structs.CatalogSummary{ + Nodes: []structs.HealthSummary{ + {Total: 1, Passing: 1}, + }, + }, + }, + { + name: "one service with no checks", + services: []*structs.ServiceNode{ + {Node: "node1", ServiceName: "service1"}, + }, + expected: structs.CatalogSummary{ + Services: []structs.HealthSummary{ + {Name: "service1", Total: 1, Passing: 1}, + }, + }, + }, + { + name: "three nodes with node checks", + nodes: []*structs.Node{ + {Node: "node1"}, + {Node: "node2"}, + {Node: "node3"}, + }, + checks: []*structs.HealthCheck{ + {Node: "node1", Name: "check1", CheckID: "check1", Status: api.HealthPassing}, + {Node: "node2", Name: "check1", CheckID: "check1", Status: api.HealthWarning}, + {Node: "node3", Name: "check1", CheckID: "check1", Status: api.HealthCritical}, + }, + expected: structs.CatalogSummary{ + Nodes: []structs.HealthSummary{ + {Total: 3, Passing: 1, Warning: 1, Critical: 1}, + }, + Checks: []structs.HealthSummary{ + {Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1}, + }, + }, + }, + { + name: "three instances of one service with checks", + nodes: []*structs.Node{ + {Node: "node1"}, + }, + services: []*structs.ServiceNode{ + {Node: "node1", ServiceName: "service1", ServiceID: "id1"}, + {Node: "node1", ServiceName: "service1", ServiceID: "id2"}, + {Node: "node1", ServiceName: "service1", ServiceID: "id3"}, + }, + checks: []*structs.HealthCheck{ + {Node: "node1", Name: "check1", CheckID: "check1", ServiceID: "id1", Status: api.HealthPassing}, + {Node: "node1", Name: "check1", CheckID: "check2", ServiceID: "id2", Status: api.HealthWarning}, + {Node: "node1", Name: "check1", CheckID: "check3", ServiceID: "id3", Status: api.HealthCritical}, + }, + expected: structs.CatalogSummary{ + Nodes: []structs.HealthSummary{ + {Total: 1, Passing: 1}, + }, + Services: []structs.HealthSummary{ + {Name: "service1", Total: 3, Passing: 1, Warning: 1, Critical: 1}, + }, + Checks: []structs.HealthSummary{ + {Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1}, + }, + }, + }, + { + name: "three instances of different services with checks", + nodes: []*structs.Node{ + {Node: "node1"}, + }, + services: []*structs.ServiceNode{ + {Node: "node1", ServiceName: "service1", ServiceID: "id1"}, + {Node: "node1", ServiceName: "service2", ServiceID: "id2"}, + {Node: "node1", ServiceName: "service3", ServiceID: "id3"}, + }, + checks: []*structs.HealthCheck{ + {Node: "node1", Name: "check1", CheckID: "check1", ServiceID: "id1", Status: api.HealthPassing}, + {Node: "node1", Name: "check1", CheckID: "check2", ServiceID: "id2", Status: api.HealthWarning}, + {Node: "node1", Name: "check1", CheckID: "check3", ServiceID: "id3", Status: api.HealthCritical}, + }, + expected: structs.CatalogSummary{ + Nodes: []structs.HealthSummary{ + {Total: 1, Passing: 1}, + }, + Services: []structs.HealthSummary{ + {Name: "service1", Total: 1, Passing: 1}, + {Name: "service2", Total: 1, Warning: 1}, + {Name: "service3", Total: 1, Critical: 1}, + }, + Checks: []structs.HealthSummary{ + {Name: "check1", Total: 3, Passing: 1, Warning: 1, Critical: 1}, + }, + }, + }, + { + name: "many instances of the same check", + checks: []*structs.HealthCheck{ + {Name: "check1", CheckID: "check1", Status: api.HealthPassing}, + {Name: "check1", CheckID: "check2", Status: api.HealthWarning}, + {Name: "check1", CheckID: "check3", Status: api.HealthCritical}, + {Name: "check1", CheckID: "check4", Status: api.HealthPassing}, + {Name: "check1", CheckID: "check5", Status: api.HealthCritical}, + }, + expected: structs.CatalogSummary{ + Checks: []structs.HealthSummary{ + {Name: "check1", Total: 5, Passing: 2, Warning: 1, Critical: 2}, + }, + }, + }, + { + name: "three different checks", + checks: []*structs.HealthCheck{ + {Name: "check1", CheckID: "check1", Status: api.HealthPassing}, + {Name: "check2", CheckID: "check2", Status: api.HealthWarning}, + {Name: "check3", CheckID: "check3", Status: api.HealthCritical}, + }, + expected: structs.CatalogSummary{ + Checks: []structs.HealthSummary{ + {Name: "check1", Total: 1, Passing: 1}, + {Name: "check2", Total: 1, Warning: 1}, + {Name: "check3", Total: 1, Critical: 1}, + }, + }, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + summary := getCatalogOverview(&structs.CatalogContents{ + Nodes: tc.nodes, + Services: tc.services, + Checks: tc.checks, + }) + require.ElementsMatch(t, tc.expected.Nodes, summary.Nodes) + require.ElementsMatch(t, tc.expected.Services, summary.Services) + require.ElementsMatch(t, tc.expected.Checks, summary.Checks) + }) + } +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index d8284d4272..b882931d6d 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -4106,3 +4106,39 @@ func cleanupKindServiceName(tx WriteTxn, idx uint64, name structs.ServiceName, k } return nil } + +// CatalogDump returns all the contents of the node, service and check tables. +// In Enterprise, this will return entries across all partitions and namespaces. +func (s *Store) CatalogDump() (*structs.CatalogContents, error) { + tx := s.db.Txn(false) + contents := &structs.CatalogContents{} + + nodes, err := tx.Get(tableNodes, indexID) + if err != nil { + return nil, fmt.Errorf("failed nodes lookup: %s", err) + } + for node := nodes.Next(); node != nil; node = nodes.Next() { + n := node.(*structs.Node) + contents.Nodes = append(contents.Nodes, n) + } + + services, err := tx.Get(tableServices, indexID) + if err != nil { + return nil, fmt.Errorf("failed services lookup: %s", err) + } + for service := services.Next(); service != nil; service = services.Next() { + svc := service.(*structs.ServiceNode) + contents.Services = append(contents.Services, svc) + } + + checks, err := tx.Get(tableChecks, indexID) + if err != nil { + return nil, fmt.Errorf("failed checks lookup: %s", err) + } + for check := checks.Next(); check != nil; check = checks.Next() { + c := check.(*structs.HealthCheck) + contents.Checks = append(contents.Checks, c) + } + + return contents, nil +} diff --git a/agent/http_register.go b/agent/http_register.go index df20cdfe3b..47cdfcf1f9 100644 --- a/agent/http_register.go +++ b/agent/http_register.go @@ -91,6 +91,7 @@ func init() { registerEndpoint("/v1/internal/ui/nodes", []string{"GET"}, (*HTTPHandlers).UINodes) registerEndpoint("/v1/internal/ui/node/", []string{"GET"}, (*HTTPHandlers).UINodeInfo) registerEndpoint("/v1/internal/ui/services", []string{"GET"}, (*HTTPHandlers).UIServices) + registerEndpoint("/v1/internal/ui/catalog-overview", []string{"GET"}, (*HTTPHandlers).UICatalogOverview) registerEndpoint("/v1/internal/ui/gateway-services-nodes/", []string{"GET"}, (*HTTPHandlers).UIGatewayServicesNodes) registerEndpoint("/v1/internal/ui/gateway-intentions/", []string{"GET"}, (*HTTPHandlers).UIGatewayIntentions) registerEndpoint("/v1/internal/ui/service-topology/", []string{"GET"}, (*HTTPHandlers).UIServiceTopology) diff --git a/agent/structs/catalog.go b/agent/structs/catalog.go index b118b99352..73cd0264f4 100644 --- a/agent/structs/catalog.go +++ b/agent/structs/catalog.go @@ -1,6 +1,7 @@ package structs import ( + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" ) @@ -19,3 +20,38 @@ const ( ConsulServiceID = "consul" ConsulServiceName = "consul" ) + +type CatalogContents struct { + Nodes []*Node + Services []*ServiceNode + Checks []*HealthCheck +} + +type CatalogSummary struct { + Nodes []HealthSummary + Services []HealthSummary + Checks []HealthSummary +} + +type HealthSummary struct { + Name string `json:",omitempty"` + + Total int + Passing int + Warning int + Critical int + + EnterpriseMeta +} + +func (h *HealthSummary) Add(status string) { + h.Total++ + switch status { + case api.HealthPassing: + h.Passing++ + case api.HealthWarning: + h.Warning++ + case api.HealthCritical: + h.Critical++ + } +} diff --git a/agent/structs/structs_oss.go b/agent/structs/structs_oss.go index 669361802b..7f56c43553 100644 --- a/agent/structs/structs_oss.go +++ b/agent/structs/structs_oss.go @@ -15,6 +15,10 @@ var emptyEnterpriseMeta = EnterpriseMeta{} // EnterpriseMeta stub type EnterpriseMeta struct{} +func (m *EnterpriseMeta) String() string { + return "" +} + func (m *EnterpriseMeta) ToEnterprisePolicyMeta() *acl.EnterprisePolicyMeta { return nil } diff --git a/agent/ui_endpoint.go b/agent/ui_endpoint.go index f794f2f669..1defb241b9 100644 --- a/agent/ui_endpoint.go +++ b/agent/ui_endpoint.go @@ -172,6 +172,24 @@ RPC: return nil, nil } +// UICatalogOverview is used to get a high-level overview of the health of nodes, services, +// and checks in the datacenter. +func (s *HTTPHandlers) UICatalogOverview(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + // Parse arguments + args := structs.DCSpecificRequest{} + if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done { + return nil, nil + } + + // Make the RPC request + var out structs.CatalogSummary + if err := s.agent.RPC("Internal.CatalogOverview", &args, &out); err != nil { + return nil, err + } + + return out, nil +} + // UIServices is used to list the services in a given datacenter. We return a // ServiceSummary which provides overview information for the service func (s *HTTPHandlers) UIServices(resp http.ResponseWriter, req *http.Request) (interface{}, error) {