diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index bd21dd2a75..629c416425 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -18,16 +18,16 @@ type handlerConnectProxy struct { // state. func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) { snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig) - snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) - snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc) - snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc) - snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes) - snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc) - snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) + snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) + snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) + snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) + snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) - snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes) - snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream) - snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs) + snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes) + snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream) + snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]ServicePassthroughAddrs) // Watch for root changes err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ @@ -106,9 +106,11 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e for i := range s.proxyCfg.Upstreams { u := s.proxyCfg.Upstreams[i] + uid := NewUpstreamID(&u) + // Store defaults keyed under wildcard so they can be applied to centrally configured upstreams if u.DestinationName == structs.WildcardSpecifier { - snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u + snap.ConnectProxy.UpstreamConfig[uid] = &u continue } @@ -117,7 +119,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if u.CentrallyConfigured { continue } - snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u + snap.ConnectProxy.UpstreamConfig[uid] = &u dc := s.source.Datacenter if u.Datacenter != "" { @@ -144,7 +146,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e // the plain discovery chain if there is an error so it's safe to // continue. s.logger.Warn("failed to parse upstream config", - "upstream", u.Identifier(), + "upstream", uid.String(), "error", err, ) } @@ -157,7 +159,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e QueryIDOrName: u.DestinationName, Connect: true, Source: *s.source, - }, "upstream:"+u.Identifier(), s.ch) + }, "upstream:"+uid.String(), s.ch) if err != nil { return snap, err } @@ -176,9 +178,9 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway), OverrideProtocol: cfg.Protocol, OverrideConnectTimeout: cfg.ConnectTimeout(), - }, "discovery-chain:"+u.Identifier(), s.ch) + }, "discovery-chain:"+uid.String(), s.ch) if err != nil { - return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) + return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err) } default: @@ -220,12 +222,14 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv return fmt.Errorf("invalid type for response %T", u.Result) } - seenServices := make(map[string]struct{}) + seenUpstreams := make(map[UpstreamID]struct{}) for _, svc := range resp.Services { - seenServices[svc.String()] = struct{}{} + uid := NewUpstreamIDFromServiceName(svc) + + seenUpstreams[uid] = struct{}{} cfgMap := make(map[string]interface{}) - u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()] + u, ok := snap.ConnectProxy.UpstreamConfig[uid] if ok { cfgMap = u.Config } else { @@ -233,11 +237,12 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv // This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled // by the ResolveServiceConfig endpoint. wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace()) - defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()] + wildcardUID := NewUpstreamIDFromServiceID(wildcardSID) + defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID] if ok { u = defaults cfgMap = defaults.Config - snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults + snap.ConnectProxy.UpstreamConfig[uid] = defaults } } @@ -247,7 +252,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv // the plain discovery chain if there is an error so it's safe to // continue. s.logger.Warn("failed to parse upstream config", - "upstream", u.Identifier(), + "upstream", uid, "error", err, ) } @@ -257,7 +262,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv meshGateway = meshGateway.OverlayWith(u.MeshGateway) } watchOpts := discoveryChainWatchOpts{ - id: svc.String(), + id: NewUpstreamIDFromServiceName(svc), name: svc.Name, namespace: svc.NamespaceOrDefault(), partition: svc.PartitionOrDefault(), @@ -268,69 +273,69 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv up := &handlerUpstreams{handlerState: s.handlerState} err = up.watchDiscoveryChain(ctx, snap, watchOpts) if err != nil { - return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err) + return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err) } } - snap.ConnectProxy.IntentionUpstreams = seenServices + snap.ConnectProxy.IntentionUpstreams = seenUpstreams // Clean up data from services that were not in the update - for sn, targets := range snap.ConnectProxy.WatchedUpstreams { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, targets := range snap.ConnectProxy.WatchedUpstreams { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { for _, cancelFn := range targets { cancelFn() } - delete(snap.ConnectProxy.WatchedUpstreams, sn) + delete(snap.ConnectProxy.WatchedUpstreams, uid) } } - for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid) } } - for sn, cancelMap := range snap.ConnectProxy.WatchedGateways { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, cancelMap := range snap.ConnectProxy.WatchedGateways { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { for _, cancelFn := range cancelMap { cancelFn() } - delete(snap.ConnectProxy.WatchedGateways, sn) + delete(snap.ConnectProxy.WatchedGateways, uid) } } - for sn := range snap.ConnectProxy.WatchedGatewayEndpoints { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.WatchedGatewayEndpoints { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid) } } - for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { + if _, ok := seenUpstreams[uid]; !ok { cancelFn() - delete(snap.ConnectProxy.WatchedDiscoveryChains, sn) + delete(snap.ConnectProxy.WatchedDiscoveryChains, uid) } } // These entries are intentionally handled separately from the WatchedDiscoveryChains above. // There have been situations where a discovery watch was cancelled, then fired. // That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up // since there was no known watch for it. - for sn := range snap.ConnectProxy.DiscoveryChain { - if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { + for uid := range snap.ConnectProxy.DiscoveryChain { + if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter { continue } - if _, ok := seenServices[sn]; !ok { - delete(snap.ConnectProxy.DiscoveryChain, sn) + if _, ok := seenUpstreams[uid]; !ok { + delete(snap.ConnectProxy.DiscoveryChain, uid) } } @@ -340,7 +345,8 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEv return fmt.Errorf("invalid type for response: %T", u.Result) } pq := strings.TrimPrefix(u.CorrelationID, "upstream:") - snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes + uid := UpstreamIDFromString(pq) + snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix): resp, ok := u.Result.([]structs.CheckType) diff --git a/agent/proxycfg/ingress_gateway.go b/agent/proxycfg/ingress_gateway.go index e14dce07b3..1a5eb5ed9f 100644 --- a/agent/proxycfg/ingress_gateway.go +++ b/agent/proxycfg/ingress_gateway.go @@ -48,12 +48,12 @@ func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, return snap, err } - snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc) - snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain) - snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc) - snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes) - snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc) - snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes) + snap.IngressGateway.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) + snap.IngressGateway.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain) + snap.IngressGateway.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) + snap.IngressGateway.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) + snap.IngressGateway.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) + snap.IngressGateway.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.IngressGateway.Listeners = make(map[IngressListenerKey]structs.IngressListener) return snap, nil } @@ -102,13 +102,15 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update // Update our upstreams and watches. var hosts []string - watchedSvcs := make(map[string]struct{}) + watchedSvcs := make(map[UpstreamID]struct{}) upstreamsMap := make(map[IngressListenerKey]structs.Upstreams) for _, service := range services.Services { u := makeUpstream(service) + uid := NewUpstreamID(&u) + watchOpts := discoveryChainWatchOpts{ - id: u.Identifier(), + id: uid, name: u.DestinationName, namespace: u.DestinationNamespace, partition: u.DestinationPartition, @@ -117,9 +119,9 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update up := &handlerUpstreams{handlerState: s.handlerState} err := up.watchDiscoveryChain(ctx, snap, watchOpts) if err != nil { - return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err) + return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err) } - watchedSvcs[u.Identifier()] = struct{}{} + watchedSvcs[uid] = struct{}{} hosts = append(hosts, service.Hosts...) @@ -132,10 +134,10 @@ func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.Update snap.IngressGateway.Hosts = hosts snap.IngressGateway.HostsSet = true - for id, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains { - if _, ok := watchedSvcs[id]; !ok { + for uid, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains { + if _, ok := watchedSvcs[uid]; !ok { cancelFn() - delete(snap.IngressGateway.WatchedDiscoveryChains, id) + delete(snap.IngressGateway.WatchedDiscoveryChains, uid) } } diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index cdb271ee35..9b7fe0060c 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -187,6 +187,7 @@ func TestManager_BasicLifecycle(t *testing.T) { }) db := structs.NewServiceName("db", nil) + dbUID := NewUpstreamIDFromServiceName(db) // Create test cases using some of the common data above. tests := []*testcase_BasicLifecycle{ @@ -214,28 +215,28 @@ func TestManager_BasicLifecycle(t *testing.T) { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - db.String(): dbDefaultChain(), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbDefaultChain(), }, - WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "db.default.default.dc1": TestUpstreamNodes(t, db.Name), }, }, WatchedGateways: nil, // Clone() clears this out - WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): {}, + WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: {}, }, - UpstreamConfig: map[string]*structs.Upstream{ - upstreams[0].Identifier(): &upstreams[0], - upstreams[1].Identifier(): &upstreams[1], - upstreams[2].Identifier(): &upstreams[2], + UpstreamConfig: map[UpstreamID]*structs.Upstream{ + NewUpstreamID(&upstreams[0]): &upstreams[0], + NewUpstreamID(&upstreams[1]): &upstreams[1], + NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[string]ServicePassthroughAddrs{}, + PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{}, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, Intentions: TestIntentions().Matches[0], IntentionsSet: true, @@ -271,29 +272,29 @@ func TestManager_BasicLifecycle(t *testing.T) { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - db.String(): dbSplitChain(), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbSplitChain(), }, - WatchedDiscoveryChains: map[string]context.CancelFunc{}, + WatchedDiscoveryChains: map[UpstreamID]context.CancelFunc{}, WatchedUpstreams: nil, // Clone() clears this out - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "v1.db.default.default.dc1": TestUpstreamNodes(t, db.Name), "v2.db.default.default.dc1": TestUpstreamNodesAlternate(t), }, }, WatchedGateways: nil, // Clone() clears this out - WatchedGatewayEndpoints: map[string]map[string]structs.CheckServiceNodes{ - db.String(): {}, + WatchedGatewayEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: {}, }, - UpstreamConfig: map[string]*structs.Upstream{ - upstreams[0].Identifier(): &upstreams[0], - upstreams[1].Identifier(): &upstreams[1], - upstreams[2].Identifier(): &upstreams[2], + UpstreamConfig: map[UpstreamID]*structs.Upstream{ + NewUpstreamID(&upstreams[0]): &upstreams[0], + NewUpstreamID(&upstreams[1]): &upstreams[1], + NewUpstreamID(&upstreams[2]): &upstreams[2], }, - PassthroughUpstreams: map[string]ServicePassthroughAddrs{}, + PassthroughUpstreams: map[UpstreamID]ServicePassthroughAddrs{}, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{}, + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, WatchedServiceChecks: map[structs.ServiceID][]structs.CheckType{}, Intentions: TestIntentions().Matches[0], IntentionsSet: true, diff --git a/agent/proxycfg/naming.go b/agent/proxycfg/naming.go new file mode 100644 index 0000000000..5304b8d1a6 --- /dev/null +++ b/agent/proxycfg/naming.go @@ -0,0 +1,130 @@ +package proxycfg + +import ( + "strings" + + "github.com/hashicorp/consul/agent/structs" +) + +type UpstreamID struct { + Type string + Name string + Datacenter string + structs.EnterpriseMeta +} + +func NewUpstreamID(u *structs.Upstream) UpstreamID { + id := UpstreamID{ + Type: u.DestinationType, + Name: u.DestinationName, + Datacenter: u.Datacenter, + EnterpriseMeta: structs.NewEnterpriseMetaWithPartition( + u.DestinationPartition, + u.DestinationNamespace, + ), + } + id.normalize() + return id +} + +func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID { + id := UpstreamID{ + Name: sn.Name, + EnterpriseMeta: sn.EnterpriseMeta, + } + id.normalize() + return id +} + +func NewUpstreamIDFromServiceID(sid structs.ServiceID) UpstreamID { + id := UpstreamID{ + Name: sid.ID, + EnterpriseMeta: sid.EnterpriseMeta, + } + id.normalize() + return id +} + +func (u *UpstreamID) normalize() { + if u.Type == structs.UpstreamDestTypeService { + u.Type = "" + } + + u.EnterpriseMeta.Normalize() +} + +// String encodes the UpstreamID into a string for use in agent cache keys. +// You can decode it back again using UpstreamIDFromString. +func (u UpstreamID) String() string { + return UpstreamIDString(u.Type, u.Datacenter, u.Name, &u.EnterpriseMeta) +} + +func (u UpstreamID) GoString() string { + return u.String() +} + +func UpstreamIDFromString(input string) UpstreamID { + typ, dc, name, entMeta := ParseUpstreamIDString(input) + id := UpstreamID{ + Type: typ, + Datacenter: dc, + Name: name, + EnterpriseMeta: *entMeta, + } + id.normalize() + return id +} + +const upstreamTypePreparedQueryPrefix = structs.UpstreamDestTypePreparedQuery + ":" + +func ParseUpstreamIDString(input string) (typ, dc, name string, meta *structs.EnterpriseMeta) { + if strings.HasPrefix(input, upstreamTypePreparedQueryPrefix) { + typ = structs.UpstreamDestTypePreparedQuery + input = strings.TrimPrefix(input, upstreamTypePreparedQueryPrefix) + } + + idx := strings.LastIndex(input, "?dc=") + if idx != -1 { + dc = input[idx+4:] + input = input[0:idx] + } + + name, meta = parseInnerUpstreamIDString(input) + + return typ, dc, name, meta +} + +// EnvoyID returns a string representation that uniquely identifies the +// upstream in a canonical but human readable way. +// +// This should be used for any situation where we generate identifiers in Envoy +// xDS structures for this upstream. +// +// This will ensure that generated identifiers for the same thing in OSS and +// Enterprise render the same and omit default namespaces and partitions. +func (u UpstreamID) EnvoyID() string { + name := u.enterpriseIdentifierPrefix() + u.Name + typ := u.Type + + if u.Datacenter != "" { + name += "?dc=" + u.Datacenter + } + + // Service is default type so never prefix it. This is more readable and long + // term it is the only type that matters so we can drop the prefix and have + // nicer naming in metrics etc. + if typ == "" || typ == structs.UpstreamDestTypeService { + return name + } + return typ + ":" + name +} + +func UpstreamsToMap(us structs.Upstreams) map[UpstreamID]*structs.Upstream { + upstreamMap := make(map[UpstreamID]*structs.Upstream) + + for i := range us { + u := us[i] + upstreamMap[NewUpstreamID(&u)] = &u + } + return upstreamMap +} diff --git a/agent/proxycfg/naming_oss.go b/agent/proxycfg/naming_oss.go new file mode 100644 index 0000000000..bbcf1d0e82 --- /dev/null +++ b/agent/proxycfg/naming_oss.go @@ -0,0 +1,30 @@ +//go:build !consulent +// +build !consulent + +package proxycfg + +import ( + "github.com/hashicorp/consul/agent/structs" +) + +func UpstreamIDString(typ, dc, name string, _ *structs.EnterpriseMeta) string { + ret := name + + if dc != "" { + ret += "?dc=" + dc + } + + if typ == "" || typ == structs.UpstreamDestTypeService { + return ret + } + + return typ + ":" + ret +} + +func parseInnerUpstreamIDString(input string) (string, *structs.EnterpriseMeta) { + return input, structs.DefaultEnterpriseMetaInDefaultPartition() +} + +func (u UpstreamID) enterpriseIdentifierPrefix() string { + return "" +} diff --git a/agent/proxycfg/naming_test.go b/agent/proxycfg/naming_test.go new file mode 100644 index 0000000000..d74ae7e05c --- /dev/null +++ b/agent/proxycfg/naming_test.go @@ -0,0 +1,195 @@ +package proxycfg + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/structs" +) + +func TestUpstreamIDFromString(t *testing.T) { + type testcase struct { + id string + expect UpstreamID + } + run := func(t *testing.T, tc testcase) { + tc.expect.EnterpriseMeta.Normalize() + + got := UpstreamIDFromString(tc.id) + require.Equal(t, tc.expect, got) + } + + prefix := "" + if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" { + prefix = "default/default/" + } + + cases := map[string]testcase{ + "prepared query": { + "prepared_query:" + prefix + "foo", + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + }, + "prepared query dc": { + "prepared_query:" + prefix + "foo?dc=dc2", + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + }, + "normal": { + prefix + "foo", + UpstreamID{ + Name: "foo", + }, + }, + "normal dc": { + prefix + "foo?dc=dc2", + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestUpstreamID_String(t *testing.T) { + type testcase struct { + u UpstreamID + expect string + } + run := func(t *testing.T, tc testcase) { + got := tc.u.String() + require.Equal(t, tc.expect, got) + } + + prefix := "" + if structs.DefaultEnterpriseMetaInDefaultPartition().PartitionOrEmpty() != "" { + prefix = "default/default/" + } + + cases := map[string]testcase{ + "prepared query": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + "prepared_query:" + prefix + "foo", + }, + "prepared query dc": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + "prepared_query:" + prefix + "foo?dc=dc2", + }, + "normal implicit": { + UpstreamID{ + Name: "foo", + }, + prefix + "foo", + }, + "normal implicit dc": { + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + prefix + "foo?dc=dc2", + }, + "normal explicit": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + }, + prefix + "foo", + }, + "normal explicit dc": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + Datacenter: "dc2", + }, + prefix + "foo?dc=dc2", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} + +func TestUpstreamID_EnvoyID(t *testing.T) { + type testcase struct { + u UpstreamID + expect string + } + run := func(t *testing.T, tc testcase) { + got := tc.u.EnvoyID() + require.Equal(t, tc.expect, got) + } + + cases := map[string]testcase{ + "prepared query": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + }, + "prepared_query:foo", + }, + "prepared query dc": { + UpstreamID{ + Type: structs.UpstreamDestTypePreparedQuery, + Name: "foo", + Datacenter: "dc2", + }, + "prepared_query:foo?dc=dc2", + }, + "normal implicit": { + UpstreamID{ + Name: "foo", + }, + "foo", + }, + "normal implicit dc": { + UpstreamID{ + Name: "foo", + Datacenter: "dc2", + }, + "foo?dc=dc2", + }, + "normal explicit": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + }, + "foo", + }, + "normal explicit dc": { + UpstreamID{ + Type: structs.UpstreamDestTypeService, + Name: "foo", + Datacenter: "dc2", + }, + "foo?dc=dc2", + }, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + run(t, tc) + }) + } +} diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index 219c45694c..35bea81f57 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -18,47 +18,47 @@ import ( type ConfigSnapshotUpstreams struct { Leaf *structs.IssuedCert - // DiscoveryChain is a map of upstream.Identifier() -> - // CompiledDiscoveryChain's, and is used to determine what services could be - // targeted by this upstream. We then instantiate watches for those targets. - DiscoveryChain map[string]*structs.CompiledDiscoveryChain + // DiscoveryChain is a map of UpstreamID -> CompiledDiscoveryChain's, and + // is used to determine what services could be targeted by this upstream. + // We then instantiate watches for those targets. + DiscoveryChain map[UpstreamID]*structs.CompiledDiscoveryChain - // WatchedDiscoveryChains is a map of upstream.Identifier() -> CancelFunc's + // WatchedDiscoveryChains is a map of UpstreamID -> CancelFunc's // in order to cancel any watches when the proxy's configuration is // changed. Ingress gateways and transparent proxies need this because // discovery chain watches are added and removed through the lifecycle // of a single proxycfg state instance. - WatchedDiscoveryChains map[string]context.CancelFunc + WatchedDiscoveryChains map[UpstreamID]context.CancelFunc - // WatchedUpstreams is a map of upstream.Identifier() -> (map of TargetID -> + // WatchedUpstreams is a map of UpstreamID -> (map of TargetID -> // CancelFunc's) in order to cancel any watches when the configuration is // changed. - WatchedUpstreams map[string]map[string]context.CancelFunc + WatchedUpstreams map[UpstreamID]map[string]context.CancelFunc - // WatchedUpstreamEndpoints is a map of upstream.Identifier() -> (map of + // WatchedUpstreamEndpoints is a map of UpstreamID -> (map of // TargetID -> CheckServiceNodes) and is used to determine the backing // endpoints of an upstream. - WatchedUpstreamEndpoints map[string]map[string]structs.CheckServiceNodes + WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes - // WatchedGateways is a map of upstream.Identifier() -> (map of - // GatewayKey.String() -> CancelFunc) in order to cancel watches for mesh gateways - WatchedGateways map[string]map[string]context.CancelFunc + // WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() -> + // CancelFunc) in order to cancel watches for mesh gateways + WatchedGateways map[UpstreamID]map[string]context.CancelFunc - // WatchedGatewayEndpoints is a map of upstream.Identifier() -> (map of - // GatewayKey.String() -> CheckServiceNodes) and is used to determine the backing - // endpoints of a mesh gateway. - WatchedGatewayEndpoints map[string]map[string]structs.CheckServiceNodes + // WatchedGatewayEndpoints is a map of UpstreamID -> (map of + // GatewayKey.String() -> CheckServiceNodes) and is used to determine the + // backing endpoints of a mesh gateway. + WatchedGatewayEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes // UpstreamConfig is a map to an upstream's configuration. - UpstreamConfig map[string]*structs.Upstream + UpstreamConfig map[UpstreamID]*structs.Upstream - // PassthroughEndpoints is a map of: ServiceName -> ServicePassthroughAddrs. - PassthroughUpstreams map[string]ServicePassthroughAddrs + // PassthroughEndpoints is a map of: UpstreamID -> ServicePassthroughAddrs. + PassthroughUpstreams map[UpstreamID]ServicePassthroughAddrs // IntentionUpstreams is a set of upstreams inferred from intentions. - // The keys are created with structs.ServiceName.String(). + // // This list only applies to proxies registered in 'transparent' mode. - IntentionUpstreams map[string]struct{} + IntentionUpstreams map[UpstreamID]struct{} } type GatewayKey struct { @@ -107,7 +107,7 @@ type configSnapshotConnectProxy struct { ConfigSnapshotUpstreams WatchedServiceChecks map[structs.ServiceID][]structs.CheckType // TODO: missing garbage collection - PreparedQueryEndpoints map[string]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints + PreparedQueryEndpoints map[UpstreamID]structs.CheckServiceNodes // DEPRECATED:see:WatchedUpstreamEndpoints // NOTE: Intentions stores a list of lists as returned by the Intentions // Match RPC. So far we only use the first list as the list of matching @@ -371,8 +371,8 @@ type configSnapshotIngressGateway struct { // the GatewayServices RPC to retrieve them. Upstreams map[IngressListenerKey]structs.Upstreams - // UpstreamsSet is the unique set of upstream.Identifier() the gateway routes to. - UpstreamsSet map[string]struct{} + // UpstreamsSet is the unique set of UpstreamID the gateway routes to. + UpstreamsSet map[UpstreamID]struct{} // Listeners is the original listener config from the ingress-gateway config // entry to save us trying to pass fields through Upstreams diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 586bc39386..f31c62b4e1 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -437,7 +437,7 @@ type gatewayWatchOpts struct { source structs.QuerySource token string key GatewayKey - upstreamID string + upstreamID UpstreamID } func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { @@ -448,5 +448,5 @@ func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error { UseServiceKind: true, Source: opts.source, EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition), - }, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID), opts.notifyCh) + }, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID.String()), opts.notifyCh) } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 77d8fee39b..e43fc6481d 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -381,8 +381,9 @@ func ingressConfigWatchEvent(gwTLS bool, mixedTLS bool) cache.UpdateEvent { } } -func upstreamIDForDC2(name string) string { - return fmt.Sprintf("%s?dc=dc2", name) +func upstreamIDForDC2(uid UpstreamID) UpstreamID { + uid.Datacenter = "dc2" + return uid } // This test is meant to exercise the various parts of the cache watching done by the state as @@ -410,6 +411,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { db = structs.NewServiceName("db", nil) billing = structs.NewServiceName("billing", nil) api = structs.NewServiceName("api", nil) + + apiUID = NewUpstreamIDFromServiceName(api) + dbUID = NewUpstreamIDFromServiceName(db) + pqUID = UpstreamIDFromString("prepared_query:query") ) rootWatchEvent := func() cache.UpdateEvent { @@ -498,11 +503,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage0 := verificationStage{ requiredWatches: map[string]verifyWatchRequest{ - rootsWatchID: genVerifyRootsWatch("dc1"), - leafWatchID: genVerifyLeafWatch("web", "dc1"), - intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), - "upstream:prepared_query:query": genVerifyPreparedQueryWatch("query", "dc1"), - fmt.Sprintf("discovery-chain:%s", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + rootsWatchID: genVerifyRootsWatch("dc1"), + leafWatchID: genVerifyLeafWatch("web", "dc1"), + intentionsWatchID: genVerifyIntentionWatch("web", "dc1"), + "upstream:" + pqUID.String(): genVerifyPreparedQueryWatch("query", "dc1"), + fmt.Sprintf("discovery-chain:%s", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -512,7 +517,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: meshGatewayProxyConfigValue, }, }), - fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-remote", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -522,7 +527,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeRemote, }, }), - fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-local", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -532,7 +537,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeLocal, }, }), - fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-failover-direct", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -542,7 +547,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Mode: structs.MeshGatewayModeNone, }, }), - fmt.Sprintf("discovery-chain:%s-dc2", api.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api-dc2", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -566,7 +571,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -576,7 +581,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-remote?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-remote", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -586,7 +591,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-local?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-local", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -596,7 +601,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-failover-direct?dc=dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-failover-direct", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -606,7 +611,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { Err: nil, }, { - CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", api.String()), + CorrelationID: fmt.Sprintf("discovery-chain:%s-dc2", apiUID.String()), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api-dc2", "default", "default", "dc1", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -645,12 +650,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { stage1 := verificationStage{ requiredWatches: map[string]verifyWatchRequest{ - fmt.Sprintf("upstream-target:api.default.default.dc1:%s", api.String()): genVerifyServiceWatch("api", "", "dc1", true), - fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true), - fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", api.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true), - fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", api.String()): genVerifyGatewayWatch("dc2"), - fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", api.String()): genVerifyGatewayWatch("dc1"), + fmt.Sprintf("upstream-target:api.default.default.dc1:%s", apiUID.String()): genVerifyServiceWatch("api", "", "dc1", true), + fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-remote", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-local", "", "dc2", true), + fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceWatch("api-failover-direct", "", "dc2", true), + fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"), + fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"), }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid()) @@ -673,7 +678,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { } if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal { - stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", api.String())] = genVerifyGatewayWatch("dc1") + stage1.requiredWatches[fmt.Sprintf("mesh-gateway:dc1:%s-dc2", apiUID.String())] = genVerifyGatewayWatch("dc1") } return testCase{ @@ -999,7 +1004,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1020,7 +1025,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + api.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + apiUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "api", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1030,7 +1035,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + api.String(), + CorrelationID: "discovery-chain:" + apiUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "api", "default", "default", "dc1", "trustdomain.consul", nil), }, @@ -1039,16 +1044,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreams, 1) - require.Len(t, snap.IngressGateway.WatchedUpstreams[api.String()], 1) + require.Len(t, snap.IngressGateway.WatchedUpstreams[apiUID], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:api.default.default.dc1:" + api.String(): genVerifyServiceWatch("api", "", "dc1", true), + "upstream-target:api.default.default.dc1:" + apiUID.String(): genVerifyServiceWatch("api", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:api.default.default.dc1:" + api.String(), + CorrelationID: "upstream-target:api.default.default.dc1:" + apiUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1068,10 +1073,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, api.String()) - require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], 1) - require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()], "api.default.default.dc1") - require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[api.String()]["api.default.default.dc1"], + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints, apiUID) + require.Len(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], 1) + require.Contains(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID], "api.default.default.dc1") + require.Equal(t, snap.IngressGateway.WatchedUpstreamEndpoints[apiUID]["api.default.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ @@ -1135,7 +1140,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.IngressGateway.Hosts, 1) require.Len(t, snap.IngressGateway.Upstreams, 1) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1220,7 +1225,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.IngressGateway.Hosts, 1) require.Len(t, snap.IngressGateway.Upstreams, 1) require.Len(t, snap.IngressGateway.WatchedDiscoveryChains, 1) - require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, api.String()) + require.Contains(t, snap.IngressGateway.WatchedDiscoveryChains, apiUID) }, }, { @@ -1682,8 +1687,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.True(t, snap.TerminatingGateway.IsEmpty()) require.False(t, snap.ConnectProxy.IsEmpty()) - expectUpstreams := map[string]*structs.Upstream{ - db.String(): { + expectUpstreams := map[UpstreamID]*structs.Upstream{ + dbUID: { DestinationName: "db", DestinationNamespace: structs.IntentionDefaultNamespace, DestinationPartition: structs.IntentionDefaultNamespace, @@ -1771,7 +1776,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.UpstreamConfig, 1) wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String()) + wcUID := NewUpstreamIDFromServiceName(wc) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID) }, }, // Valid snapshot after roots, leaf, and intentions @@ -1833,16 +1839,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.True(t, snap.Valid(), "should still be valid") - require.Equal(t, map[string]struct{}{db.String(): {}}, snap.ConnectProxy.IntentionUpstreams) + require.Equal(t, map[UpstreamID]struct{}{dbUID: {}}, snap.ConnectProxy.IntentionUpstreams) // Should start watch for db's chain - require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, db.String()) + require.Contains(t, snap.ConnectProxy.WatchedDiscoveryChains, dbUID) // Should not have results yet require.Empty(t, snap.ConnectProxy.DiscoveryChain) require.Len(t, snap.ConnectProxy.UpstreamConfig, 2) - cfg, ok := snap.ConnectProxy.UpstreamConfig[db.String()] + cfg, ok := snap.ConnectProxy.UpstreamConfig[dbUID] require.True(t, ok) // Upstream config should have been inherited from defaults under wildcard key @@ -1852,7 +1858,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -1864,7 +1870,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + db.String(), + CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil), }, @@ -1873,16 +1879,16 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 1) }, }, { requiredWatches: map[string]verifyWatchRequest{ - "upstream-target:db.default.default.dc1:" + db.String(): genVerifyServiceWatch("db", "", "dc1", true), + "upstream-target:db.default.default.dc1:" + dbUID.String(): genVerifyServiceWatch("db", "", "dc1", true), }, events: []cache.UpdateEvent{ { - CorrelationID: "upstream-target:db.default.default.dc1:" + db.String(), + CorrelationID: "upstream-target:db.default.default.dc1:" + dbUID.String(), Result: &structs.IndexedCheckServiceNodes{ Nodes: structs.CheckServiceNodes{ { @@ -1931,10 +1937,10 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints, 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, db.String()) - require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], 1) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()], "db.default.default.dc1") - require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[db.String()]["db.default.default.dc1"], + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints, dbUID) + require.Len(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], 1) + require.Contains(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID], "db.default.default.dc1") + require.Equal(t, snap.ConnectProxy.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"], structs.CheckServiceNodes{ { Node: &structs.Node{ @@ -1980,8 +1986,8 @@ func TestState_WatchesAndUpdates(t *testing.T) { // The LAN service address is used below because transparent proxying // does not support querying service nodes in other DCs, and the WAN address // should not be used in DC-local calls. - require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[string]ServicePassthroughAddrs{ - db.String(): { + require.Equal(t, snap.ConnectProxy.PassthroughUpstreams, map[UpstreamID]ServicePassthroughAddrs{ + dbUID: { SNI: connect.ServiceSNI("db", "", structs.IntentionDefaultNamespace, "", snap.Datacenter, snap.Roots.TrustDomain), SpiffeID: connect.SpiffeIDService{ Host: snap.Roots.TrustDomain, @@ -2001,7 +2007,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + db.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + dbUID.String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc1", EvaluateInNamespace: "default", @@ -2013,7 +2019,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + db.String(), + CorrelationID: "discovery-chain:" + dbUID.String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", "trustdomain.consul", nil, &structs.ServiceResolverConfigEntry{ Kind: structs.ServiceResolver, @@ -2028,12 +2034,12 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[db.String()], 2) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[dbUID], 2) // In transparent mode we watch the upstream's endpoints even if the upstream is not a target of its chain. // This will happen in cases like redirects. - require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "db.default.default.dc1") - require.Contains(t, snap.ConnectProxy.WatchedUpstreams[db.String()], "mysql.default.default.dc1") + require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "db.default.default.dc1") + require.Contains(t, snap.ConnectProxy.WatchedUpstreams[dbUID], "mysql.default.default.dc1") }, }, // Empty list of upstreams should clean everything up @@ -2110,7 +2116,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2129,8 +2135,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.UpstreamConfig, 2) wc := structs.NewServiceName(structs.WildcardSpecifier, structs.WildcardEnterpriseMetaInDefaultPartition()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, wc.String()) - require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(db.String())) + wcUID := NewUpstreamIDFromServiceName(wc) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, wcUID) + require.Contains(t, snap.ConnectProxy.UpstreamConfig, upstreamIDForDC2(dbUID)) }, }, // Valid snapshot after roots, leaf, and intentions @@ -2172,7 +2179,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Discovery chain updates should be stored { requiredWatches: map[string]verifyWatchRequest{ - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2183,7 +2190,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, events: []cache.UpdateEvent{ { - CorrelationID: "discovery-chain:" + upstreamIDForDC2(db.String()), + CorrelationID: "discovery-chain:" + upstreamIDForDC2(dbUID).String(), Result: &structs.DiscoveryChainResponse{ Chain: discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc2", "trustdomain.consul", func(req *discoverychain.CompileRequest) { @@ -2195,9 +2202,9 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { require.Len(t, snap.ConnectProxy.WatchedGateways, 1) - require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1) require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1) }, }, // Empty list of upstreams should only clean up implicit upstreams. The explicit upstream db should not @@ -2209,7 +2216,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { "api", "", "dc1", false), leafWatchID: genVerifyLeafWatch("api", "dc1"), intentionsWatchID: genVerifyIntentionWatch("api", "dc1"), - "discovery-chain:" + upstreamIDForDC2(db.String()): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ + "discovery-chain:" + upstreamIDForDC2(dbUID).String(): genVerifyDiscoveryChainWatch(&structs.DiscoveryChainRequest{ Name: "db", EvaluateInDatacenter: "dc2", EvaluateInNamespace: "default", @@ -2238,11 +2245,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { // Explicit upstreams should not be deleted when the empty update event happens since that is // for intention upstreams. require.Len(t, snap.ConnectProxy.DiscoveryChain, 1) - require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(db.String())) + require.Contains(t, snap.ConnectProxy.DiscoveryChain, upstreamIDForDC2(dbUID)) require.Len(t, snap.ConnectProxy.WatchedGateways, 1) - require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedGateways[upstreamIDForDC2(dbUID)], 1) require.Len(t, snap.ConnectProxy.WatchedUpstreams, 1) - require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(db.String())], 1) + require.Len(t, snap.ConnectProxy.WatchedUpstreams[upstreamIDForDC2(dbUID)], 1) }, }, }, diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index 1107623766..11730819cc 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -696,18 +696,18 @@ func TestConfigSnapshot(t testing.T) *ConfigSnapshot { ConnectProxy: configSnapshotConnectProxy{ ConfigSnapshotUpstreams: ConfigSnapshotUpstreams{ Leaf: leaf, - UpstreamConfig: upstreams.ToMap(), - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbChain, + UpstreamConfig: UpstreamsToMap(upstreams), + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + UpstreamIDFromString("db"): dbChain, }, - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + UpstreamIDFromString("db"): { "db.default.default.dc1": TestUpstreamNodes(t, "db"), }, }, }, - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"), + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{ + UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"), }, Intentions: nil, // no intentions defined IntentionsSet: true, @@ -822,8 +822,8 @@ func testConfigSnapshotDiscoveryChain(t testing.T, variation string, additionalE ConfigSnapshotUpstreams: setupTestVariationConfigEntriesAndSnapshot( t, variation, leaf, additionalEntries..., ), - PreparedQueryEndpoints: map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": TestPreparedQueryNodes(t, "geo-cache"), + PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{ + UpstreamIDFromString("prepared_query:geo-cache"): TestPreparedQueryNodes(t, "geo-cache"), }, Intentions: nil, // no intentions defined IntentionsSet: true, @@ -1402,18 +1402,20 @@ func setupTestVariationConfigEntriesAndSnapshot( dbChain := discoverychain.TestCompileConfigEntries(t, "db", "default", "default", "dc1", connect.TestClusterID+".consul", compileSetup, entries...) + dbUID := UpstreamIDFromString("db") + upstreams := structs.TestUpstreams(t) snap := ConfigSnapshotUpstreams{ Leaf: leaf, - DiscoveryChain: map[string]*structs.CompiledDiscoveryChain{ - "db": dbChain, + DiscoveryChain: map[UpstreamID]*structs.CompiledDiscoveryChain{ + dbUID: dbChain, }, - WatchedUpstreamEndpoints: map[string]map[string]structs.CheckServiceNodes{ - "db": { + WatchedUpstreamEndpoints: map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "db.default.default.dc1": TestUpstreamNodes(t, "db"), }, }, - UpstreamConfig: upstreams.ToMap(), + UpstreamConfig: UpstreamsToMap(upstreams), } switch variation { @@ -1422,61 +1424,61 @@ func setupTestVariationConfigEntriesAndSnapshot( case "simple": case "external-sni": case "failover": - snap.WatchedUpstreamEndpoints["db"]["fail.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["fail.default.default.dc1"] = TestUpstreamNodesAlternate(t) case "failover-through-remote-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") fallthrough case "failover-through-remote-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc2": TestGatewayNodesDC2(t), }, } case "failover-through-double-remote-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesInStatusDC2(t, "critical") fallthrough case "failover-through-double-remote-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc2": TestGatewayNodesDC2(t), "dc3": TestGatewayNodesDC3(t), }, } case "failover-through-local-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") fallthrough case "failover-through-local-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc1": TestGatewayNodesDC1(t), }, } case "failover-through-double-local-gateway-triggered": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc1"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc1"] = TestUpstreamNodesInStatus(t, "critical") - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc2"] = + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc2"] = TestUpstreamNodesInStatusDC2(t, "critical") fallthrough case "failover-through-double-local-gateway": - snap.WatchedUpstreamEndpoints["db"]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) - snap.WatchedGatewayEndpoints = map[string]map[string]structs.CheckServiceNodes{ - "db": { + snap.WatchedUpstreamEndpoints[dbUID]["db.default.default.dc3"] = TestUpstreamNodesDC2(t) + snap.WatchedGatewayEndpoints = map[UpstreamID]map[string]structs.CheckServiceNodes{ + dbUID: { "dc1": TestGatewayNodesDC1(t), }, } case "splitter-with-resolver-redirect-multidc": - snap.WatchedUpstreamEndpoints["db"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[dbUID] = map[string]structs.CheckServiceNodes{ "v1.db.default.default.dc1": TestUpstreamNodes(t, "db"), "v2.db.default.default.dc2": TestUpstreamNodesDC2(t), } @@ -1484,10 +1486,10 @@ func setupTestVariationConfigEntriesAndSnapshot( case "grpc-router": case "chain-and-router": case "http-multiple-services": - snap.WatchedUpstreamEndpoints["foo"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[UpstreamIDFromString("foo")] = map[string]structs.CheckServiceNodes{ "foo.default.default.dc1": TestUpstreamNodes(t, "foo"), } - snap.WatchedUpstreamEndpoints["bar"] = map[string]structs.CheckServiceNodes{ + snap.WatchedUpstreamEndpoints[UpstreamIDFromString("bar")] = map[string]structs.CheckServiceNodes{ "bar.default.default.dc1": TestUpstreamNodesAlternate(t), } case "lb-resolver": @@ -2147,9 +2149,9 @@ func TestConfigSnapshotIngress_MultipleListenersDuplicateService(t testing.T) *C fooChain := discoverychain.TestCompileConfigEntries(t, "foo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) barChain := discoverychain.TestCompileConfigEntries(t, "bar", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{ - "foo": fooChain, - "bar": barChain, + snap.IngressGateway.DiscoveryChain = map[UpstreamID]*structs.CompiledDiscoveryChain{ + UpstreamIDFromString("foo"): fooChain, + UpstreamIDFromString("bar"): barChain, } return snap diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index a25cb62f62..c923546c78 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -42,34 +42,35 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up if !ok { return fmt.Errorf("invalid type for response: %T", u.Result) } - svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + uidString := strings.TrimPrefix(u.CorrelationID, "discovery-chain:") + uid := UpstreamIDFromString(uidString) switch snap.Kind { case structs.ServiceKindIngressGateway: - if _, ok := snap.IngressGateway.UpstreamsSet[svc]; !ok { + if _, ok := snap.IngressGateway.UpstreamsSet[uid]; !ok { // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // The associated watch was likely cancelled. - delete(upstreamsSnapshot.DiscoveryChain, svc) - s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc) + delete(upstreamsSnapshot.DiscoveryChain, uid) + s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid) return nil } case structs.ServiceKindConnectProxy: - explicit := snap.ConnectProxy.UpstreamConfig[svc].HasLocalPortOrSocket() - if _, implicit := snap.ConnectProxy.IntentionUpstreams[svc]; !implicit && !explicit { + explicit := snap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket() + if _, implicit := snap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped. // The associated watch was likely cancelled. - delete(upstreamsSnapshot.DiscoveryChain, svc) - s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc) + delete(upstreamsSnapshot.DiscoveryChain, uid) + s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", uid) return nil } default: return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind) } - upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain + upstreamsSnapshot.DiscoveryChain[uid] = resp.Chain - if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil { + if err := s.resetWatchesFromChain(ctx, uid, resp.Chain, upstreamsSnapshot); err != nil { return err } @@ -79,15 +80,17 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:") - targetID, svc, ok := removeColonPrefix(correlationID) + targetID, uidString, ok := removeColonPrefix(correlationID) if !ok { return fmt.Errorf("invalid correlation id %q", u.CorrelationID) } - if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[svc]; !ok { - upstreamsSnapshot.WatchedUpstreamEndpoints[svc] = make(map[string]structs.CheckServiceNodes) + uid := UpstreamIDFromString(uidString) + + if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[uid]; !ok { + upstreamsSnapshot.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - upstreamsSnapshot.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes + upstreamsSnapshot.WatchedUpstreamEndpoints[uid][targetID] = resp.Nodes var passthroughAddrs map[string]ServicePassthroughAddrs @@ -121,8 +124,9 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up Service: svc.Name, } - if _, ok := upstreamsSnapshot.PassthroughUpstreams[svc.String()]; !ok { - upstreamsSnapshot.PassthroughUpstreams[svc.String()] = ServicePassthroughAddrs{ + svcUID := NewUpstreamIDFromServiceName(svc) + if _, ok := upstreamsSnapshot.PassthroughUpstreams[svcUID]; !ok { + upstreamsSnapshot.PassthroughUpstreams[svcUID] = ServicePassthroughAddrs{ SNI: sni, SpiffeID: spiffeID, @@ -136,7 +140,7 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault()) addr, _ := node.BestAddress(isRemote) - upstreamsSnapshot.PassthroughUpstreams[svc.String()].Addrs[addr] = struct{}{} + upstreamsSnapshot.PassthroughUpstreams[NewUpstreamIDFromServiceName(svc)].Addrs[addr] = struct{}{} } } @@ -146,14 +150,16 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.Up return fmt.Errorf("invalid type for response: %T", u.Result) } correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:") - key, svc, ok := removeColonPrefix(correlationID) + key, uidString, ok := removeColonPrefix(correlationID) if !ok { return fmt.Errorf("invalid correlation id %q", u.CorrelationID) } - if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok { - upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes) + uid := UpstreamIDFromString(uidString) + + if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[uid]; !ok { + upstreamsSnapshot.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes + upstreamsSnapshot.WatchedGatewayEndpoints[uid][key] = resp.Nodes default: return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID) @@ -171,27 +177,27 @@ func removeColonPrefix(s string) (string, string, bool) { func (s *handlerUpstreams) resetWatchesFromChain( ctx context.Context, - id string, + uid UpstreamID, chain *structs.CompiledDiscoveryChain, snap *ConfigSnapshotUpstreams, ) error { - s.logger.Trace("resetting watches for discovery chain", "id", id) + s.logger.Trace("resetting watches for discovery chain", "id", uid) if chain == nil { return fmt.Errorf("not possible to arrive here with no discovery chain") } // Initialize relevant sub maps. - if _, ok := snap.WatchedUpstreams[id]; !ok { - snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc) + if _, ok := snap.WatchedUpstreams[uid]; !ok { + snap.WatchedUpstreams[uid] = make(map[string]context.CancelFunc) } - if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok { - snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes) + if _, ok := snap.WatchedUpstreamEndpoints[uid]; !ok { + snap.WatchedUpstreamEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } - if _, ok := snap.WatchedGateways[id]; !ok { - snap.WatchedGateways[id] = make(map[string]context.CancelFunc) + if _, ok := snap.WatchedGateways[uid]; !ok { + snap.WatchedGateways[uid] = make(map[string]context.CancelFunc) } - if _, ok := snap.WatchedGatewayEndpoints[id]; !ok { - snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes) + if _, ok := snap.WatchedGatewayEndpoints[uid]; !ok { + snap.WatchedGatewayEndpoints[uid] = make(map[string]structs.CheckServiceNodes) } // We could invalidate this selectively based on a hash of the relevant @@ -199,14 +205,14 @@ func (s *handlerUpstreams) resetWatchesFromChain( // upstream when the chain changes in any way. // // TODO(rb): content hash based add/remove - for targetID, cancelFn := range snap.WatchedUpstreams[id] { + for targetID, cancelFn := range snap.WatchedUpstreams[uid] { s.logger.Trace("stopping watch of target", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "target", targetID, ) - delete(snap.WatchedUpstreams[id], targetID) - delete(snap.WatchedUpstreamEndpoints[id], targetID) + delete(snap.WatchedUpstreams[uid], targetID) + delete(snap.WatchedUpstreamEndpoints[uid], targetID) cancelFn() } @@ -222,7 +228,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } opts := targetWatchOpts{ - upstreamID: id, + upstreamID: uid, chainID: target.ID, service: target.Service, filter: target.Subset.Filter, @@ -231,7 +237,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } err := s.watchUpstreamTarget(ctx, snap, opts) if err != nil { - return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id) + return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, uid) } // We'll get endpoints from the gateway query, but the health still has @@ -267,7 +273,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( chainEntMeta := structs.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace) opts := targetWatchOpts{ - upstreamID: id, + upstreamID: uid, chainID: chainID, service: chain.ServiceName, filter: "", @@ -276,18 +282,18 @@ func (s *handlerUpstreams) resetWatchesFromChain( } err := s.watchUpstreamTarget(ctx, snap, opts) if err != nil { - return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id) + return fmt.Errorf("failed to watch target %q for upstream %q", chainID, uid) } } for key := range needGateways { - if _, ok := snap.WatchedGateways[id][key]; ok { + if _, ok := snap.WatchedGateways[uid][key]; ok { continue } gwKey := gatewayKeyFromString(key) s.logger.Trace("initializing watch of mesh gateway", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "datacenter", gwKey.Datacenter, "partition", gwKey.Partition, @@ -300,7 +306,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( source: *s.source, token: s.token, key: gwKey, - upstreamID: id, + upstreamID: uid, } err := watchMeshGateway(ctx, opts) if err != nil { @@ -308,23 +314,23 @@ func (s *handlerUpstreams) resetWatchesFromChain( return err } - snap.WatchedGateways[id][key] = cancel + snap.WatchedGateways[uid][key] = cancel } - for key, cancelFn := range snap.WatchedGateways[id] { + for key, cancelFn := range snap.WatchedGateways[uid] { if _, ok := needGateways[key]; ok { continue } gwKey := gatewayKeyFromString(key) s.logger.Trace("stopping watch of mesh gateway", - "upstream", id, + "upstream", uid, "chain", chain.ServiceName, "datacenter", gwKey.Datacenter, "partition", gwKey.Partition, ) - delete(snap.WatchedGateways[id], key) - delete(snap.WatchedGatewayEndpoints[id], key) + delete(snap.WatchedGateways[uid], key) + delete(snap.WatchedGatewayEndpoints[uid], key) cancelFn() } @@ -332,7 +338,7 @@ func (s *handlerUpstreams) resetWatchesFromChain( } type targetWatchOpts struct { - upstreamID string + upstreamID UpstreamID chainID string service string filter string @@ -350,7 +356,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config var finalMeta structs.EnterpriseMeta finalMeta.Merge(opts.entMeta) - correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID + correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID.String() ctx, cancel := context.WithCancel(ctx) err := s.health.Notify(ctx, structs.ServiceSpecificRequest{ @@ -378,7 +384,7 @@ func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *Config } type discoveryChainWatchOpts struct { - id string + id UpstreamID name string namespace string partition string @@ -403,7 +409,7 @@ func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *Config OverrideProtocol: opts.cfg.Protocol, OverrideConnectTimeout: opts.cfg.ConnectTimeout(), OverrideMeshGateway: opts.meshGateway, - }, "discovery-chain:"+opts.id, s.ch) + }, "discovery-chain:"+opts.id.String(), s.ch) if err != nil { cancel() return err diff --git a/agent/structs/connect_proxy_config.go b/agent/structs/connect_proxy_config.go index ba0696794c..f59543bdf0 100644 --- a/agent/structs/connect_proxy_config.go +++ b/agent/structs/connect_proxy_config.go @@ -314,15 +314,6 @@ func (us Upstreams) ToAPI() []api.Upstream { return a } -func (us Upstreams) ToMap() map[string]*Upstream { - upstreamMap := make(map[string]*Upstream) - - for i := range us { - upstreamMap[us[i].Identifier()] = &us[i] - } - return upstreamMap -} - // UpstreamsFromAPI is a helper for converting api.Upstream to Upstream. func UpstreamsFromAPI(us []api.Upstream) Upstreams { a := make([]Upstream, len(us)) @@ -551,24 +542,17 @@ func (k UpstreamKey) String() string { ) } -// String implements Stringer by returning the Identifier. -func (u *Upstream) String() string { - return u.Identifier() -} - -// Identifier returns a string representation that uniquely identifies the -// upstream in a canonical but human readable way. -func (us *Upstream) Identifier() string { - name := us.enterpriseIdentifierPrefix() + us.DestinationName +// String returns a representation of this upstream suitable for debugging +// purposes but nothing relies upon this format. +func (us *Upstream) String() string { + name := us.enterpriseStringPrefix() + us.DestinationName typ := us.DestinationType if us.Datacenter != "" { name += "?dc=" + us.Datacenter } - // Service is default type so never prefix it. This is more readable and long - // term it is the only type that matters so we can drop the prefix and have - // nicer naming in metrics etc. + // Service is default type so never prefix it. if typ == "" || typ == UpstreamDestTypeService { return name } diff --git a/agent/structs/connect_proxy_config_oss.go b/agent/structs/connect_proxy_config_oss.go index 61bd2e3936..dff9cc25c9 100644 --- a/agent/structs/connect_proxy_config_oss.go +++ b/agent/structs/connect_proxy_config_oss.go @@ -13,6 +13,6 @@ func (us *Upstream) DestinationID() ServiceID { } } -func (us *Upstream) enterpriseIdentifierPrefix() string { +func (us *Upstream) enterpriseStringPrefix() string { return "" } diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index d534e9e9a4..4d33bf3779 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -77,22 +77,22 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C clusters = append(clusters, passthroughs...) } - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] + chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no endpoint map for upstream %q", id) + return nil, fmt.Errorf("no endpoint map for upstream %q", uid) } - upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, upstreamCfg, chain, chainEndpoints, cfgSnap) + upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, upstreamCfg, chain, chainEndpoints, cfgSnap) if err != nil { return nil, err } @@ -387,30 +387,30 @@ func (s *ResourceGenerator) injectGatewayServiceAddons(cfgSnap *proxycfg.ConfigS func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var clusters []proto.Message - createdClusters := make(map[string]bool) + createdClusters := make(map[proxycfg.UpstreamID]bool) for _, upstreams := range cfgSnap.IngressGateway.Upstreams { for _, u := range upstreams { - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) // If we've already created a cluster for this upstream, skip it. Multiple listeners may // reference the same upstream, so we don't need to create duplicate clusters in that case. - if createdClusters[id] { + if createdClusters[uid] { continue } - chain, ok := cfgSnap.IngressGateway.DiscoveryChain[id] + chain, ok := cfgSnap.IngressGateway.DiscoveryChain[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no discovery chain for upstream %q", id) + return nil, fmt.Errorf("no discovery chain for upstream %q", uid) } - chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id] + chainEndpoints, ok := cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid] if !ok { // this should not happen - return nil, fmt.Errorf("no endpoint map for upstream %q", id) + return nil, fmt.Errorf("no endpoint map for upstream %q", uid) } - upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, &u, chain, chainEndpoints, cfgSnap) + upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &u, chain, chainEndpoints, cfgSnap) if err != nil { return nil, err } @@ -418,7 +418,7 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg for _, c := range upstreamClusters { clusters = append(clusters, c) } - createdClusters[id] = true + createdClusters[uid] = true } } return clusters, nil @@ -481,6 +481,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs var c *envoy_cluster_v3.Cluster var err error + uid := proxycfg.NewUpstreamID(&upstream) + dc := upstream.Datacenter if dc == "" { dc = cfgSnap.Datacenter @@ -491,7 +493,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } if cfg.EnvoyClusterJSON != "" { c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON) @@ -524,7 +526,7 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } } - endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[upstream.Identifier()] + endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] var ( spiffeIDs = make([]connect.SpiffeIDService, 0) seen = make(map[string]struct{}) @@ -571,14 +573,14 @@ func (s *ResourceGenerator) makeUpstreamClusterForPreparedQuery(upstream structs } func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( - id string, + uid proxycfg.UpstreamID, upstream *structs.Upstream, chain *structs.CompiledDiscoveryChain, chainEndpoints map[string]structs.CheckServiceNodes, cfgSnap *proxycfg.ConfigSnapshot, ) ([]*envoy_cluster_v3.Cluster, error) { if chain == nil { - return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", id) + return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", uid) } configMap := make(map[string]interface{}) @@ -589,7 +591,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } @@ -604,7 +606,7 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( } } else { s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for", - "discovery chain", chain.ServiceName, "upstream", id, + "discovery chain", chain.ServiceName, "upstream", uid, "envoy_cluster_json", chain.ServiceName) } } diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go index 6553a22d36..971407efcb 100644 --- a/agent/xds/clusters_test.go +++ b/agent/xds/clusters_test.go @@ -69,8 +69,8 @@ func TestClustersFromSnapshot(t *testing.T) { customAppClusterJSON(t, customClusterJSONOptions{ Name: "myservice", }) - snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{ - "db": { + snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{ + UID("db"): { // The local bind port is overridden by the escape hatch, but is required for explicit upstreams. LocalBindPort: 9191, Config: map[string]interface{}{ @@ -669,15 +669,17 @@ func TestClustersFromSnapshot(t *testing.T) { snap.Proxy.Mode = structs.ProxyModeTransparent kafka := structs.NewServiceName("kafka", nil) mongo := structs.NewServiceName("mongo", nil) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - kafka.String(): {}, - mongo.String(): {}, + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + kafkaUID: {}, + mongoUID: {}, } // We add a passthrough cluster for each upstream service name - snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{ - kafka.String(): { + snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{ + kafkaUID: { SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", SpiffeID: connect.SpiffeIDService{ Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", @@ -689,7 +691,7 @@ func TestClustersFromSnapshot(t *testing.T) { "9.9.9.9": {}, }, }, - mongo.String(): { + mongoUID: { SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", SpiffeID: connect.SpiffeIDService{ Host: "e5b08d03-bfc3-c870-1833-baddb116e648.consul", @@ -705,8 +707,8 @@ func TestClustersFromSnapshot(t *testing.T) { } // There should still be a cluster for non-passthrough requests - snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{ "mongo.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -923,3 +925,8 @@ func TestEnvoyLBConfig_InjectToCluster(t *testing.T) { }) } } + +// UID is just a convenience function to aid in writing tests less verbosely. +func UID(input string) proxycfg.UpstreamID { + return proxycfg.UpstreamIDFromString(input) +} diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 99a1f0393b..d5b1a575ba 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -153,9 +153,9 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) }) - deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, svc, targetID string) { - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID] = - snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[svc][targetID][0:1] + deleteAllButOneEndpoint := func(snap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID, targetID string) { + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID] = + snap.ConnectProxy.ConfigSnapshotUpstreams.WatchedUpstreamEndpoints[uid][targetID][0:1] } runStep(t, "avoid sending config for unsubscribed resource", func(t *testing.T) { @@ -169,7 +169,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // now reconfigure the snapshot and JUST edit the endpoints to strike one of the two current endpoints for DB snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") mgr.DeliverConfig(t, sid, snap) // We never send an EDS reply about this change. @@ -206,7 +206,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { runStep(t, "simulate envoy NACKing an endpoint update", func(t *testing.T) { // Trigger only an EDS update. snap = newTestSnapshot(t, snap, "") - deleteAllButOneEndpoint(snap, "db", "db.default.default.dc1") + deleteAllButOneEndpoint(snap, UID("db"), "db.default.default.dc1") mgr.DeliverConfig(t, sid, snap) // Send envoy an EDS update. diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index b93b6da760..4742ea5264 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -47,22 +47,22 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } es := s.endpointsFromDiscoveryChain( - id, + uid, chain, cfgSnap.Locality, upstreamCfg, - cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id], - cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id], + cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid], + cfgSnap.ConnectProxy.WatchedGatewayEndpoints[uid], ) resources = append(resources, es...) } @@ -72,7 +72,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. if u.DestinationType != structs.UpstreamDestTypePreparedQuery { continue } - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) dc := u.Datacenter if dc == "" { @@ -80,7 +80,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. } clusterName := connect.UpstreamSNI(&u, "", dc, cfgSnap.Roots.TrustDomain) - endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[id] + endpoints, ok := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid] if ok { la := makeLoadAssignment( clusterName, @@ -318,27 +318,27 @@ func (s *ResourceGenerator) endpointsFromServicesAndResolvers( func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var resources []proto.Message - createdClusters := make(map[string]bool) + createdClusters := make(map[proxycfg.UpstreamID]bool) for _, upstreams := range cfgSnap.IngressGateway.Upstreams { for _, u := range upstreams { - id := u.Identifier() + uid := proxycfg.NewUpstreamID(&u) // If we've already created endpoints for this upstream, skip it. Multiple listeners may // reference the same upstream, so we don't need to create duplicate endpoints in that case. - if createdClusters[id] { + if createdClusters[uid] { continue } es := s.endpointsFromDiscoveryChain( - id, - cfgSnap.IngressGateway.DiscoveryChain[id], + uid, + cfgSnap.IngressGateway.DiscoveryChain[uid], proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition}, &u, - cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id], - cfgSnap.IngressGateway.WatchedGatewayEndpoints[id], + cfgSnap.IngressGateway.WatchedUpstreamEndpoints[uid], + cfgSnap.IngressGateway.WatchedGatewayEndpoints[uid], ) resources = append(resources, es...) - createdClusters[id] = true + createdClusters[uid] = true } } return resources, nil @@ -366,7 +366,7 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint { } func (s *ResourceGenerator) endpointsFromDiscoveryChain( - id string, + uid proxycfg.UpstreamID, chain *structs.CompiledDiscoveryChain, gatewayKey proxycfg.GatewayKey, upstream *structs.Upstream, @@ -387,7 +387,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } @@ -402,7 +402,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( } } else { s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for", - "discovery chain", chain.ServiceName, "upstream", id, + "discovery chain", chain.ServiceName, "upstream", uid, "envoy_cluster_json", chain.ServiceName) } } diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index 1aedb21a9c..83886a3714 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -325,8 +325,8 @@ func TestEndpointsFromSnapshot(t *testing.T) { customAppClusterJSON(t, customClusterJSONOptions{ Name: "myservice", }) - snap.ConnectProxy.UpstreamConfig = map[string]*structs.Upstream{ - "db": { + snap.ConnectProxy.UpstreamConfig = map[proxycfg.UpstreamID]*structs.Upstream{ + UID("db"): { // The local bind port is overridden by the escape hatch, but is required for explicit upstreams. LocalBindPort: 9191, Config: map[string]interface{}{ diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 319161ff47..fcc3fd2ad9 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -93,16 +93,16 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } } - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { - upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[uid] explicit := upstreamCfg.HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - cfg := s.getAndModifyUpstreamConfigForListener(id, upstreamCfg, chain) + cfg := s.getAndModifyUpstreamConfigForListener(uid, upstreamCfg, chain) // If escape hatch is present, create a listener from it and move on to the next if cfg.EnvoyListenerJSON != "" { @@ -133,7 +133,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // Generate the upstream listeners for when they are explicitly set with a local bind port or socket path if upstreamCfg != nil && upstreamCfg.HasLocalPortOrSocket() { filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), clusterName: clusterName, filterName: filterName, protocol: cfg.Protocol, @@ -143,7 +143,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. return nil, err } - upstreamListener := makeListener(id, upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND) + upstreamListener := makeListener(uid.EnvoyID(), upstreamCfg, envoy_core_v3.TrafficDirection_OUTBOUND) upstreamListener.FilterChains = []*envoy_listener_v3.FilterChain{ filterChain, } @@ -158,7 +158,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as we do for explicit upstreams above. filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), clusterName: clusterName, filterName: filterName, protocol: cfg.Protocol, @@ -168,7 +168,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. return nil, err } - endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id][chain.ID()] + endpoints := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[uid][chain.ID()] uniqueAddrs := make(map[string]struct{}) // Match on the virtual IP for the upstream service (identified by the chain's ID). @@ -199,7 +199,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } if len(uniqueAddrs) > 2 { s.Logger.Debug("detected multiple virtual IPs for an upstream, all will be used to match traffic", - "upstream", id, "ip_count", len(uniqueAddrs)) + "upstream", uid, "ip_count", len(uniqueAddrs)) } // For every potential address we collected, create the appropriate address prefix to match on. @@ -218,12 +218,11 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. // as opposed to via a virtual IP. var passthroughChains []*envoy_listener_v3.FilterChain - for svc, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams { - sn := structs.ServiceNameFromString(svc) + for uid, passthrough := range cfgSnap.ConnectProxy.PassthroughUpstreams { u := structs.Upstream{ - DestinationName: sn.Name, - DestinationNamespace: sn.NamespaceOrDefault(), - DestinationPartition: sn.PartitionOrDefault(), + DestinationName: uid.Name, + DestinationNamespace: uid.NamespaceOrDefault(), + DestinationPartition: uid.PartitionOrDefault(), } filterName := fmt.Sprintf("%s.%s.%s.%s", u.DestinationName, u.DestinationNamespace, u.DestinationPartition, cfgSnap.Datacenter) @@ -271,7 +270,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. } // Looping over explicit upstreams is only needed for prepared queries because they do not have discovery chains - for id, u := range cfgSnap.ConnectProxy.UpstreamConfig { + for uid, u := range cfgSnap.ConnectProxy.UpstreamConfig { if u.DestinationType != structs.UpstreamDestTypePreparedQuery { continue } @@ -280,7 +279,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } // If escape hatch is present, create a listener from it and move on to the next @@ -288,7 +287,7 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. upstreamListener, err := makeListenerFromUserConfig(cfg.EnvoyListenerJSON) if err != nil { s.Logger.Error("failed to parse envoy_listener_json", - "upstream", u.Identifier(), + "upstream", uid, "error", err) continue } @@ -296,13 +295,13 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg. continue } - upstreamListener := makeListener(id, u, envoy_core_v3.TrafficDirection_OUTBOUND) + upstreamListener := makeListener(uid.EnvoyID(), u, envoy_core_v3.TrafficDirection_OUTBOUND) filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ // TODO (SNI partition) add partition for upstream SNI clusterName: connect.UpstreamSNI(u, "", cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain), - filterName: id, - routeName: id, + filterName: uid.EnvoyID(), + routeName: uid.EnvoyID(), protocol: cfg.Protocol, }) if err != nil { @@ -1289,7 +1288,11 @@ func simpleChainTarget(chain *structs.CompiledDiscoveryChain) (*structs.Discover return chain.Targets[targetID], nil } -func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig { +func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener( + uid proxycfg.UpstreamID, + u *structs.Upstream, + chain *structs.CompiledDiscoveryChain, +) structs.UpstreamConfig { var ( cfg structs.UpstreamConfig err error @@ -1304,7 +1307,7 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u * if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } } else { // Use NoDefaults here so that we can set the protocol to the chain @@ -1313,12 +1316,12 @@ func (s *ResourceGenerator) getAndModifyUpstreamConfigForListener(id string, u * if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. - s.Logger.Warn("failed to parse", "upstream", id, "error", err) + s.Logger.Warn("failed to parse", "upstream", uid, "error", err) } if cfg.EnvoyListenerJSON != "" { s.Logger.Warn("ignoring escape hatch setting because already configured for", - "discovery chain", chain.ServiceName, "upstream", id, "config", "envoy_listener_json") + "discovery chain", chain.ServiceName, "upstream", uid, "config", "envoy_listener_json") // Remove from config struct so we don't use it later on cfg.EnvoyListenerJSON = "" diff --git a/agent/xds/listeners_ingress.go b/agent/xds/listeners_ingress.go index 8f8f741d1c..899e842d45 100644 --- a/agent/xds/listeners_ingress.go +++ b/agent/xds/listeners_ingress.go @@ -2,9 +2,11 @@ package xds import ( "fmt" + envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" envoy_tls_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3" + "github.com/golang/protobuf/proto" "github.com/golang/protobuf/ptypes/duration" "github.com/golang/protobuf/ptypes/wrappers" @@ -33,15 +35,16 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap // member, because this key/value pair is created only when a // GatewayService is returned in the RPC u := upstreams[0] - id := u.Identifier() - chain := cfgSnap.IngressGateway.DiscoveryChain[id] + uid := proxycfg.NewUpstreamID(&u) + + chain := cfgSnap.IngressGateway.DiscoveryChain[uid] if chain == nil { // Wait until a chain is present in the snapshot. continue } - cfg := s.getAndModifyUpstreamConfigForListener(id, &u, chain) + cfg := s.getAndModifyUpstreamConfigForListener(uid, &u, chain) // RDS, Envoy's Route Discovery Service, is only used for HTTP services with a customized discovery chain. // TODO(freddy): Why can the protocol of the listener be overridden here? @@ -60,9 +63,9 @@ func (s *ResourceGenerator) makeIngressGatewayListeners(address string, cfgSnap filterName := fmt.Sprintf("%s.%s.%s.%s", chain.ServiceName, chain.Namespace, chain.Partition, chain.Datacenter) - l := makePortListenerWithDefault(id, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND) + l := makePortListenerWithDefault(uid.EnvoyID(), address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND) filterChain, err := s.makeUpstreamFilterChain(filterChainOpts{ - routeName: id, + routeName: uid.EnvoyID(), useRDS: useRDS, clusterName: clusterName, filterName: filterName, diff --git a/agent/xds/listeners_test.go b/agent/xds/listeners_test.go index 5c252fad38..c097176f4b 100644 --- a/agent/xds/listeners_test.go +++ b/agent/xds/listeners_test.go @@ -72,6 +72,8 @@ func TestListenersFromSnapshot(t *testing.T) { snap.Proxy.Upstreams[0].LocalBindPort = 0 snap.Proxy.Upstreams[0].LocalBindSocketPath = "/tmp/service-mesh/client-1/grpc-employee-server" snap.Proxy.Upstreams[0].LocalBindSocketMode = "0640" + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -95,6 +97,8 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshot, setup: func(snap *proxycfg.ConfigSnapshot) { snap.Proxy.Upstreams[0].Config["protocol"] = "http" + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -159,14 +163,21 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshot, setup: func(snap *proxycfg.ConfigSnapshot) { for i := range snap.Proxy.Upstreams { + if snap.Proxy.Upstreams[i].DestinationName != "db" { + continue // only tweak the db upstream + } if snap.Proxy.Upstreams[i].Config == nil { snap.Proxy.Upstreams[i].Config = map[string]interface{}{} } + + uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i]) + snap.Proxy.Upstreams[i].Config["envoy_listener_json"] = customListenerJSON(t, customListenerJSONOptions{ - Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream", + Name: uid.EnvoyID() + ":custom-upstream", }) } + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -174,14 +185,22 @@ func TestListenersFromSnapshot(t *testing.T) { create: proxycfg.TestConfigSnapshotDiscoveryChainWithFailover, setup: func(snap *proxycfg.ConfigSnapshot) { for i := range snap.Proxy.Upstreams { + if snap.Proxy.Upstreams[i].DestinationName != "db" { + continue // only tweak the db upstream + } if snap.Proxy.Upstreams[i].Config == nil { snap.Proxy.Upstreams[i].Config = map[string]interface{}{} } + + uid := proxycfg.NewUpstreamID(&snap.Proxy.Upstreams[i]) + snap.Proxy.Upstreams[i].Config["envoy_listener_json"] = customListenerJSON(t, customListenerJSONOptions{ - Name: snap.Proxy.Upstreams[i].Identifier() + ":custom-upstream", + Name: uid.EnvoyID() + ":custom-upstream", }) } + + snap.ConnectProxy.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) }, }, { @@ -901,7 +920,7 @@ func TestListenersFromSnapshot(t *testing.T) { connect.TestClusterID+".consul", nil, ) - snap.IngressGateway.DiscoveryChain["secure"] = secureChain + snap.IngressGateway.DiscoveryChain[UID("secure")] = secureChain insecureChain := discoverychain.TestCompileConfigEntries( t, @@ -912,7 +931,7 @@ func TestListenersFromSnapshot(t *testing.T) { connect.TestClusterID+".consul", nil, ) - snap.IngressGateway.DiscoveryChain["insecure"] = insecureChain + snap.IngressGateway.DiscoveryChain[UID("insecure")] = insecureChain snap.IngressGateway.Listeners = map[proxycfg.IngressListenerKey]structs.IngressListener{ {Protocol: "tcp", Port: 8080}: { @@ -1084,12 +1103,13 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1126,7 +1146,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on. - snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) }, }, { @@ -1144,11 +1164,12 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1168,7 +1189,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // DiscoveryChains without endpoints do not get a filter chain because there are no addresses to match on. - snap.ConnectProxy.DiscoveryChain["no-endpoints"] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[UID("no-endpoints")] = discoverychain.TestCompileConfigEntries(t, "no-endpoints", "default", "default", "dc1", connect.TestClusterID+".consul", nil) }, }, { @@ -1178,24 +1199,26 @@ func TestListenersFromSnapshot(t *testing.T) { snap.Proxy.Mode = structs.ProxyModeTransparent kafka := structs.NewServiceName("kafka", nil) mongo := structs.NewServiceName("mongo", nil) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + mongoUID := proxycfg.NewUpstreamIDFromServiceName(mongo) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - kafka.String(): {}, - mongo.String(): {}, + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + kafkaUID: {}, + mongoUID: {}, } - snap.ConnectProxy.DiscoveryChain[mongo.String()] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[mongoUID] = discoverychain.TestCompileConfigEntries(t, "mongo", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) // We add a filter chains for each passthrough service name. // The filter chain will route to a cluster with the same SNI name. - snap.ConnectProxy.PassthroughUpstreams = map[string]proxycfg.ServicePassthroughAddrs{ - kafka.String(): { + snap.ConnectProxy.PassthroughUpstreams = map[proxycfg.UpstreamID]proxycfg.ServicePassthroughAddrs{ + kafkaUID: { SNI: "kafka.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", Addrs: map[string]struct{}{ "9.9.9.9": {}, }, }, - mongo.String(): { + mongoUID: { SNI: "mongo.default.dc1.internal.e5b08d03-bfc3-c870-1833-baddb116e648.consul", Addrs: map[string]struct{}{ "10.10.10.10": {}, @@ -1205,7 +1228,7 @@ func TestListenersFromSnapshot(t *testing.T) { } // There should still be a filter chain for mongo's virtual address - snap.ConnectProxy.WatchedUpstreamEndpoints[mongo.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[mongoUID] = map[string]structs.CheckServiceNodes{ "mongo.default.default.dc1": { structs.CheckServiceNode{ Node: &structs.Node{ @@ -1240,12 +1263,14 @@ func TestListenersFromSnapshot(t *testing.T) { // DiscoveryChain without an UpstreamConfig should yield a filter chain when in transparent proxy mode google := structs.NewServiceName("google", nil) kafka := structs.NewServiceName("kafka", nil) - snap.ConnectProxy.IntentionUpstreams = map[string]struct{}{ - google.String(): {}, - kafka.String(): {}, + googleUID := proxycfg.NewUpstreamIDFromServiceName(google) + kafkaUID := proxycfg.NewUpstreamIDFromServiceName(kafka) + snap.ConnectProxy.IntentionUpstreams = map[proxycfg.UpstreamID]struct{}{ + googleUID: {}, + kafkaUID: {}, } - snap.ConnectProxy.DiscoveryChain[google.String()] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) - snap.ConnectProxy.DiscoveryChain[kafka.String()] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[googleUID] = discoverychain.TestCompileConfigEntries(t, "google", "default", "default", "dc1", connect.TestClusterID+".consul", nil) + snap.ConnectProxy.DiscoveryChain[kafkaUID] = discoverychain.TestCompileConfigEntries(t, "kafka", "default", "default", "dc1", connect.TestClusterID+".consul", nil) tgate := structs.CheckServiceNode{ Node: &structs.Node{ @@ -1264,10 +1289,10 @@ func TestListenersFromSnapshot(t *testing.T) { }, }, } - snap.ConnectProxy.WatchedUpstreamEndpoints[google.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[googleUID] = map[string]structs.CheckServiceNodes{ "google.default.default.dc1": {tgate}, } - snap.ConnectProxy.WatchedUpstreamEndpoints[kafka.String()] = map[string]structs.CheckServiceNodes{ + snap.ConnectProxy.WatchedUpstreamEndpoints[kafkaUID] = map[string]structs.CheckServiceNodes{ "kafka.default.default.dc1": {tgate}, } }, diff --git a/agent/xds/routes.go b/agent/xds/routes.go index 7cb714daab..8d0b6330ee 100644 --- a/agent/xds/routes.go +++ b/agent/xds/routes.go @@ -48,24 +48,24 @@ func (s *ResourceGenerator) routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) // "routes" in the snapshot. func (s *ResourceGenerator) routesForConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { var resources []proto.Message - for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { + for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain { if chain.IsDefault() { continue } - explicit := cfgSnap.ConnectProxy.UpstreamConfig[id].HasLocalPortOrSocket() - if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit { + explicit := cfgSnap.ConnectProxy.UpstreamConfig[uid].HasLocalPortOrSocket() + if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[uid]; !implicit && !explicit { // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. continue } - virtualHost, err := makeUpstreamRouteForDiscoveryChain(id, chain, []string{"*"}) + virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, []string{"*"}) if err != nil { return nil, err } route := &envoy_route_v3.RouteConfiguration{ - Name: id, + Name: uid.EnvoyID(), VirtualHosts: []*envoy_route_v3.VirtualHost{virtualHost}, // ValidateClusters defaults to true when defined statically and false // when done via RDS. Re-set the reasonable value of true to prevent @@ -173,7 +173,7 @@ func makeNamedDefaultRouteWithLB(clusterName string, lb *structs.LoadBalancer, a func (s *ResourceGenerator) routesForIngressGateway( listeners map[proxycfg.IngressListenerKey]structs.IngressListener, upstreams map[proxycfg.IngressListenerKey]structs.Upstreams, - chains map[string]*structs.CompiledDiscoveryChain, + chains map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain, ) ([]proto.Message, error) { var result []proto.Message @@ -195,14 +195,14 @@ func (s *ResourceGenerator) routesForIngressGateway( } for _, u := range upstreams { - upstreamID := u.Identifier() - chain := chains[upstreamID] + uid := proxycfg.NewUpstreamID(&u) + chain := chains[uid] if chain == nil { continue } domains := generateUpstreamIngressDomains(listenerKey, u) - virtualHost, err := makeUpstreamRouteForDiscoveryChain(upstreamID, chain, domains) + virtualHost, err := makeUpstreamRouteForDiscoveryChain(uid.EnvoyID(), chain, domains) if err != nil { return nil, err } diff --git a/agent/xds/routes_test.go b/agent/xds/routes_test.go index cd76184dbc..d0c1b44d9b 100644 --- a/agent/xds/routes_test.go +++ b/agent/xds/routes_test.go @@ -204,11 +204,11 @@ func TestRoutesFromSnapshot(t *testing.T) { bazChain := discoverychain.TestCompileConfigEntries(t, "baz", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) quxChain := discoverychain.TestCompileConfigEntries(t, "qux", "default", "default", "dc1", connect.TestClusterID+".consul", nil, entries...) - snap.IngressGateway.DiscoveryChain = map[string]*structs.CompiledDiscoveryChain{ - "foo": fooChain, - "bar": barChain, - "baz": bazChain, - "qux": quxChain, + snap.IngressGateway.DiscoveryChain = map[proxycfg.UpstreamID]*structs.CompiledDiscoveryChain{ + UID("foo"): fooChain, + UID("bar"): barChain, + UID("baz"): bazChain, + UID("qux"): quxChain, } }, }, @@ -667,6 +667,9 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap * }, } + webUID := proxycfg.NewUpstreamID(&webUpstream) + fooUID := proxycfg.NewUpstreamID(&fooUpstream) + // Setup additional HTTP service on same listener with default router snap.IngressGateway.Upstreams = map[proxycfg.IngressListenerKey]structs.Upstreams{ {Protocol: "http", Port: 9191}: {webUpstream, fooUpstream}, @@ -775,7 +778,7 @@ func setupIngressWithTwoHTTPServices(t *testing.T, o ingressSDSOpts) func(snap * o.entMetas["web"].PartitionOrDefault(), "dc1", connect.TestClusterID+".consul", nil, entries...) - snap.IngressGateway.DiscoveryChain[webUpstream.Identifier()] = webChain - snap.IngressGateway.DiscoveryChain[fooUpstream.Identifier()] = fooChain + snap.IngressGateway.DiscoveryChain[webUID] = webChain + snap.IngressGateway.DiscoveryChain[fooUID] = fooChain } } diff --git a/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden b/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden index 62e02591d9..d12bdaf998 100644 --- a/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden +++ b/agent/xds/testdata/listeners/custom-upstream.envoy-1-20-x.golden @@ -27,11 +27,11 @@ }, { "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", - "name": "prepared_query:geo-cache:custom-upstream", + "name": "prepared_query:geo-cache:127.10.10.10:8181", "address": { "socketAddress": { - "address": "11.11.11.11", - "portValue": 11111 + "address": "127.10.10.10", + "portValue": 8181 } }, "filterChains": [ @@ -41,13 +41,14 @@ "name": "envoy.filters.network.tcp_proxy", "typedConfig": { "@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy", - "statPrefix": "foo-stats", - "cluster": "random-cluster" + "statPrefix": "upstream.prepared_query_geo-cache", + "cluster": "geo-cache.default.dc1.query.11111111-2222-3333-4444-555555555555.consul" } } ] } - ] + ], + "trafficDirection": "OUTBOUND" }, { "@type": "type.googleapis.com/envoy.config.listener.v3.Listener", diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index dc58e1f9d9..c0be25680b 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -43,8 +43,8 @@ func newTestSnapshot( additionalEntries ...structs.ConfigEntry, ) *proxycfg.ConfigSnapshot { snap := proxycfg.TestConfigSnapshotDiscoveryChainDefaultWithEntries(t, additionalEntries...) - snap.ConnectProxy.PreparedQueryEndpoints = map[string]structs.CheckServiceNodes{ - "prepared_query:geo-cache": proxycfg.TestPreparedQueryNodes(t, "geo-cache"), + snap.ConnectProxy.PreparedQueryEndpoints = map[proxycfg.UpstreamID]structs.CheckServiceNodes{ + UID("prepared_query:geo-cache"): proxycfg.TestPreparedQueryNodes(t, "geo-cache"), } if prevSnap != nil { snap.Roots = prevSnap.Roots @@ -53,7 +53,7 @@ func newTestSnapshot( if dbServiceProtocol != "" { // Simulate ServiceManager injection of protocol snap.Proxy.Upstreams[0].Config["protocol"] = dbServiceProtocol - snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = snap.Proxy.Upstreams.ToMap() + snap.ConnectProxy.ConfigSnapshotUpstreams.UpstreamConfig = proxycfg.UpstreamsToMap(snap.Proxy.Upstreams) } return snap }