diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 2f449a039c..527886d36a 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -2882,6 +2882,108 @@ func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayServi return idx, svc.Protocol == protocol, nil } +// UpstreamsForService will find all upstream services that the input could route traffic to. +// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. +// TODO (freddy): Account for ingress gateways +func (s *Store) UpstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + sn := structs.NewServiceName(service, entMeta) + idx, upstreams, err := upstreamsFromRegistration(ws, tx, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to get upstreams for %q: %v", sn.String(), err) + } + + var maxIdx uint64 + if idx > maxIdx { + maxIdx = idx + } + + var ( + resp []structs.ServiceName + seen = make(map[structs.ServiceName]bool) + ) + for _, u := range upstreams { + // Evaluate the targets from the upstream's discovery chain + idx, targets, err := s.targetsForSource(ws, tx, dc, u.Name, &u.EnterpriseMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get discovery chain targets for %q: %v", u.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, t := range targets { + if !seen[t] { + resp = append(resp, t) + seen[t] = true + } + } + if len(targets) == 0 && !seen[u] { + resp = append(resp, u) + seen[u] = true + } + } + return maxIdx, resp, nil +} + +// DownstreamsForService will find all downstream services that could route traffic to the input service. +// There are two factors at play. Upstreams defined in a proxy registration, and the discovery chain for those upstreams. +// TODO (freddy): Account for ingress gateways +func (s *Store) DownstreamsForService(ws memdb.WatchSet, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + tx := s.db.ReadTxn() + defer tx.Abort() + + // First fetch services with discovery chains that list the input as a target + sn := structs.NewServiceName(service, entMeta) + idx, sources, err := s.sourcesForTarget(ws, tx, dc, service, entMeta) + if err != nil { + return 0, nil, fmt.Errorf("failed to get sources for discovery chain target %q: %v", sn.String(), err) + } + + var maxIdx uint64 + if idx > maxIdx { + maxIdx = idx + } + + var ( + resp []structs.ServiceName + seen = make(map[structs.ServiceName]bool) + ) + for _, s := range sources { + // We then follow these discovery chain sources one level down to the services defining them as an upstream. + idx, downstreams, err := downstreamsFromRegistration(ws, tx, s) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", s.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, d := range downstreams { + if !seen[d] { + resp = append(resp, d) + seen[d] = true + } + } + } + + // Also append services that directly listed the input as an upstream + idx, downstreams, err := downstreamsFromRegistration(ws, tx, sn) + if err != nil { + return 0, nil, fmt.Errorf("failed to get downstreams for %q: %v", sn.String(), err) + } + if idx > maxIdx { + maxIdx = idx + } + for _, d := range downstreams { + if !seen[d] { + resp = append(resp, d) + seen[d] = true + } + } + return maxIdx, resp, nil +} + // upstreamsFromRegistration returns the ServiceNames of the upstreams defined across instances of the input func upstreamsFromRegistration(ws memdb.WatchSet, tx ReadTxn, sn structs.ServiceName) (uint64, []structs.ServiceName, error) { return linkedFromRegistration(ws, tx, sn, false) @@ -2948,6 +3050,11 @@ func updateMeshTopology(tx *txn, idx uint64, node string, svc *structs.NodeServi downstream := structs.NewServiceName(svc.Proxy.DestinationServiceName, &svc.EnterpriseMeta) inserted := make(map[structs.ServiceName]bool) for _, u := range svc.Proxy.Upstreams { + if u.DestinationType == structs.UpstreamDestTypePreparedQuery { + continue + } + + // TODO (freddy): Account for upstream datacenter upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace) upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta) diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index cf16c98f4f..d330a9f1e6 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -6707,3 +6707,297 @@ func TestCatalog_upstreamsFromRegistration_Watches(t *testing.T) { require.Equal(t, exp.idx, idx) require.Empty(t, exp.names) } + +func TestCatalog_UpstreamsForService(t *testing.T) { + defaultMeta := structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + names []structs.ServiceName + } + tt := []struct { + name string + services []*structs.NodeService + entries []structs.ConfigEntry + expect expect + }{ + { + name: "kitchen sink", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy-2", + Service: "api-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "unrelated-proxy", + Service: "unrelated-proxy", + Address: "127.0.0.3", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "unrelated", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "teapot", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "admin", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/v2", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "new-admin", + }, + }, + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "db", + Redirect: &structs.ServiceResolverRedirect{ + Service: "cassandra", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "web", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + }, + expect: expect{ + idx: 7, + names: []structs.ServiceName{ + {Name: "cache", EnterpriseMeta: *defaultMeta}, + {Name: "cassandra", EnterpriseMeta: *defaultMeta}, + {Name: "admin", EnterpriseMeta: *defaultMeta}, + {Name: "new-admin", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + var i uint64 = 1 + for _, svc := range tc.services { + require.NoError(t, s.EnsureService(i, "foo", svc)) + i++ + } + + ca := &structs.CAConfiguration{ + Provider: "consul", + } + err := s.CASetConfig(0, ca) + require.NoError(t, err) + + for _, entry := range tc.entries { + require.NoError(t, entry.Normalize()) + require.NoError(t, s.EnsureConfigEntry(i, entry, nil)) + i++ + } + + ws := memdb.NewWatchSet() + idx, names, err := s.UpstreamsForService(ws, "dc1", "api", structs.DefaultEnterpriseMeta()) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.names, names) + }) + } +} + +func TestCatalog_DownstreamsForService(t *testing.T) { + defaultMeta := structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + names []structs.ServiceName + } + tt := []struct { + name string + services []*structs.NodeService + entries []structs.ConfigEntry + expect expect + }{ + { + name: "kitchen sink", + services: []*structs.NodeService{ + { + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "127.0.0.1", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "cache", + }, + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "old-admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + { + Kind: structs.ServiceKindConnectProxy, + ID: "web-proxy", + Service: "web-proxy", + Address: "127.0.0.2", + Port: 443, + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "web", + Upstreams: structs.Upstreams{ + structs.Upstream{ + DestinationName: "db", + }, + structs.Upstream{ + DestinationName: "admin", + }, + }, + }, + EnterpriseMeta: *defaultMeta, + }, + }, + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "old-admin", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/v2", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "admin", + }, + }, + }, + }, + }, + expect: expect{ + idx: 4, + names: []structs.ServiceName{ + // get web from old-admin routing to admin and web listing old-admin as an upstream + {Name: "web", EnterpriseMeta: *defaultMeta}, + // get api from listing admin directly as an upstream + {Name: "api", EnterpriseMeta: *defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + + require.NoError(t, s.EnsureNode(0, &structs.Node{ + ID: "c73b8fdf-4ef8-4e43-9aa2-59e85cc6a70c", + Node: "foo", + })) + + var i uint64 = 1 + for _, svc := range tc.services { + require.NoError(t, s.EnsureService(i, "foo", svc)) + i++ + } + + ca := &structs.CAConfiguration{ + Provider: "consul", + } + err := s.CASetConfig(0, ca) + require.NoError(t, err) + + for _, entry := range tc.entries { + require.NoError(t, entry.Normalize()) + require.NoError(t, s.EnsureConfigEntry(i, entry, nil)) + i++ + } + + ws := memdb.NewWatchSet() + idx, ids, err := s.DownstreamsForService(ws, "dc1", "admin", structs.DefaultEnterpriseMeta()) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.names, ids) + }) + } +} diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 21107413c4..465d4b27d5 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -373,6 +373,35 @@ var serviceGraphKinds = []string{ structs.ServiceResolver, } +// targetsForSource will return a list of services listed as a target for the input's discovery chain +func (s *Store) targetsForSource(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { + source := structs.NewServiceName(service, entMeta) + req := discoverychain.CompileRequest{ + ServiceName: source.Name, + EvaluateInNamespace: source.NamespaceOrDefault(), + + // TODO(freddy) : Should these be anything other than the known DC? + EvaluateInDatacenter: dc, + UseInDatacenter: dc, + } + idx, chain, err := s.ServiceDiscoveryChain(ws, source.Name, entMeta, req) + if err != nil { + return 0, nil, fmt.Errorf("failed to fetch discovery chain for %q: %v", source.String(), err) + } + + var resp []structs.ServiceName + for _, t := range chain.Targets { + em := structs.EnterpriseMetaInitializer(t.Namespace) + target := structs.NewServiceName(t.Service, &em) + + // TODO (freddy): Allow upstream DC and encode in response + if t.Datacenter == dc { + resp = append(resp, target) + } + } + return idx, resp, nil +} + // sourcesForTarget will return a list of services whose discovery chains have the input service as a target func (s *Store) sourcesForTarget(ws memdb.WatchSet, tx ReadTxn, dc, service string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ServiceName, error) { destination := structs.NewServiceName(service, entMeta) diff --git a/agent/consul/state/config_entry_test.go b/agent/consul/state/config_entry_test.go index 887441b8a5..c6ccde4159 100644 --- a/agent/consul/state/config_entry_test.go +++ b/agent/consul/state/config_entry_test.go @@ -1721,3 +1721,204 @@ func TestSourcesForTarget(t *testing.T) { }) } } + +func TestTargetsForSource(t *testing.T) { + defaultMeta := *structs.DefaultEnterpriseMeta() + + type expect struct { + idx uint64 + ids []structs.ServiceName + } + tt := []struct { + name string + entries []structs.ConfigEntry + expect expect + }{ + { + name: "from route match", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "web", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/sink", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "sink", + }, + }, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from redirect", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "web", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "sink", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from failover", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "web", + Failover: map[string]structs.ServiceResolverFailover{ + "*": { + Service: "remote-web", + Datacenters: []string{"dc2", "dc3"}, + }, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "from splitter", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceSplitterConfigEntry{ + Kind: structs.ServiceSplitter, + Name: "web", + Splits: []structs.ServiceSplit{ + {Weight: 90, Service: "web"}, + {Weight: 10, Service: "sink"}, + }, + }, + }, + expect: expect{ + idx: 2, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, + }, + }, + }, + { + name: "chained route redirect", + entries: []structs.ConfigEntry{ + &structs.ProxyConfigEntry{ + Kind: structs.ProxyDefaults, + Name: structs.ProxyConfigGlobal, + Config: map[string]interface{}{ + "protocol": "http", + }, + }, + &structs.ServiceRouterConfigEntry{ + Kind: structs.ServiceRouter, + Name: "web", + Routes: []structs.ServiceRoute{ + { + Match: &structs.ServiceRouteMatch{ + HTTP: &structs.ServiceRouteHTTPMatch{ + PathExact: "/route", + }, + }, + Destination: &structs.ServiceRouteDestination{ + Service: "routed", + }, + }, + }, + }, + &structs.ServiceResolverConfigEntry{ + Kind: structs.ServiceResolver, + Name: "routed", + Redirect: &structs.ServiceResolverRedirect{ + Service: "sink", + }, + }, + }, + expect: expect{ + idx: 3, + ids: []structs.ServiceName{ + {Name: "web", EnterpriseMeta: defaultMeta}, + {Name: "sink", EnterpriseMeta: defaultMeta}, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + s := testStateStore(t) + ws := memdb.NewWatchSet() + + ca := &structs.CAConfiguration{ + Provider: "consul", + } + err := s.CASetConfig(0, ca) + require.NoError(t, err) + + var i uint64 = 1 + for _, entry := range tc.entries { + require.NoError(t, entry.Normalize()) + require.NoError(t, s.EnsureConfigEntry(i, entry, nil)) + i++ + } + + tx := s.db.ReadTxn() + defer tx.Abort() + + idx, ids, err := s.targetsForSource(ws, tx, "dc1", "web", nil) + require.NoError(t, err) + + require.Equal(t, tc.expect.idx, idx) + require.ElementsMatch(t, tc.expect.ids, ids) + }) + } +}