From 424f3cdd2c233afd64e231b89b3414657c3d2393 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Thu, 20 Jan 2022 10:12:04 -0600 Subject: [PATCH] proxycfg: introduce explicit UpstreamID in lieu of bare string (#12125) The gist here is that now we use a value-type struct proxycfg.UpstreamID as the map key in ConfigSnapshot maps where we used to use "upstream id-ish" strings. These are internal only and used just for bidirectional trips through the agent cache keyspace (like the discovery chain target struct). For the few places where the upstream id needs to be projected into xDS, that's what (proxycfg.UpstreamID).EnvoyID() is for. This lets us ALWAYS inject the partition and namespace into these things without making stuff like the golden testdata diverge. --- agent/proxycfg/connect_proxy.go | 104 +++++----- agent/proxycfg/ingress_gateway.go | 28 +-- agent/proxycfg/manager_test.go | 53 ++--- agent/proxycfg/naming.go | 130 ++++++++++++ agent/proxycfg/naming_oss.go | 30 +++ agent/proxycfg/naming_test.go | 195 ++++++++++++++++++ agent/proxycfg/snapshot.go | 50 ++--- agent/proxycfg/state.go | 4 +- agent/proxycfg/state_test.go | 143 +++++++------ agent/proxycfg/testing.go | 80 +++---- agent/proxycfg/upstreams.go | 108 +++++----- agent/structs/connect_proxy_config.go | 26 +-- agent/structs/connect_proxy_config_oss.go | 2 +- agent/xds/clusters.go | 44 ++-- agent/xds/clusters_test.go | 27 ++- agent/xds/delta_test.go | 10 +- agent/xds/endpoints.go | 38 ++-- agent/xds/endpoints_test.go | 4 +- agent/xds/listeners.go | 51 ++--- agent/xds/listeners_ingress.go | 13 +- agent/xds/listeners_test.go | 85 +++++--- agent/xds/routes.go | 18 +- agent/xds/routes_test.go | 17 +- .../custom-upstream.envoy-1-20-x.golden | 13 +- agent/xds/xds_protocol_helpers_test.go | 6 +- 25 files changed, 843 insertions(+), 436 deletions(-) create mode 100644 agent/proxycfg/naming.go create mode 100644 agent/proxycfg/naming_oss.go create mode 100644 agent/proxycfg/naming_test.go diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index bd21dd2a75..629c416425 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -18,16 +18,16 @@ type handlerConnectProxy struct { // state. func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) { snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) - snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) - snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc) - snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc) - snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes) - snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc) - snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) + snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) + snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) + snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) + snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) - snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes) - snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream) - snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs) + snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes) + snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream) + snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs) // Watch for root changes err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ @@ -106,9 +106,11 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e for i := range s.proxyCfg.Upstreams { u := s.proxyCfg.Upstreams[i] + uid := NewUpstreamID(&u) + // Store defaults keyed under wildcard so they can be applied to centrally configured upstreams if u.DestinationName == structs.WildcardSpecifier { - snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u + snap.ConnectProxy.UpstreamConfig[uid] = &u continue } @@ -117,7 +119,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if u.CentrallyConfigured { continue } - snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u + snap.ConnectProxy.UpstreamConfig[uid] = &u dc := s.source.Datacenter if u.Datacenter != "" { @@ -144,7 +146,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e // the plain discovery chain if there is an error so it's safe to // continue. s.logger.Warn("failed to parse upstream config", - "upstream", u.Identifier(), + "upstream", uid.String(), "error", err, ) } @@ -157,7 +159,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e QueryIDOrName: u.DestinationName, Connect: true, Source: *s.source, - }, "upstream:"+u.Identifier(), s.ch) + }, "upstream:"+uid.String(), s.ch) if err != nil { return snap, err } @@ -176,9 +178,9 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway), OverrideProtocol: cfg.Protocol, OverrideConnectTimeout: cfg.ConnectTimeout(), - }, "discovery-chain:"+u.Identifier(), s.ch) + }, "discovery-chain:"+uid.String(), s.ch) if err != nil { - return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) + return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err) } default: @@ -220,12 +222,14 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv return fmt.Errorf("invalid type for response %T", u.Result) } - seenServices := make(map[string]struct{}) + seenUpstreams := make(map[UpstreamID]struct{}) for _, svc := range resp.Services { - seenServices[svc.String()] = struct{}{} + uid := NewUpstreamIDFromServiceName(svc) + + seenUpstreams[uid] = struct{}{} cfgMap := make(map[string]interface{}) - u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()] + u, ok := snap.ConnectProxy.UpstreamConfig[uid] if ok { cfgMap = u.Config } else { @@ -233,11 +237,12 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv // This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled // by the ResolveServiceConfig endpoint. wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace()) - defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()] + wildcardUID := NewUpstreamIDFromServiceID(wildcardSID) + defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID] if ok { u = defaults cfgMap = defaults.Config - snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults + snap.ConnectProxy.UpstreamConfig[uid] = defaults } } @@ -247,7 +252,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv // the plain discovery chain if there is an error so it's safe to // continue. s.logger.Warn("failed to parse upstream config", - "upstream", u.Identifier(), + "upstream", uid, "error", err, ) } @@ -257,7 +262,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv meshGateway = meshGateway.OverlayWith(u.MeshGateway) } watchOpts := discoveryChainWatchOpts{ - id: svc.String(), + id: NewUpstreamIDFromServiceName(svc), name: svc.Name, namespace: svc.NamespaceOrDefault(), partition: svc.PartitionOrDefault(), @@ -268,69 +273,69 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv up := &handlerUpstreams{handlerState: s.handlerState} err = up.watchDiscoveryChain(ctx, snap, watchOpts) if err != nil { - return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err) + return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err) } } - snap.ConnectProxy.IntentionUpstreams = seenServices + snap.ConnectProxy.IntentionUpstreams = seenUpstreams // Clean up data from services that were not in the update - for sn, targets := range snap.ConnectProxy.WatchedUpstreams { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, targets := range snap.ConnectProxy.WatchedUpstreams { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { for _, cancelFn := range targets { cancelFn() } - delete(snap.ConnectProxy.WatchedUpstreams, sn) + delete(snap.ConnectProxy.WatchedUpstreams, uid) } } - for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid) } } - for sn, cancelMap := range snap.ConnectProxy.WatchedGateways { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, cancelMap := range snap.ConnectProxy.WatchedGateways { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { for _, cancelFn := range cancelMap { cancelFn() } - delete(snap.ConnectProxy.WatchedGateways, sn) + delete(snap.ConnectProxy.WatchedGateways, uid) } } - for sn := range snap.ConnectProxy.WatchedGatewayEndpoints { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.WatchedGatewayEndpoints { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid) } } - for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { cancelFn() - delete(snap.ConnectProxy.WatchedDiscoveryChains, sn) + delete(snap.ConnectProxy.WatchedDiscoveryChains, uid) } } // These entries are intentionally handled separately from the WatchedDiscoveryChains above. // There have been situations where a discovery watch was cancelled, then fired. // That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up // since there was no known watch for it. - for sn := range snap.ConnectProxy.DiscoveryChain { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.DiscoveryChain { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.DiscoveryChain, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.DiscoveryChain, uid) } } @@ -340,7 +345,8 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv return fmt.Errorf("invalid type for response: %T", u.Result) } pq := strings.TrimPrefix(u.CorrelationID, "upstream:") - snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes + uid := UpstreamIDFromString(pq) + snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix): resp, ok := u.Result.([]structs.CheckType) diff --git a/agent/proxycfg/ingress_gateway.go b/agent/proxycfg/ingress_gateway.go index e14dce07b3..1a5eb5ed9f 100644 --- a/agent/proxycfg/ingress_gateway.go +++ b/agent/proxycfg/ingress_gateway.go @@ -48,12 +48,12 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, return snap, err } - snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc) - snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) - snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc) - snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes) - snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc) - snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes) + snap.IngressGateway.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) + snap.IngressGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) + snap.IngressGateway.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) + snap.IngressGateway.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.IngressGateway.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) + snap.IngressGateway.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.IngressGateway.Listeners = make(map[IngressListenerKey]structs.IngressListener) return snap, nil } @@ -102,13 +102,15 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update // Update our upstreams and watches. var hosts []string - watchedSvcs := make(map[string]struct{}) + watchedSvcs := make(map[UpstreamID]struct{}) upstreamsMap := make(map[IngressListenerKey]structs.Upstreams) for _, service := range services.Services { u := makeUpstream(service) + uid := NewUpstreamID(&u) + watchOpts := discoveryChainWatchOpts{ - id: u.Identifier(), + id: uid, name: u.DestinationName, namespace: u.DestinationNamespace, partition: u.DestinationPartition, @@ -117,9 +119,9 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update up := &handlerUpstreams{handlerState: s.handlerState} err := up.watchDiscoveryChain(ctx, snap, watchOpts) if err != nil { - return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) + return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err) } - watchedSvcs[u.Identifier()] = struct{}{} + watchedSvcs[uid] = struct{}{} hosts = append(hosts, service.Hosts...) @@ -132,10 +134,10 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update snap.IngressGateway.Hosts = hosts snap.IngressGateway.HostsSet = true - for id, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains { - if _, ok := watchedSvcs[id]; !ok { + for uid, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains { + if _, ok := watchedSvcs[uid]; !ok { cancelFn() - delete(snap.IngressGateway.WatchedDiscoveryChains, id) + delete(snap.IngressGateway.WatchedDiscoveryChains, uid) } } diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index cdb271ee35..9b7fe0060c 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -187,6 +187,7 @@ func TestManager_BasicLifecycle(t *testing.T) { }) db := structs.NewServiceName("db", nil) + dbUID := NewUpstreamIDFromServiceName(db) // Create test cases using some of the common data above. tests := []*testcase_BasicLifecycle{ @@ -214,28 +215,28 @@ func TestManager_BasicLifecycle(t *testing.T) { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - db.String(): dbDefaultChain(), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbDefaultChain(), }, - WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "db.default.default.dc1": TestUpstreamNodes(t, db.Name), }, }, WatchedGateways: nil, // Clone() clears this out - WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): {}, + WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: {}, }, - UpstreamConfig: map[string]*structs.Upstream{ - upstreams[0].Identifier(): &upstreams[0], - upstreams[1].Identifier(): &upstreams[1], - upstreams[2].Identifier(): &upstreams[2], + UpstreamConfig: map[UpstreamID]*structs.Upstream{ + NewUpstreamID(&upstreams[0]): &upstreams[0], + NewUpstreamID(&upstreams[1]): &upstreams[1], + NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[string]ServicePassthroughAddrs{}, + PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{}, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, Intentions: TestIntentions().Matches[0], IntentionsSet: true, @@ -271,29 +272,29 @@ func TestManager_BasicLifecycle(t *testing.T) { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - db.String(): dbSplitChain(), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbSplitChain(), }, - WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "v1.db.default.default.dc1": TestUpstreamNodes(t, db.Name), "v2.db.default.default.dc1": TestUpstreamNodesAlternate(t), }, }, WatchedGateways: nil, // Clone() clears this out - WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): {}, + WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: {}, }, - UpstreamConfig: map[string]*structs.Upstream{ - upstreams[0].Identifier(): &upstreams[0], - upstreams[1].Identifier(): &upstreams[1], - upstreams[2].Identifier(): &upstreams[2], + UpstreamConfig: map[UpstreamID]*structs.Upstream{ + NewUpstreamID(&upstreams[0]): &upstreams[0], + NewUpstreamID(&upstreams[1]): &upstreams[1], + NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[string]ServicePassthroughAddrs{}, + PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{}, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, Intentions: TestIntentions().Matches[0], IntentionsSet: true, diff --git a/agent/proxycfg/naming.go b/agent/proxycfg/naming.go new file mode 100644 index 0000000000..5304b8d1a6 --- /dev/null +++ b/agent/proxycfg/naming.go @@ -0,0 +1,130 @@ +package proxycfg + +import ( + "strings" + + "github.com/hashicorp/consul/agent/structs" +) + +type UpstreamID struct { + Type string + Name string + Datacenter string + structs.EnterpriseMeta +} + +func NewUpstreamID(u *structs.Upstream) UpstreamID { + id := UpstreamID{ + Type: u.DestinationType, + Name: u.DestinationName, + Datacenter: u.Datacenter, + EnterpriseMeta: structs.NewEnterpriseMetaWithPartition( + u.DestinationPartition, + u.DestinationNamespace, + ), + } + id.normalize() + return id +} + +func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID { + id := UpstreamID{ + Name: sn.Name, + EnterpriseMeta: sn.EnterpriseMeta, + } + id.normalize() + return id +} + +func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID { + id := UpstreamID{ + Name: sid.ID, + EnterpriseMeta: sid.EnterpriseMeta, + } + id.normalize() + return id +} + +func (u *UpstreamID) normalize() { + if u.Type == structs.UpstreamDestTypeService { + u.Type = "" + } + + u.EnterpriseMeta.Normalize() +} + +// String encodes the UpstreamID into a string for use in agent cache keys. +// You can decode it back again using UpstreamIDFromString. +func (u UpstreamID) String() string { + return UpstreamIDString(u.Type, u.Datacenter, u.Name, &u.EnterpriseMeta) +} + +func (u UpstreamID) GoString() string { + return u.String() +} + +func UpstreamIDFromString(input string) UpstreamID { + typ, dc, name, entMeta := ParseUpstreamIDString(input) + id := UpstreamID{ + Type: typ, + Datacenter: dc, + Name: name, + EnterpriseMeta: *entMeta, + } + id.normalize() + return id +} + +const upstreamTypePreparedQueryPrefix = structs.UpstreamDestTypePreparedQuery + ":" + +func ParseUpstreamIDString(input string) (typ, dc, name string, meta *structs.EnterpriseMeta) { + if strings.HasPrefix(input, upstreamTypePreparedQueryPrefix) { + typ = structs.UpstreamDestTypePreparedQuery + input = strings.TrimPrefix(input, upstreamTypePreparedQueryPrefix) + } + + idx := strings.LastIndex(input, "?dc=") + if idx != -1 { + dc = input[idx+4:] + input = input[0:idx] + } + + name, meta = parseInnerUpstreamIDString(input) + + return typ, dc, name, meta +} + +// EnvoyID returns a string representation that uniquely identifies the +// upstream in a canonical but human readable way. +// +// This should be used for any situation where we generate identifiers in Envoy +// xDS structures for this upstream. +// +// This will ensure that generated identifiers for the same thing in OSS and +// Enterprise render the same and omit default namespaces and partitions. +func (u UpstreamID) EnvoyID() string { + name := u.enterpriseIdentifierPrefix() + u.Name + typ := u.Type + + if u.Datacenter != "" { + name += "?dc=" + u.Datacenter + } + + // Service is default type so never prefix it. This is more readable and long + // term it is the only type that matters so we can drop the prefix and have + // nicer naming in metrics etc. + if typ == "" || typ == structs.UpstreamDestTypeService { + return name + } + return typ + ":" + name +} + +func UpstreamsToMap(us structs.Upstreams) map[UpstreamID]*structs.Upstream { + upstreamMap := make(map[UpstreamID]*structs.Upstream) + + for i := range us { + u := us[i] + upstreamMap[NewUpstreamID(&u)] = &u + } + return upstreamMap +} diff --git a/agent/proxycfg/naming_oss.go b/agent/proxycfg/naming_oss.go new file mode 100644 index 0000000000..bbcf1d0e82 --- /dev/null +++ b/agent/proxycfg/naming_oss.go @@ -0,0 +1,30 @@ +//go:build !consulent +// +build !consulent + +package proxycfg + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +func UpstreamIDString(typ, dc, name string, _ *structs.EnterpriseMeta) string { + ret := name + + if dc != "" { + ret += "?dc=" + dc + } + + if typ == "" || typ == structs.UpstreamDestTypeService { + return ret + } + + return typ + ":" + ret +} + +func parseInnerUpstreamIDString(input string) (string, *structs.EnterpriseMeta) { + return input, structs.DefaultEnterpriseMetaInDefaultPartition() +} + +func (u UpstreamID) enterpriseIdentifierPrefix() string { + return "" +} diff --git a/agent/proxycfg/naming_test.go b/agent/proxycfg/naming_test.go new file mode 100644 index 0000000000..d74ae7e05c --- /dev/null +++ b/agent/proxycfg/naming_test.go @@ -0,0 +1,195 @@ +package proxycfg + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestUpstreamIDFromString(t *testing.T) { + type testcase struct { + id string + expect UpstreamID + } + run := func(t *testing.T, tc testcase) { + tc.expect.EnterpriseMeta.Normalize() + + got := UpstreamIDFromString(tc.id) + require.Equal(t, tc.expect, got) + } + + prefix := "" + if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" { + prefix = "default/default/" + } + + cases := map[string]testcase{ + "prepared query": { + "prepared_query:" + prefix + "foo", + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + }, + "prepared query dc": { + "prepared_query:" + prefix + "foo?dc=dc2", + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + }, + "normal": { + prefix + "foo", + UpstreamID{ + Name: "foo", + }, + }, + "normal dc": { + prefix + "foo?dc=dc2", + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestUpstreamID_String(t *testing.T) { + type testcase struct { + u UpstreamID + expect string + } + run := func(t *testing.T, tc testcase) { + got := tc.u.String() + require.Equal(t, tc.expect, got) + } + + prefix := "" + if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" { + prefix = "default/default/" + } + + cases := map[string]testcase{ + "prepared query": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + "prepared_query:" + prefix + "foo", + }, + "prepared query dc": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + "prepared_query:" + prefix + "foo?dc=dc2", + }, + "normal implicit": { + UpstreamID{ + Name: "foo", + }, + prefix + "foo", + }, + "normal implicit dc": { + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + prefix + "foo?dc=dc2", + }, + "normal explicit": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + }, + prefix + "foo", + }, + "normal explicit dc": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + Datacenter: "dc2", + }, + prefix + "foo?dc=dc2", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestUpstreamID_EnvoyID(t *testing.T) { + type testcase struct { + u UpstreamID + expect string + } + run := func(t *testing.T, tc testcase) { + got := tc.u.EnvoyID() + require.Equal(t, tc.expect, got) + } + + cases := map[string]testcase{ + "prepared query": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + "prepared_query:foo", + }, + "prepared query dc": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + "prepared_query:foo?dc=dc2", + }, + "normal implicit": { + UpstreamID{ + Name: "foo", + }, + "foo", + }, + "normal implicit dc": { + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + "foo?dc=dc2", + }, + "normal explicit": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + }, + "foo", + }, + "normal explicit dc": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + Datacenter: "dc2", + }, + "foo?dc=dc2", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 219c45694c..35bea81f57 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -18,47 +18,47 @@ import ( type ConfigSnapshotUpstreams struct { Leaf *structs.IssuedCert - // DiscoveryChain is a map of upstream.Identifier() -> - // CompiledDiscoveryChain's, and is used to determine what services could be - // targeted by this upstream. We then instantiate watches for those targets. - DiscoveryChain map[string]*structs.CompiledDiscoveryChain + // DiscoveryChain is a map of UpstreamID -> CompiledDiscoveryChain's, and + // is used to determine what services could be targeted by this upstream. + // We then instantiate watches for those targets. + DiscoveryChain map[UpstreamID]*structs.CompiledDiscoveryChain - // WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's + // WatchedDiscoveryChains is a map of UpstreamID -> CancelFunc's // in order to cancel any watches when the proxy's configuration is // changed. Ingress gateways and transparent proxies need this because // discovery chain watches are added and removed through the lifecycle // of a single proxycfg state instance. - WatchedDiscoveryChains map[string]context.CancelFunc + WatchedDiscoveryChains map[UpstreamID]context.CancelFunc - // WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID -> + // WatchedUpstreams is a map of UpstreamID -> (map of TargetID -> // CancelFunc's) in order to cancel any watches when the configuration is // changed. - WatchedUpstreams map[string]map[string]context.CancelFunc + WatchedUpstreams map[UpstreamID]map[string]context.CancelFunc - // WatchedUpstreamEndpoints is a map of upstream.Identifier() -> (map of + // WatchedUpstreamEndpoints is a map of UpstreamID -> (map of // TargetID -> CheckServiceNodes) and is used to determine the backing // endpoints of an upstream. - WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes + WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes - // WatchedGateways is a map of upstream.Identifier() -> (map of - // GatewayKey.String() -> CancelFunc) in order to cancel watches for mesh gateways - WatchedGateways map[string]map[string]context.CancelFunc + // WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() -> + // CancelFunc) in order to cancel watches for mesh gateways + WatchedGateways map[UpstreamID]map[string]context.CancelFunc - // WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of - // GatewayKey.String() -> CheckServiceNodes) and is used to determine the backing - // endpoints of a mesh gateway. - WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes + // WatchedGatewayEndpoints is a map of UpstreamID -> (map of + // GatewayKey.String() -> CheckServiceNodes) and is used to determine the + // backing endpoints of a mesh gateway. + WatchedGatewayEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes // UpstreamConfig is a map to an upstream's configuration. - UpstreamConfig map[string]*structs.Upstream + UpstreamConfig map[UpstreamID]*structs.Upstream - // PassthroughEndpoints is a map of: ServiceName -> ServicePassthroughAddrs. - PassthroughUpstreams map[string]ServicePassthroughAddrs + // PassthroughEndpoints is a map of: UpstreamID -> ServicePassthroughAddrs. + PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs // IntentionUpstreams is a set of upstreams inferred from intentions. - // The keys are created with structs.ServiceName.String(). + // // This list only applies to proxies registered in 'transparent' mode. - IntentionUpstreams map[string]struct{} + IntentionUpstreams map[UpstreamID]struct{} } type GatewayKey struct { @@ -107,7 +107,7 @@ type configSnapshotConnectProxy struct { ConfigSnapshotUpstreams WatchedServiceChecks map[structs.ServiceID][]structs.CheckType // TODO: missing garbage collection - PreparedQueryEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints + PreparedQueryEndpoints map[UpstreamID]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints // NOTE: Intentions stores a list of lists as returned by the Intentions // Match RPC. So far we only use the first list as the list of matching @@ -371,8 +371,8 @@ type configSnapshotIngressGateway struct { // the GatewayServices RPC to retrieve them. Upstreams map[IngressListenerKey]structs.Upstreams - // UpstreamsSet is the unique set of upstream.Identifier() the gateway routes to. - UpstreamsSet map[string]struct{} + // UpstreamsSet is the unique set of UpstreamID the gateway routes to. + UpstreamsSet map[UpstreamID]struct{} // Listeners is the original listener config from the ingress-gateway config // entry to save us trying to pass fields through Upstreams diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 586bc39386..f31c62b4e1 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -437,7 +437,7 @@ type gatewayWatchOpts struct { source structs.QuerySource token string key GatewayKey - upstreamID string + upstreamID UpstreamID } func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { @@ -448,5 +448,5 @@ func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { UseServiceKind: true, Source: opts.source, EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition), - }, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID), opts.notifyCh) + }, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID.String()), opts.notifyCh) } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 77d8fee39b..e43fc6481d 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -381,8 +381,9 @@ func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent { } } -func upstreamIDForDC2(name string) string { - return fmt.Sprintf("%s?dc=dc2", name) +func upstreamIDForDC2(uid UpstreamID) UpstreamID { + uid.Datacenter = "dc2" + return uid } // This test is meant to exercise the various parts of the cache watching done by the state as @@ -410,6 +411,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { db = structs.NewServiceName("db", nil) billing = structs.NewServiceName("billing", nil) api = structs.NewServiceName("api", nil) + + apiUID = NewUpstreamIDFromServiceName(api) + dbUID = NewUpstreamIDFromServiceName(db) + pqUID = UpstreamIDFromString("prepared_query:query") ) rootWatchEvent := func() cache.UpdateEvent { @@ -498,11 +503,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage0 := verificationStage{ requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), - "upstream:prepared_query:query": genVerifyPreparedQueryWatch("query", "dc1"), - fmt.Sprintf("discovery-chain:%s", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), + "upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"), + fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -512,7 +517,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: meshGatewayProxyConfigValue, }, }), - fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-remote", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -522,7 +527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeRemote, }, }), - fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-local", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -532,7 +537,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeLocal, }, }), - fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-direct", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -542,7 +547,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeNone, }, }), - fmt.Sprintf("discovery-chain:%s-dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-dc2", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -566,7 +571,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -576,7 +581,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -586,7 +591,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -596,7 +601,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -606,7 +611,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -645,12 +650,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage1 := verificationStage{ requiredWatches: map[string]verifyWatchRequest{ - fmt.Sprintf("upstream-target:api.default.default.dc1:%s", api.String()): genVerifyServiceWatch("api", "", "dc1", true), - fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true), - fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyGatewayWatch("dc2"), - fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", api.String()): genVerifyGatewayWatch("dc1"), + fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceWatch("api", "", "dc1", true), + fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true), + fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"), + fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid()) @@ -673,7 +678,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { } if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { - stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", api.String())] = genVerifyGatewayWatch("dc1") + stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", apiUID.String())] = genVerifyGatewayWatch("dc1") } return testCase{ @@ -999,7 +1004,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1020,7 +1025,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + api.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + apiUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1030,7 +1035,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + api.String(), + CorrelationID: "discovery-chain:" + apiUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil), }, @@ -1039,16 +1044,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreams, 1) - require.Len(t, snap.IngressGateway.WatchedUpstreams[api.String()], 1) + require.Len(t, snap.IngressGateway.WatchedUpstreams[apiUID], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:api.default.default.dc1:" + api.String(): genVerifyServiceWatch("api", "", "dc1", true), + "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:api.default.default.dc1:" + api.String(), + CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1068,10 +1073,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, api.String()) - require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], "api.default.default.dc1") - require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()]["api.default.default.dc1"], + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, apiUID) + require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], 1) + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1") + require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ @@ -1135,7 +1140,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.IngressGateway.Hosts, 1) require.Len(t, snap.IngressGateway.Upstreams, 1) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1220,7 +1225,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.IngressGateway.Hosts, 1) require.Len(t, snap.IngressGateway.Upstreams, 1) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1682,8 +1687,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.TerminatingGateway.IsEmpty()) require.False(t, snap.ConnectProxy.IsEmpty()) - expectUpstreams := map[string]*structs.Upstream{ - db.String(): { + expectUpstreams := map[UpstreamID]*structs.Upstream{ + dbUID: { DestinationName: "db", DestinationNamespace: structs.IntentionDefaultNamespace, DestinationPartition: structs.IntentionDefaultNamespace, @@ -1771,7 +1776,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.UpstreamConfig, 1) wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String()) + wcUID := NewUpstreamIDFromServiceName(wc) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID) }, }, // Valid snapshot after roots, leaf, and intentions @@ -1833,16 +1839,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid(), "should still be valid") - require.Equal(t, map[string]struct{}{db.String(): {}}, snap.ConnectProxy.IntentionUpstreams) + require.Equal(t, map[UpstreamID]struct{}{dbUID: {}}, snap.ConnectProxy.IntentionUpstreams) // Should start watch for db's chain - require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, db.String()) + require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbUID) // Should not have results yet require.Empty(t, snap.ConnectProxy.DiscoveryChain) require.Len(t, snap.ConnectProxy.UpstreamConfig, 2) - cfg, ok := snap.ConnectProxy.UpstreamConfig[db.String()] + cfg, ok := snap.ConnectProxy.UpstreamConfig[dbUID] require.True(t, ok) // Upstream config should have been inherited from defaults under wildcard key @@ -1852,7 +1858,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1864,7 +1870,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + db.String(), + CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil), }, @@ -1873,16 +1879,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:db.default.default.dc1:" + db.String(): genVerifyServiceWatch("db", "", "dc1", true), + "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:db.default.default.dc1:" + db.String(), + CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1931,10 +1937,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, db.String()) - require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], "db.default.default.dc1") - require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()]["db.default.default.dc1"], + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1") + require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ @@ -1980,8 +1986,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { // The LAN service address is used below because transparent proxying // does not support querying service nodes in other DCs, and the WAN address // should not be used in DC-local calls. - require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[string]ServicePassthroughAddrs{ - db.String(): { + require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{ + dbUID: { SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain), SpiffeID: connect.SpiffeIDService{ Host: snap.Roots.TrustDomain, @@ -2001,7 +2007,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -2013,7 +2019,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + db.String(), + CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{ Kind: structs.ServiceResolver, @@ -2028,12 +2034,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 2) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 2) // In transparent mode we watch the upstream's endpoints even if the upstream is not a target of its chain. // This will happen in cases like redirects. - require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "db.default.default.dc1") - require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "mysql.default.default.dc1") + require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "db.default.default.dc1") + require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1") }, }, // Empty list of upstreams should clean everything up @@ -2110,7 +2116,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2129,8 +2135,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.UpstreamConfig, 2) wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String())) + wcUID := NewUpstreamIDFromServiceName(wc) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(dbUID)) }, }, // Valid snapshot after roots, leaf, and intentions @@ -2172,7 +2179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2183,7 +2190,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()), + CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -2195,9 +2202,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedGateways, 1) - require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1) require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1) }, }, // Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not @@ -2209,7 +2216,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { "api", "", "dc1", false), leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2238,11 +2245,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Explicit upstreams should not be deleted when the empty update event happens since that is // for intention upstreams. require.Len(t, snap.ConnectProxy.DiscoveryChain, 1) - require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String())) + require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(dbUID)) require.Len(t, snap.ConnectProxy.WatchedGateways, 1) - require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1) require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1) }, }, }, diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 1107623766..11730819cc 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -696,18 +696,18 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - UpstreamConfig: upstreams.ToMap(), - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbChain, + UpstreamConfig: UpstreamsToMap(upstreams), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + UpstreamIDFromString("db"): dbChain, }, - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + UpstreamIDFromString("db"): { "db.default.default.dc1": TestUpstreamNodes(t, "db"), }, }, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"), + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{ + UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"), }, Intentions: nil, // no intentions defined IntentionsSet: true, @@ -822,8 +822,8 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE ConfigSnapshotUpstreams: setupTestVariationConfigEntriesAndSnapshot( t, variation, leaf, additionalEntries..., ), - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"), + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{ + UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"), }, Intentions: nil, // no intentions defined IntentionsSet: true, @@ -1402,18 +1402,20 @@ func setupTestVariationConfigEntriesAndSnapshot( dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", compileSetup, entries...) + dbUID := UpstreamIDFromString("db") + upstreams := structs.TestUpstreams(t) snap := ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbChain, + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbChain, }, - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "db.default.default.dc1": TestUpstreamNodes(t, "db"), }, }, - UpstreamConfig: upstreams.ToMap(), + UpstreamConfig: UpstreamsToMap(upstreams), } switch variation { @@ -1422,61 +1424,61 @@ func setupTestVariationConfigEntriesAndSnapshot( case "simple": case "external-sni": case "failover": - snap.WatchedUpstreamEndpoints["db"]["fail.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["fail.default.default.dc1"] = TestUpstreamNodesAlternate(t) case "failover-through-remote-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") fallthrough case "failover-through-remote-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc2": TestGatewayNodesDC2(t), }, } case "failover-through-double-remote-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesInStatusDC2(t, "critical") fallthrough case "failover-through-double-remote-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc2": TestGatewayNodesDC2(t), "dc3": TestGatewayNodesDC3(t), }, } case "failover-through-local-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") fallthrough case "failover-through-local-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc1": TestGatewayNodesDC1(t), }, } case "failover-through-double-local-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesInStatusDC2(t, "critical") fallthrough case "failover-through-double-local-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc1": TestGatewayNodesDC1(t), }, } case "splitter-with-resolver-redirect-multidc": - snap.WatchedUpstreamEndpoints["db"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[dbUID] = map[string]structs.CheckServiceNodes{ "v1.db.default.default.dc1": TestUpstreamNodes(t, "db"), "v2.db.default.default.dc2": TestUpstreamNodesDC2(t), } @@ -1484,10 +1486,10 @@ func setupTestVariationConfigEntriesAndSnapshot( case "grpc-router": case "chain-and-router": case "http-multiple-services": - snap.WatchedUpstreamEndpoints["foo"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[UpstreamIDFromString("foo")] = map[string]structs.CheckServiceNodes{ "foo.default.default.dc1": TestUpstreamNodes(t, "foo"), } - snap.WatchedUpstreamEndpoints["bar"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[UpstreamIDFromString("bar")] = map[string]structs.CheckServiceNodes{ "bar.default.default.dc1": TestUpstreamNodesAlternate(t), } case "lb-resolver": @@ -2147,9 +2149,9 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{ - "foo": fooChain, - "bar": barChain, + snap.IngressGateway.DiscoveryChain = map[UpstreamID]*structs.CompiledDiscoveryChain{ + UpstreamIDFromString("foo"): fooChain, + UpstreamIDFromString("bar"): barChain, } return snap diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index a25cb62f62..c923546c78 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -42,34 +42,35 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up if !ok { return fmt.Errorf("invalid type for response: %T", u.Result) } - svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + uidString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + uid := UpstreamIDFromString(uidString) switch snap.Kind { case structs.ServiceKindIngressGateway: - if _, ok := snap.IngressGateway.UpstreamsSet[svc]; !ok { + if _, ok := snap.IngressGateway.UpstreamsSet[uid]; !ok { // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // The associated watch was likely cancelled. - delete(upstreamsSnapshot.DiscoveryChain, svc) - s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc) + delete(upstreamsSnapshot.DiscoveryChain, uid) + s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid) return nil } case structs.ServiceKindConnectProxy: - explicit := snap.ConnectProxy.UpstreamConfig[svc].HasLocalPortOrSocket() - if _, implicit := snap.ConnectProxy.IntentionUpstreams[svc]; !implicit && !explicit { + explicit := snap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket() + if _, implicit := snap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // The associated watch was likely cancelled. - delete(upstreamsSnapshot.DiscoveryChain, svc) - s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc) + delete(upstreamsSnapshot.DiscoveryChain, uid) + s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid) return nil } default: return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind) } - upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain + upstreamsSnapshot.DiscoveryChain[uid] = resp.Chain - if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil { + if err := s.resetWatchesFromChain(ctx, uid, resp.Chain, upstreamsSnapshot); err != nil { return err } @@ -79,15 +80,17 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:") - targetID, svc, ok := removeColonPrefix(correlationID) + targetID, uidString, ok := removeColonPrefix(correlationID) if !ok { return fmt.Errorf("invalid correlation id %q", u.CorrelationID) } - if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[svc]; !ok { - upstreamsSnapshot.WatchedUpstreamEndpoints[svc] = make(map[string]structs.CheckServiceNodes) + uid := UpstreamIDFromString(uidString) + + if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[uid]; !ok { + upstreamsSnapshot.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - upstreamsSnapshot.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes + upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes var passthroughAddrs map[string]ServicePassthroughAddrs @@ -121,8 +124,9 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up Service: svc.Name, } - if _, ok := upstreamsSnapshot.PassthroughUpstreams[svc.String()]; !ok { - upstreamsSnapshot.PassthroughUpstreams[svc.String()] = ServicePassthroughAddrs{ + svcUID := NewUpstreamIDFromServiceName(svc) + if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok { + upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{ SNI: sni, SpiffeID: spiffeID, @@ -136,7 +140,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault()) addr, _ := node.BestAddress(isRemote) - upstreamsSnapshot.PassthroughUpstreams[svc.String()].Addrs[addr] = struct{}{} + upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{} } } @@ -146,14 +150,16 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:") - key, svc, ok := removeColonPrefix(correlationID) + key, uidString, ok := removeColonPrefix(correlationID) if !ok { return fmt.Errorf("invalid correlation id %q", u.CorrelationID) } - if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok { - upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes) + uid := UpstreamIDFromString(uidString) + + if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[uid]; !ok { + upstreamsSnapshot.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes + upstreamsSnapshot.WatchedGatewayEndpoints[uid][key] = resp.Nodes default: return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID) @@ -171,27 +177,27 @@ func removeColonPrefix(s string) (string, string, bool) { func (s *handlerUpstreams) resetWatchesFromChain( ctx context.Context, - id string, + uid UpstreamID, chain *structs.CompiledDiscoveryChain, snap *ConfigSnapshotUpstreams, ) error { - s.logger.Trace("resetting watches for discovery chain", "id", id) + s.logger.Trace("resetting watches for discovery chain", "id", uid) if chain == nil { return fmt.Errorf("not possible to arrive here with no discovery chain") } // Initialize relevant sub maps. - if _, ok := snap.WatchedUpstreams[id]; !ok { - snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc) + if _, ok := snap.WatchedUpstreams[uid]; !ok { + snap.WatchedUpstreams[uid] = make(map[string]context.CancelFunc) } - if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok { - snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes) + if _, ok := snap.WatchedUpstreamEndpoints[uid]; !ok { + snap.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - if _, ok := snap.WatchedGateways[id]; !ok { - snap.WatchedGateways[id] = make(map[string]context.CancelFunc) + if _, ok := snap.WatchedGateways[uid]; !ok { + snap.WatchedGateways[uid] = make(map[string]context.CancelFunc) } - if _, ok := snap.WatchedGatewayEndpoints[id]; !ok { - snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes) + if _, ok := snap.WatchedGatewayEndpoints[uid]; !ok { + snap.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } // We could invalidate this selectively based on a hash of the relevant @@ -199,14 +205,14 @@ func (s *handlerUpstreams) resetWatchesFromChain( // upstream when the chain changes in any way. // // TODO(rb): content hash based add/remove - for targetID, cancelFn := range snap.WatchedUpstreams[id] { + for targetID, cancelFn := range snap.WatchedUpstreams[uid] { s.logger.Trace("stopping watch of target", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "target", targetID, ) - delete(snap.WatchedUpstreams[id], targetID) - delete(snap.WatchedUpstreamEndpoints[id], targetID) + delete(snap.WatchedUpstreams[uid], targetID) + delete(snap.WatchedUpstreamEndpoints[uid], targetID) cancelFn() } @@ -222,7 +228,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } opts := targetWatchOpts{ - upstreamID: id, + upstreamID: uid, chainID: target.ID, service: target.Service, filter: target.Subset.Filter, @@ -231,7 +237,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } err := s.watchUpstreamTarget(ctx, snap, opts) if err != nil { - return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id) + return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, uid) } // We'll get endpoints from the gateway query, but the health still has @@ -267,7 +273,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( chainEntMeta := structs.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace) opts := targetWatchOpts{ - upstreamID: id, + upstreamID: uid, chainID: chainID, service: chain.ServiceName, filter: "", @@ -276,18 +282,18 @@ func (s *handlerUpstreams) resetWatchesFromChain( } err := s.watchUpstreamTarget(ctx, snap, opts) if err != nil { - return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id) + return fmt.Errorf("failed to watch target %q for upstream %q", chainID, uid) } } for key := range needGateways { - if _, ok := snap.WatchedGateways[id][key]; ok { + if _, ok := snap.WatchedGateways[uid][key]; ok { continue } gwKey := gatewayKeyFromString(key) s.logger.Trace("initializing watch of mesh gateway", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "datacenter", gwKey.Datacenter, "partition", gwKey.Partition, @@ -300,7 +306,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( source: *s.source, token: s.token, key: gwKey, - upstreamID: id, + upstreamID: uid, } err := watchMeshGateway(ctx, opts) if err != nil { @@ -308,23 +314,23 @@ func (s *handlerUpstreams) resetWatchesFromChain( return err } - snap.WatchedGateways[id][key] = cancel + snap.WatchedGateways[uid][key] = cancel } - for key, cancelFn := range snap.WatchedGateways[id] { + for key, cancelFn := range snap.WatchedGateways[uid] { if _, ok := needGateways[key]; ok { continue } gwKey := gatewayKeyFromString(key) s.logger.Trace("stopping watch of mesh gateway", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "datacenter", gwKey.Datacenter, "partition", gwKey.Partition, ) - delete(snap.WatchedGateways[id], key) - delete(snap.WatchedGatewayEndpoints[id], key) + delete(snap.WatchedGateways[uid], key) + delete(snap.WatchedGatewayEndpoints[uid], key) cancelFn() } @@ -332,7 +338,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } type targetWatchOpts struct { - upstreamID string + upstreamID UpstreamID chainID string service string filter string @@ -350,7 +356,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config var finalMeta structs.EnterpriseMeta finalMeta.Merge(opts.entMeta) - correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID + correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String() ctx, cancel := context.WithCancel(ctx) err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ @@ -378,7 +384,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config } type discoveryChainWatchOpts struct { - id string + id UpstreamID name string namespace string partition string @@ -403,7 +409,7 @@ func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *Config OverrideProtocol: opts.cfg.Protocol, OverrideConnectTimeout: opts.cfg.ConnectTimeout(), OverrideMeshGateway: opts.meshGateway, - }, "discovery-chain:"+opts.id, s.ch) + }, "discovery-chain:"+opts.id.String(), s.ch) if err != nil { cancel() return err diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index ba0696794c..f59543bdf0 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -314,15 +314,6 @@ func (us Upstreams) ToAPI() []api.Upstream { return a } -func (us Upstreams) ToMap() map[string]*Upstream { - upstreamMap := make(map[string]*Upstream) - - for i := range us { - upstreamMap[us[i].Identifier()] = &us[i] - } - return upstreamMap -} - // UpstreamsFromAPI is a helper for converting api.Upstream to Upstream. func UpstreamsFromAPI(us []api.Upstream) Upstreams { a := make([]Upstream, len(us)) @@ -551,24 +542,17 @@ func (k UpstreamKey) String() string { ) } -// String implements Stringer by returning the Identifier. -func (u *Upstream) String() string { - return u.Identifier() -} - -// Identifier returns a string representation that uniquely identifies the -// upstream in a canonical but human readable way. -func (us *Upstream) Identifier() string { - name := us.enterpriseIdentifierPrefix() + us.DestinationName +// String returns a representation of this upstream suitable for debugging +// purposes but nothing relies upon this format. +func (us *Upstream) String() string { + name := us.enterpriseStringPrefix() + us.DestinationName typ := us.DestinationType if us.Datacenter != "" { name += "?dc=" + us.Datacenter } - // Service is default type so never prefix it. This is more readable and long - // term it is the only type that matters so we can drop the prefix and have - // nicer naming in metrics etc. + // Service is default type so never prefix it. if typ == "" || typ == UpstreamDestTypeService { return name } diff --git a/agent/structs/connect_proxy_config_oss.go b/agent/structs/connect_proxy_config_oss.go index 61bd2e3936..dff9cc25c9 100644 --- a/agent/structs/connect_proxy_config_oss.go +++ b/agent/structs/connect_proxy_config_oss.go @@ -13,6 +13,6 @@ func (us *Upstream) DestinationID() ServiceID { } } -func (us *Upstream) enterpriseIdentifierPrefix() string { +func (us *Upstream) enterpriseStringPrefix() string { return "" } diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index d534e9e9a4..4d33bf3779 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -77,22 +77,22 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C clusters = append(clusters, passthroughs...) } - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] + chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no endpoint map for upstream %q", id) + return nil, fmt.Errorf("no endpoint map for upstream %q", uid) } - upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, upstreamCfg, chain, chainEndpoints, cfgSnap) + upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, upstreamCfg, chain, chainEndpoints, cfgSnap) if err != nil { return nil, err } @@ -387,30 +387,30 @@ func (s *ResourceGenerator) injectGatewayServiceAddons(cfgSnap *proxycfg.ConfigS func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var clusters []proto.Message - createdClusters := make(map[string]bool) + createdClusters := make(map[proxycfg.UpstreamID]bool) for _, upstreams := range cfgSnap.IngressGateway.Upstreams { for _, u := range upstreams { - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) // If we've already created a cluster for this upstream, skip it. Multiple listeners may // reference the same upstream, so we don't need to create duplicate clusters in that case. - if createdClusters[id] { + if createdClusters[uid] { continue } - chain, ok := cfgSnap.IngressGateway.DiscoveryChain[id] + chain, ok := cfgSnap.IngressGateway.DiscoveryChain[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no discovery chain for upstream %q", id) + return nil, fmt.Errorf("no discovery chain for upstream %q", uid) } - chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id] + chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no endpoint map for upstream %q", id) + return nil, fmt.Errorf("no endpoint map for upstream %q", uid) } - upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, &u, chain, chainEndpoints, cfgSnap) + upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &u, chain, chainEndpoints, cfgSnap) if err != nil { return nil, err } @@ -418,7 +418,7 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg for _, c := range upstreamClusters { clusters = append(clusters, c) } - createdClusters[id] = true + createdClusters[uid] = true } } return clusters, nil @@ -481,6 +481,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs var c *envoy_cluster_v3.Cluster var err error + uid := proxycfg.NewUpstreamID(&upstream) + dc := upstream.Datacenter if dc == "" { dc = cfgSnap.Datacenter @@ -491,7 +493,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } if cfg.EnvoyClusterJSON != "" { c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON) @@ -524,7 +526,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } } - endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[upstream.Identifier()] + endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] var ( spiffeIDs = make([]connect.SpiffeIDService, 0) seen = make(map[string]struct{}) @@ -571,14 +573,14 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( - id string, + uid proxycfg.UpstreamID, upstream *structs.Upstream, chain *structs.CompiledDiscoveryChain, chainEndpoints map[string]structs.CheckServiceNodes, cfgSnap *proxycfg.ConfigSnapshot, ) ([]*envoy_cluster_v3.Cluster, error) { if chain == nil { - return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", id) + return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", uid) } configMap := make(map[string]interface{}) @@ -589,7 +591,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } @@ -604,7 +606,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( } } else { s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for", - "discovery chain", chain.ServiceName, "upstream", id, + "discovery chain", chain.ServiceName, "upstream", uid, "envoy_cluster_json", chain.ServiceName) } } diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 6553a22d36..971407efcb 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -69,8 +69,8 @@ func TestClustersFromSnapshot(t *testing.T) { customAppClusterJSON(t, customClusterJSONOptions{ Name: "myservice", }) - snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{ - "db": { + snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{ + UID("db"): { // The local bind port is overridden by the escape hatch, but is required for explicit upstreams. LocalBindPort: 9191, Config: map[string]interface{}{ @@ -669,15 +669,17 @@ func TestClustersFromSnapshot(t *testing.T) { snap.Proxy.Mode = structs.ProxyModeTransparent kafka := structs.NewServiceName("kafka", nil) mongo := structs.NewServiceName("mongo", nil) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - kafka.String(): {}, - mongo.String(): {}, + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + kafkaUID: {}, + mongoUID: {}, } // We add a passthrough cluster for each upstream service name - snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{ - kafka.String(): { + snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{ + kafkaUID: { SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", SpiffeID: connect.SpiffeIDService{ Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", @@ -689,7 +691,7 @@ func TestClustersFromSnapshot(t *testing.T) { "9.9.9.9": {}, }, }, - mongo.String(): { + mongoUID: { SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", SpiffeID: connect.SpiffeIDService{ Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", @@ -705,8 +707,8 @@ func TestClustersFromSnapshot(t *testing.T) { } // There should still be a cluster for non-passthrough requests - snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{ "mongo.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -923,3 +925,8 @@ func TestEnvoyLBConfig_InjectToCluster(t *testing.T) { }) } } + +// UID is just a convenience function to aid in writing tests less verbosely. +func UID(input string) proxycfg.UpstreamID { + return proxycfg.UpstreamIDFromString(input) +} diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 99a1f0393b..d5b1a575ba 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -153,9 +153,9 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, svc, targetID string) { - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID] = - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID][0:1] + deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] } runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { @@ -169,7 +169,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") mgr.DeliverConfig(t, sid, snap) // We never send an EDS reply about this change. @@ -206,7 +206,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) { // Trigger only an EDS update. snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") mgr.DeliverConfig(t, sid, snap) // Send envoy an EDS update. diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index b93b6da760..4742ea5264 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -47,22 +47,22 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } es := s.endpointsFromDiscoveryChain( - id, + uid, chain, cfgSnap.Locality, upstreamCfg, - cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id], - cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id], + cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid], + cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid], ) resources = append(resources, es...) } @@ -72,7 +72,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. if u.DestinationType != structs.UpstreamDestTypePreparedQuery { continue } - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) dc := u.Datacenter if dc == "" { @@ -80,7 +80,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. } clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain) - endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id] + endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] if ok { la := makeLoadAssignment( clusterName, @@ -318,27 +318,27 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers( func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var resources []proto.Message - createdClusters := make(map[string]bool) + createdClusters := make(map[proxycfg.UpstreamID]bool) for _, upstreams := range cfgSnap.IngressGateway.Upstreams { for _, u := range upstreams { - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) // If we've already created endpoints for this upstream, skip it. Multiple listeners may // reference the same upstream, so we don't need to create duplicate endpoints in that case. - if createdClusters[id] { + if createdClusters[uid] { continue } es := s.endpointsFromDiscoveryChain( - id, - cfgSnap.IngressGateway.DiscoveryChain[id], + uid, + cfgSnap.IngressGateway.DiscoveryChain[uid], proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition}, &u, - cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id], - cfgSnap.IngressGateway.WatchedGatewayEndpoints[id], + cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid], + cfgSnap.IngressGateway.WatchedGatewayEndpoints[uid], ) resources = append(resources, es...) - createdClusters[id] = true + createdClusters[uid] = true } } return resources, nil @@ -366,7 +366,7 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint { } func (s *ResourceGenerator) endpointsFromDiscoveryChain( - id string, + uid proxycfg.UpstreamID, chain *structs.CompiledDiscoveryChain, gatewayKey proxycfg.GatewayKey, upstream *structs.Upstream, @@ -387,7 +387,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } @@ -402,7 +402,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( } } else { s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for", - "discovery chain", chain.ServiceName, "upstream", id, + "discovery chain", chain.ServiceName, "upstream", uid, "envoy_cluster_json", chain.ServiceName) } } diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index 1aedb21a9c..83886a3714 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -325,8 +325,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { customAppClusterJSON(t, customClusterJSONOptions{ Name: "myservice", }) - snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{ - "db": { + snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{ + UID("db"): { // The local bind port is overridden by the escape hatch, but is required for explicit upstreams. LocalBindPort: 9191, Config: map[string]interface{}{ diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 319161ff47..fcc3fd2ad9 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -93,16 +93,16 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } } - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - cfg := s.getAndModifyUpstreamConfigForListener(id, upstreamCfg, chain) + cfg := s.getAndModifyUpstreamConfigForListener(uid, upstreamCfg, chain) // If escape hatch is present, create a listener from it and move on to the next if cfg.EnvoyListenerJSON != "" { @@ -133,7 +133,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // Generate the upstream listeners for when they are explicitly set with a local bind port or socket path if upstreamCfg != nil && upstreamCfg.HasLocalPortOrSocket() { filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), clusterName: clusterName, filterName: filterName, protocol: cfg.Protocol, @@ -143,7 +143,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. return nil, err } - upstreamListener := makeListener(id, upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND) + upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND) upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } @@ -158,7 +158,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as we do for explicit upstreams above. filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), clusterName: clusterName, filterName: filterName, protocol: cfg.Protocol, @@ -168,7 +168,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. return nil, err } - endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id][chain.ID()] + endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid][chain.ID()] uniqueAddrs := make(map[string]struct{}) // Match on the virtual IP for the upstream service (identified by the chain's ID). @@ -199,7 +199,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } if len(uniqueAddrs) > 2 { s.Logger.Debug("detected multiple virtual IPs for an upstream, all will be used to match traffic", - "upstream", id, "ip_count", len(uniqueAddrs)) + "upstream", uid, "ip_count", len(uniqueAddrs)) } // For every potential address we collected, create the appropriate address prefix to match on. @@ -218,12 +218,11 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as opposed to via a virtual IP. var passthroughChains []*envoy_listener_v3.FilterChain - for svc, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams { - sn := structs.ServiceNameFromString(svc) + for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams { u := structs.Upstream{ - DestinationName: sn.Name, - DestinationNamespace: sn.NamespaceOrDefault(), - DestinationPartition: sn.PartitionOrDefault(), + DestinationName: uid.Name, + DestinationNamespace: uid.NamespaceOrDefault(), + DestinationPartition: uid.PartitionOrDefault(), } filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter) @@ -271,7 +270,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } // Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains - for id, u := range cfgSnap.ConnectProxy.UpstreamConfig { + for uid, u := range cfgSnap.ConnectProxy.UpstreamConfig { if u.DestinationType != structs.UpstreamDestTypePreparedQuery { continue } @@ -280,7 +279,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } // If escape hatch is present, create a listener from it and move on to the next @@ -288,7 +287,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener, err := makeListenerFromUserConfig(cfg.EnvoyListenerJSON) if err != nil { s.Logger.Error("failed to parse envoy_listener_json", - "upstream", u.Identifier(), + "upstream", uid, "error", err) continue } @@ -296,13 +295,13 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. continue } - upstreamListener := makeListener(id, u, envoy_core_v3.TrafficDirection_OUTBOUND) + upstreamListener := makeListener(uid.EnvoyID(), u, envoy_core_v3.TrafficDirection_OUTBOUND) filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ // TODO (SNI partition) add partition for upstream SNI clusterName: connect.UpstreamSNI(u, "", cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain), - filterName: id, - routeName: id, + filterName: uid.EnvoyID(), + routeName: uid.EnvoyID(), protocol: cfg.Protocol, }) if err != nil { @@ -1289,7 +1288,11 @@ func simpleChainTarget(chain *structs.CompiledDiscoveryChain) (*structs.Discover return chain.Targets[targetID], nil } -func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig { +func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener( + uid proxycfg.UpstreamID, + u *structs.Upstream, + chain *structs.CompiledDiscoveryChain, +) structs.UpstreamConfig { var ( cfg structs.UpstreamConfig err error @@ -1304,7 +1307,7 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u * if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } } else { // Use NoDefaults here so that we can set the protocol to the chain @@ -1313,12 +1316,12 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u * if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } if cfg.EnvoyListenerJSON != "" { s.Logger.Warn("ignoring escape hatch setting because already configured for", - "discovery chain", chain.ServiceName, "upstream", id, "config", "envoy_listener_json") + "discovery chain", chain.ServiceName, "upstream", uid, "config", "envoy_listener_json") // Remove from config struct so we don't use it later on cfg.EnvoyListenerJSON = "" diff --git a/agent/xds/listeners_ingress.go b/agent/xds/listeners_ingress.go index 8f8f741d1c..899e842d45 100644 --- a/agent/xds/listeners_ingress.go +++ b/agent/xds/listeners_ingress.go @@ -2,9 +2,11 @@ package xds import ( "fmt" + envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/wrappers" @@ -33,15 +35,16 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap // member, because this key/value pair is created only when a // GatewayService is returned in the RPC u := upstreams[0] - id := u.Identifier() - chain := cfgSnap.IngressGateway.DiscoveryChain[id] + uid := proxycfg.NewUpstreamID(&u) + + chain := cfgSnap.IngressGateway.DiscoveryChain[uid] if chain == nil { // Wait until a chain is present in the snapshot. continue } - cfg := s.getAndModifyUpstreamConfigForListener(id, &u, chain) + cfg := s.getAndModifyUpstreamConfigForListener(uid, &u, chain) // RDS, Envoy's Route Discovery Service, is only used for HTTP services with a customized discovery chain. // TODO(freddy): Why can the protocol of the listener be overridden here? @@ -60,9 +63,9 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter) - l := makePortListenerWithDefault(id, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND) + l := makePortListenerWithDefault(uid.EnvoyID(), address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND) filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), useRDS: useRDS, clusterName: clusterName, filterName: filterName, diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index 5c252fad38..c097176f4b 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -72,6 +72,8 @@ func TestListenersFromSnapshot(t *testing.T) { snap.Proxy.Upstreams[0].LocalBindPort = 0 snap.Proxy.Upstreams[0].LocalBindSocketPath = "/tmp/service-mesh/client-1/grpc-employee-server" snap.Proxy.Upstreams[0].LocalBindSocketMode = "0640" + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -95,6 +97,8 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshot, setup: func(snap *proxycfg.ConfigSnapshot) { snap.Proxy.Upstreams[0].Config["protocol"] = "http" + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -159,14 +163,21 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshot, setup: func(snap *proxycfg.ConfigSnapshot) { for i := range snap.Proxy.Upstreams { + if snap.Proxy.Upstreams[i].DestinationName != "db" { + continue // only tweak the db upstream + } if snap.Proxy.Upstreams[i].Config == nil { snap.Proxy.Upstreams[i].Config = map[string]interface{}{} } + + uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i]) + snap.Proxy.Upstreams[i].Config["envoy_listener_json"] = customListenerJSON(t, customListenerJSONOptions{ - Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream", + Name: uid.EnvoyID() + ":custom-upstream", }) } + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -174,14 +185,22 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailover, setup: func(snap *proxycfg.ConfigSnapshot) { for i := range snap.Proxy.Upstreams { + if snap.Proxy.Upstreams[i].DestinationName != "db" { + continue // only tweak the db upstream + } if snap.Proxy.Upstreams[i].Config == nil { snap.Proxy.Upstreams[i].Config = map[string]interface{}{} } + + uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i]) + snap.Proxy.Upstreams[i].Config["envoy_listener_json"] = customListenerJSON(t, customListenerJSONOptions{ - Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream", + Name: uid.EnvoyID() + ":custom-upstream", }) } + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -901,7 +920,7 @@ func TestListenersFromSnapshot(t *testing.T) { connect.TestClusterID+".consul", nil, ) - snap.IngressGateway.DiscoveryChain["secure"] = secureChain + snap.IngressGateway.DiscoveryChain[UID("secure")] = secureChain insecureChain := discoverychain.TestCompileConfigEntries( t, @@ -912,7 +931,7 @@ func TestListenersFromSnapshot(t *testing.T) { connect.TestClusterID+".consul", nil, ) - snap.IngressGateway.DiscoveryChain["insecure"] = insecureChain + snap.IngressGateway.DiscoveryChain[UID("insecure")] = insecureChain snap.IngressGateway.Listeners = map[proxycfg.IngressListenerKey]structs.IngressListener{ {Protocol: "tcp", Port: 8080}: { @@ -1084,12 +1103,13 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1126,7 +1146,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on. - snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) }, }, { @@ -1144,11 +1164,12 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1168,7 +1189,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on. - snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) }, }, { @@ -1178,24 +1199,26 @@ func TestListenersFromSnapshot(t *testing.T) { snap.Proxy.Mode = structs.ProxyModeTransparent kafka := structs.NewServiceName("kafka", nil) mongo := structs.NewServiceName("mongo", nil) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - kafka.String(): {}, - mongo.String(): {}, + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + kafkaUID: {}, + mongoUID: {}, } - snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) // We add a filter chains for each passthrough service name. // The filter chain will route to a cluster with the same SNI name. - snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{ - kafka.String(): { + snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{ + kafkaUID: { SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", Addrs: map[string]struct{}{ "9.9.9.9": {}, }, }, - mongo.String(): { + mongoUID: { SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", Addrs: map[string]struct{}{ "10.10.10.10": {}, @@ -1205,7 +1228,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // There should still be a filter chain for mongo's virtual address - snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{ "mongo.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1240,12 +1263,14 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) kafka := structs.NewServiceName("kafka", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, - kafka.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, + kafkaUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) tgate := structs.CheckServiceNode{ Node: &structs.Node{ @@ -1264,10 +1289,10 @@ func TestListenersFromSnapshot(t *testing.T) { }, }, } - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": {tgate}, } - snap.ConnectProxy.WatchedUpstreamEndpoints[kafka.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[kafkaUID] = map[string]structs.CheckServiceNodes{ "kafka.default.default.dc1": {tgate}, } }, diff --git a/agent/xds/routes.go b/agent/xds/routes.go index 7cb714daab..8d0b6330ee 100644 --- a/agent/xds/routes.go +++ b/agent/xds/routes.go @@ -48,24 +48,24 @@ func (s *ResourceGenerator) routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) // "routes" in the snapshot. func (s *ResourceGenerator) routesForConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var resources []proto.Message - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { if chain.IsDefault() { continue } - explicit := cfgSnap.ConnectProxy.UpstreamConfig[id].HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + explicit := cfgSnap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket() + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - virtualHost, err := makeUpstreamRouteForDiscoveryChain(id, chain, []string{"*"}) + virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, []string{"*"}) if err != nil { return nil, err } route := &envoy_route_v3.RouteConfiguration{ - Name: id, + Name: uid.EnvoyID(), VirtualHosts: []*envoy_route_v3.VirtualHost{virtualHost}, // ValidateClusters defaults to true when defined statically and false // when done via RDS. Re-set the reasonable value of true to prevent @@ -173,7 +173,7 @@ func makeNamedDefaultRouteWithLB(clusterName string, lb *structs.LoadBalancer, a func (s *ResourceGenerator) routesForIngressGateway( listeners map[proxycfg.IngressListenerKey]structs.IngressListener, upstreams map[proxycfg.IngressListenerKey]structs.Upstreams, - chains map[string]*structs.CompiledDiscoveryChain, + chains map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain, ) ([]proto.Message, error) { var result []proto.Message @@ -195,14 +195,14 @@ func (s *ResourceGenerator) routesForIngressGateway( } for _, u := range upstreams { - upstreamID := u.Identifier() - chain := chains[upstreamID] + uid := proxycfg.NewUpstreamID(&u) + chain := chains[uid] if chain == nil { continue } domains := generateUpstreamIngressDomains(listenerKey, u) - virtualHost, err := makeUpstreamRouteForDiscoveryChain(upstreamID, chain, domains) + virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, domains) if err != nil { return nil, err } diff --git a/agent/xds/routes_test.go b/agent/xds/routes_test.go index cd76184dbc..d0c1b44d9b 100644 --- a/agent/xds/routes_test.go +++ b/agent/xds/routes_test.go @@ -204,11 +204,11 @@ func TestRoutesFromSnapshot(t *testing.T) { bazChain := discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) quxChain := discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) - snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{ - "foo": fooChain, - "bar": barChain, - "baz": bazChain, - "qux": quxChain, + snap.IngressGateway.DiscoveryChain = map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain{ + UID("foo"): fooChain, + UID("bar"): barChain, + UID("baz"): bazChain, + UID("qux"): quxChain, } }, }, @@ -667,6 +667,9 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap * }, } + webUID := proxycfg.NewUpstreamID(&webUpstream) + fooUID := proxycfg.NewUpstreamID(&fooUpstream) + // Setup additional HTTP service on same listener with default router snap.IngressGateway.Upstreams = map[proxycfg.IngressListenerKey]structs.Upstreams{ {Protocol: "http", Port: 9191}: {webUpstream, fooUpstream}, @@ -775,7 +778,7 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap * o.entMetas["web"].PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", nil, entries...) - snap.IngressGateway.DiscoveryChain[webUpstream.Identifier()] = webChain - snap.IngressGateway.DiscoveryChain[fooUpstream.Identifier()] = fooChain + snap.IngressGateway.DiscoveryChain[webUID] = webChain + snap.IngressGateway.DiscoveryChain[fooUID] = fooChain } } diff --git a/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden b/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden index 62e02591d9..d12bdaf998 100644 --- a/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden +++ b/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden @@ -27,11 +27,11 @@ }, { "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", - "name": "prepared_query:geo-cache:custom-upstream", + "name": "prepared_query:geo-cache:127.10.10.10:8181", "address": { "socketAddress": { - "address": "11.11.11.11", - "portValue": 11111 + "address": "127.10.10.10", + "portValue": 8181 } }, "filterChains": [ @@ -41,13 +41,14 @@ "name": "envoy.filters.network.tcp_proxy", "typedConfig": { "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", - "statPrefix": "foo-stats", - "cluster": "random-cluster" + "statPrefix": "upstream.prepared_query_geo-cache", + "cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul" } } ] } - ] + ], + "trafficDirection": "OUTBOUND" }, { "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index dc58e1f9d9..c0be25680b 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -43,8 +43,8 @@ func newTestSnapshot( additionalEntries ...structs.ConfigEntry, ) *proxycfg.ConfigSnapshot { snap := proxycfg.TestConfigSnapshotDiscoveryChainDefaultWithEntries(t, additionalEntries...) - snap.ConnectProxy.PreparedQueryEndpoints = map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": proxycfg.TestPreparedQueryNodes(t, "geo-cache"), + snap.ConnectProxy.PreparedQueryEndpoints = map[proxycfg.UpstreamID]structs.CheckServiceNodes{ + UID("prepared_query:geo-cache"): proxycfg.TestPreparedQueryNodes(t, "geo-cache"), } if prevSnap != nil { snap.Roots = prevSnap.Roots @@ -53,7 +53,7 @@ func newTestSnapshot( if dbServiceProtocol != "" { // Simulate ServiceManager injection of protocol snap.Proxy.Upstreams[0].Config["protocol"] = dbServiceProtocol - snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = snap.Proxy.Upstreams.ToMap() + snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) } return snap }