From f906b943516ee43c082013870bc973573619eebb Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 28 Sep 2020 19:41:24 -0600 Subject: [PATCH] Add func to combine up+downstream queries --- agent/consul/state/catalog.go | 126 ++++++++++++++++++++++++++--- agent/consul/state/catalog_test.go | 8 +- agent/consul/state/config_entry.go | 5 +- agent/structs/structs.go | 11 +++ 4 files changed, 132 insertions(+), 18 deletions(-) diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 527886d36a..bb7bbd53c9 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2017,6 +2017,33 @@ func (s *Store) deleteCheckTxn(tx *txn, idx uint64, node string, checkID types.C return nil } +// CombinedCheckServiceNodes is used to query all nodes and checks for both typical and Connect endpoints of a service +func (s *Store) CombinedCheckServiceNodes(ws memdb.WatchSet, service structs.ServiceName) (uint64, structs.CheckServiceNodes, error) { + var ( + resp structs.CheckServiceNodes + maxIdx uint64 + ) + idx, csn, err := s.CheckServiceNodes(ws, service.Name, &service.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", service, err) + } + if idx > maxIdx { + maxIdx = idx + } + resp = append(resp, csn...) + + idx, csn, err = s.CheckConnectServiceNodes(ws, service.Name, &service.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", service, err) + } + if idx > maxIdx { + maxIdx = idx + } + resp = append(resp, csn...) + + return maxIdx, resp, nil +} + // CheckServiceNodes is used to query all nodes and checks for a given service. func (s *Store) CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *structs.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error) { return s.checkServiceNodes(ws, serviceName, false, entMeta) @@ -2882,17 +2909,93 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi return idx, svc.Protocol == protocol, nil } +func (s *Store) ServiceTopology( + ws memdb.WatchSet, + dc, service string, + entMeta *structs.EnterpriseMeta, +) (uint64, *structs.ServiceTopology, error) { + + var ( + maxIdx uint64 + sn = structs.NewServiceName(service, entMeta) + ) + idx, upstreamNames, err := s.UpstreamsForService(ws, dc, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + + var upstreams structs.CheckServiceNodes + for _, u := range upstreamNames { + // Collect both typical and connect endpoints, this allows aggregating check statuses across both + idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get upstream nodes for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + upstreams = append(upstreams, csn...) + + idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get upstream connect nodes for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + upstreams = append(upstreams, csn...) + } + + idx, downstreamNames, err := s.DownstreamsForService(ws, dc, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + + var downstreams structs.CheckServiceNodes + for _, u := range downstreamNames { + // Collect both typical and connect endpoints, this allows aggregating check statuses across both + idx, csn, err := s.CheckServiceNodes(ws, u.Name, &u.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstream nodes for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + downstreams = append(downstreams, csn...) + + idx, csn, err = s.CheckConnectServiceNodes(ws, u.Name, &u.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstream connect nodes for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + downstreams = append(downstreams, csn...) + } + + resp := &structs.ServiceTopology{ + Upstreams: upstreams, + Downstreams: downstreams, + } + return 0, resp, nil +} + // UpstreamsForService will find all upstream services that the input could route traffic to. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // TODO (freddy): Account for ingress gateways -func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { +func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc string, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { tx := s.db.ReadTxn() defer tx.Abort() - sn := structs.NewServiceName(service, entMeta) idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn) if err != nil { - return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err) + return 0, nil, fmt.Errorf("failed to get registration upstreams for %q: %v", sn.String(), err) } var maxIdx uint64 @@ -2930,15 +3033,14 @@ func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMe // DownstreamsForService will find all downstream services that could route traffic to the input service. // There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. // TODO (freddy): Account for ingress gateways -func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { +func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc string, service structs.ServiceName) (uint64, []structs.ServiceName, error) { tx := s.db.ReadTxn() defer tx.Abort() // First fetch services with discovery chains that list the input as a target - sn := structs.NewServiceName(service, entMeta) - idx, sources, err := s.sourcesForTarget(ws, tx, dc, service, entMeta) + idx, sources, err := s.sourcesForTarget(ws, tx, dc, service) if err != nil { - return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", sn.String(), err) + return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", service.String(), err) } var maxIdx uint64 @@ -2954,7 +3056,7 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent // We then follow these discovery chain sources one level down to the services defining them as an upstream. idx, downstreams, err := downstreamsFromRegistration(ws, tx, s) if err != nil { - return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", s.String(), err) + return 0, nil, fmt.Errorf("failed to get registration downstreams for %q: %v", s.String(), err) } if idx > maxIdx { maxIdx = idx @@ -2968,9 +3070,9 @@ func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, ent } // Also append services that directly listed the input as an upstream - idx, downstreams, err := downstreamsFromRegistration(ws, tx, sn) + idx, downstreams, err := downstreamsFromRegistration(ws, tx, service) if err != nil { - return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err) + return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", service.String(), err) } if idx > maxIdx { maxIdx = idx @@ -2994,7 +3096,7 @@ func downstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.Servi return linkedFromRegistration(ws, tx, sn, true) } -func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { +func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, service structs.ServiceName, downstreams bool) (uint64, []structs.ServiceName, error) { // To fetch upstreams we query services that have the input listed as a downstream // To fetch downstreams we query services that have the input listed as an upstream index := "downstream" @@ -3002,7 +3104,7 @@ func linkedFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceNam index = "upstream" } - iter, err := tx.Get(topologyTableName, index, sn) + iter, err := tx.Get(topologyTableName, index, service) if err != nil { return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err) } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index d330a9f1e6..6c73debe76 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6861,7 +6861,8 @@ func TestCatalog_UpstreamsForService(t *testing.T) { } ws := memdb.NewWatchSet() - idx, names, err := s.UpstreamsForService(ws, "dc1", "api", structs.DefaultEnterpriseMeta()) + sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta()) + idx, names, err := s.UpstreamsForService(ws, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) @@ -6993,11 +6994,12 @@ func TestCatalog_DownstreamsForService(t *testing.T) { } ws := memdb.NewWatchSet() - idx, ids, err := s.DownstreamsForService(ws, "dc1", "admin", structs.DefaultEnterpriseMeta()) + sn := structs.NewServiceName("api", structs.DefaultEnterpriseMeta()) + idx, names, err := s.DownstreamsForService(ws, "dc1", sn) require.NoError(t, err) require.Equal(t, tc.expect.idx, idx) - require.ElementsMatch(t, tc.expect.names, ids) + require.ElementsMatch(t, tc.expect.names, names) }) } } diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 465d4b27d5..3cb08b19e2 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -403,8 +403,7 @@ func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service stri } // sourcesForTarget will return a list of services whose discovery chains have the input service as a target -func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { - destination := structs.NewServiceName(service, entMeta) +func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc string, destination structs.ServiceName) (uint64, []structs.ServiceName, error) { queue := []structs.ServiceName{destination} seenLink := make(map[structs.ServiceName]bool) @@ -444,7 +443,7 @@ func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service stri EvaluateInDatacenter: dc, UseInDatacenter: dc, } - idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, entMeta, req) + idx, chain, err := s.ServiceDiscoveryChain(ws, sn.Name, &sn.EnterpriseMeta, req) if err != nil { return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", sn.String(), err) } diff --git a/agent/structs/structs.go b/agent/structs/structs.go index b42136049b..f0af193b6b 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -1855,6 +1855,17 @@ type IndexedGatewayServices struct { QueryMeta } +type IndexedServiceTopology struct { + ServiceTopology *ServiceTopology + FilteredByACLs bool + QueryMeta +} + +type ServiceTopology struct { + Upstreams CheckServiceNodes + Downstreams CheckServiceNodes +} + // IndexedConfigEntries has its own encoding logic which differs from // ConfigEntryRequest as it has to send a slice of ConfigEntry. type IndexedConfigEntries struct {