diff --git a/agent/agent.go b/agent/agent.go index 3c0609a98a..765c1ab918 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -4113,6 +4113,8 @@ func (a *Agent) registerCache() { a.cache.RegisterType(cachetype.TrustBundleListName, &cachetype.TrustBundles{Client: a.rpcClientPeering}) + a.cache.RegisterType(cachetype.PeeredUpstreamsName, &cachetype.PeeredUpstreams{RPC: a}) + a.registerEntCache() } diff --git a/agent/proxycfg/connect_proxy.go b/agent/proxycfg/connect_proxy.go index ab86560bd5..3221150dba 100644 --- a/agent/proxycfg/connect_proxy.go +++ b/agent/proxycfg/connect_proxy.go @@ -6,6 +6,7 @@ import ( "strings" cachetype "github.com/hashicorp/consul/agent/cache-types" + "github.com/hashicorp/consul/agent/proxycfg/internal/watch" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -22,8 +23,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc) snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) - snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = make(map[string]context.CancelFunc) - snap.ConnectProxy.UpstreamPeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle) + snap.ConnectProxy.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]() snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc) snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes) snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType) @@ -31,7 +31,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream) snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{}) snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget) - snap.ConnectProxy.PeerUpstreamEndpoints = make(map[UpstreamID]structs.CheckServiceNodes) + snap.ConnectProxy.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]() snap.ConnectProxy.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{}) // Watch for root changes @@ -108,6 +108,14 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e if err != nil { return snap, err } + err = s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{ + QueryOptions: structs.QueryOptions{Token: s.token}, + Datacenter: s.source.Datacenter, + EnterpriseMeta: s.proxyID.EnterpriseMeta, + }, peeredUpstreamsID, s.ch) + if err != nil { + return snap, err + } } // Watch for updates to service endpoints for all upstreams @@ -134,7 +142,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e dc = u.Datacenter } if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) { - // In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch. + // In transparent proxy mode, watches for upstreams in the local DC + // are handled by the IntentionUpstreams and PeeredUpstreams watch. continue } @@ -183,8 +192,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e s.logger.Trace("initializing watch of peered upstream", "upstream", uid) - // TODO(peering): We'll need to track a CancelFunc for this - // once the tproxy support lands. + snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil) err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{ PeerName: uid.Peer, Datacenter: dc, @@ -204,7 +212,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e } // Check whether a watch for this peer exists to avoid duplicates. - if _, ok := snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer]; !ok { + if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok { peerCtx, cancel := context.WithCancel(ctx) if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{ Name: uid.Peer, @@ -214,7 +222,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) } - snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer] = cancel + snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel) } continue } @@ -262,7 +270,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s } peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix) if resp.Bundle != nil { - snap.ConnectProxy.UpstreamPeerTrustBundles[peer] = resp.Bundle + snap.ConnectProxy.UpstreamPeerTrustBundles.Set(peer, resp.Bundle) } case u.CorrelationID == peeringTrustBundlesWatchID: @@ -283,6 +291,90 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s snap.ConnectProxy.Intentions = resp snap.ConnectProxy.IntentionsSet = true + case u.CorrelationID == peeredUpstreamsID: + resp, ok := u.Result.(*structs.IndexedPeeredServiceList) + if !ok { + return fmt.Errorf("invalid type for response %T", u.Result) + } + + seenUpstreams := make(map[UpstreamID]struct{}) + for _, psn := range resp.Services { + uid := NewUpstreamIDFromPeeredServiceName(psn) + + if _, ok := seenUpstreams[uid]; ok { + continue + } + seenUpstreams[uid] = struct{}{} + + s.logger.Trace("initializing watch of peered upstream", "upstream", uid) + + hctx, hcancel := context.WithCancel(ctx) + err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{ + PeerName: uid.Peer, + Datacenter: s.source.Datacenter, + QueryOptions: structs.QueryOptions{ + Token: s.token, + }, + ServiceName: psn.ServiceName.Name, + Connect: true, + // Note that Identifier doesn't type-prefix for service any more as it's + // the default and makes metrics and other things much cleaner. It's + // simpler for us if we have the type to make things unambiguous. + Source: *s.source, + EnterpriseMeta: uid.EnterpriseMeta, + }, upstreamPeerWatchIDPrefix+uid.String(), s.ch) + if err != nil { + hcancel() + return fmt.Errorf("failed to watch health for %s: %v", uid, err) + } + snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel) + + // Check whether a watch for this peer exists to avoid duplicates. + if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok { + peerCtx, cancel := context.WithCancel(ctx) + if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{ + Name: uid.Peer, + Partition: uid.PartitionOrDefault(), + }, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil { + cancel() + return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err) + } + + snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel) + } + } + snap.ConnectProxy.PeeredUpstreams = seenUpstreams + + // + // Clean up data + // + + validPeerNames := make(map[string]struct{}) + + // Iterate through all known endpoints and remove references to upstream IDs that weren't in the update + snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool { + // Peered upstream is explicitly defined in upstream config + if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok { + validPeerNames[uid.Peer] = struct{}{} + return true + } + // Peered upstream came from dynamic source of imported services + if _, ok := seenUpstreams[uid]; ok { + validPeerNames[uid.Peer] = struct{}{} + return true + } + snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid) + return true + }) + + // Iterate through all known trust bundles and remove references to any unseen peer names + snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool { + if _, ok := validPeerNames[peerName]; !ok { + snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName) + } + return true + }) + case u.CorrelationID == intentionUpstreamsID: resp, ok := u.Result.(*structs.IndexedServiceList) if !ok { diff --git a/agent/proxycfg/data_sources.go b/agent/proxycfg/data_sources.go index 73c8b06375..310a4340ea 100644 --- a/agent/proxycfg/data_sources.go +++ b/agent/proxycfg/data_sources.go @@ -69,6 +69,8 @@ type DataSources struct { // notification channel. LeafCertificate LeafCertificate + // PeeredUpstreams provides imported-service upstream updates on a + // notification channel. PeeredUpstreams PeeredUpstreams // PreparedQuery provides updates about the results of a prepared query. diff --git a/agent/proxycfg/manager_test.go b/agent/proxycfg/manager_test.go index 12d8c79194..184b62148f 100644 --- a/agent/proxycfg/manager_test.go +++ b/agent/proxycfg/manager_test.go @@ -11,6 +11,7 @@ import ( cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" + "github.com/hashicorp/consul/agent/proxycfg/internal/watch" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" @@ -230,8 +231,8 @@ func TestManager_BasicLifecycle(t *testing.T) { }, PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, PassthroughIndices: map[string]indexedTarget{}, - UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, - PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, + UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](), + PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](), PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, @@ -291,8 +292,8 @@ func TestManager_BasicLifecycle(t *testing.T) { }, PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{}, PassthroughIndices: map[string]indexedTarget{}, - UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{}, - PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, + UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](), + PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](), PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{}, }, PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{}, diff --git a/agent/proxycfg/naming.go b/agent/proxycfg/naming.go index 7b80d49a9d..3bb0854b04 100644 --- a/agent/proxycfg/naming.go +++ b/agent/proxycfg/naming.go @@ -7,6 +7,8 @@ import ( "github.com/hashicorp/consul/agent/structs" ) +type PeerName = string + type UpstreamID struct { Type string Name string @@ -32,7 +34,16 @@ func NewUpstreamID(u *structs.Upstream) UpstreamID { return id } -// TODO(peering): confirm we don't need peername here +func NewUpstreamIDFromPeeredServiceName(psn structs.PeeredServiceName) UpstreamID { + id := UpstreamID{ + Name: psn.ServiceName.Name, + EnterpriseMeta: psn.ServiceName.EnterpriseMeta, + Peer: psn.Peer, + } + id.normalize() + return id +} + func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID { id := UpstreamID{ Name: sn.Name, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index c52a99a811..3835c2a01b 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -9,6 +9,7 @@ import ( "github.com/mitchellh/copystructure" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/proxycfg/internal/watch" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" @@ -44,13 +45,9 @@ type ConfigSnapshotUpstreams struct { // endpoints of an upstream. WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes - // WatchedUpstreamPeerTrustBundles is a map of (PeerName -> CancelFunc) in order to cancel - // watches for peer trust bundles any time the list of upstream peers changes. - WatchedUpstreamPeerTrustBundles map[string]context.CancelFunc - // UpstreamPeerTrustBundles is a map of (PeerName -> PeeringTrustBundle). // It is used to store trust bundles for upstream TLS transport sockets. - UpstreamPeerTrustBundles map[string]*pbpeering.PeeringTrustBundle + UpstreamPeerTrustBundles watch.Map[PeerName, *pbpeering.PeeringTrustBundle] // WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() -> // CancelFunc) in order to cancel watches for mesh gateways @@ -80,10 +77,16 @@ type ConfigSnapshotUpstreams struct { // This list only applies to proxies registered in 'transparent' mode. IntentionUpstreams map[UpstreamID]struct{} + // PeeredUpstreams is a set of all upstream targets in a local partition. + // + // This list only applies to proxies registered in 'transparent' mode. + PeeredUpstreams map[UpstreamID]struct{} + // PeerUpstreamEndpoints is a map of UpstreamID -> (set of IP addresses) // and used to determine the backing endpoints of an upstream in another // peer. - PeerUpstreamEndpoints map[UpstreamID]structs.CheckServiceNodes + PeerUpstreamEndpoints watch.Map[UpstreamID, structs.CheckServiceNodes] + PeerUpstreamEndpointsUseHostnames map[UpstreamID]struct{} } @@ -152,8 +155,7 @@ func (c *configSnapshotConnectProxy) isEmpty() bool { len(c.WatchedDiscoveryChains) == 0 && len(c.WatchedUpstreams) == 0 && len(c.WatchedUpstreamEndpoints) == 0 && - len(c.WatchedUpstreamPeerTrustBundles) == 0 && - len(c.UpstreamPeerTrustBundles) == 0 && + c.UpstreamPeerTrustBundles.Len() == 0 && len(c.WatchedGateways) == 0 && len(c.WatchedGatewayEndpoints) == 0 && len(c.WatchedServiceChecks) == 0 && @@ -161,9 +163,10 @@ func (c *configSnapshotConnectProxy) isEmpty() bool { len(c.UpstreamConfig) == 0 && len(c.PassthroughUpstreams) == 0 && len(c.IntentionUpstreams) == 0 && + len(c.PeeredUpstreams) == 0 && !c.InboundPeerTrustBundlesSet && !c.MeshConfigSet && - len(c.PeerUpstreamEndpoints) == 0 && + c.PeerUpstreamEndpoints.Len() == 0 && len(c.PeerUpstreamEndpointsUseHostnames) == 0 } @@ -715,7 +718,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) { snap.ConnectProxy.WatchedUpstreams = nil snap.ConnectProxy.WatchedGateways = nil snap.ConnectProxy.WatchedDiscoveryChains = nil - snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = nil case structs.ServiceKindTerminatingGateway: snap.TerminatingGateway.WatchedServices = nil snap.TerminatingGateway.WatchedIntentions = nil @@ -730,7 +732,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) { snap.IngressGateway.WatchedUpstreams = nil snap.IngressGateway.WatchedGateways = nil snap.IngressGateway.WatchedDiscoveryChains = nil - snap.IngressGateway.WatchedUpstreamPeerTrustBundles = nil // only ingress-gateway snap.IngressGateway.LeafCertWatchCancel = nil } @@ -803,7 +804,7 @@ func (s *ConfigSnapshot) MeshConfigTLSOutgoing() *structs.MeshDirectionalTLSConf } func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta { - nodes := u.PeerUpstreamEndpoints[uid] + nodes, _ := u.PeerUpstreamEndpoints.Get(uid) if len(nodes) == 0 { return structs.PeeringServiceMeta{} } @@ -833,7 +834,7 @@ func (u *ConfigSnapshotUpstreams) PeeredUpstreamIDs() []UpstreamID { continue } - if _, ok := u.UpstreamPeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok { + if _, ok := u.UpstreamPeerTrustBundles.Get(uid.Peer); !ok { // The trust bundle for this upstream is not available yet, skip for now. continue } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 9fc5c88f66..f9388cf487 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -36,6 +36,7 @@ const ( serviceResolverIDPrefix = "service-resolver:" serviceIntentionsIDPrefix = "service-intentions:" intentionUpstreamsID = "intention-upstreams" + peeredUpstreamsID = "peered-upstreams" upstreamPeerWatchIDPrefix = "upstream-peer:" exportedServiceListWatchID = "exported-service-list" meshConfigEntryID = "mesh" diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index 3a54d0314f..36b641a691 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -131,6 +131,7 @@ func recordWatches(sc *stateConfig) *watchRecorder { IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr}, InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr}, LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr}, + PeeredUpstreams: typedWatchRecorder[*structs.PartitionSpecificRequest]{wr}, PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr}, ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr}, ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr}, @@ -321,6 +322,15 @@ func genVerifyServiceSpecificPeeredRequest(expectedService, expectedFilter, expe } } +func genVerifyPartitionSpecificRequest(expectedPartition, expectedDatacenter string) verifyWatchRequest { + return func(t testing.TB, request any) { + reqReal, ok := request.(*structs.PartitionSpecificRequest) + require.True(t, ok) + require.Equal(t, expectedDatacenter, reqReal.Datacenter) + require.Equal(t, expectedPartition, reqReal.PartitionOrDefault()) + } +} + func genVerifyGatewayServiceWatch(expectedService, expectedDatacenter string) verifyWatchRequest { return genVerifyServiceSpecificRequest(expectedService, "", expectedDatacenter, false) } @@ -404,9 +414,11 @@ func TestState_WatchesAndUpdates(t *testing.T) { dbUID = NewUpstreamIDFromServiceName(db) pqUID = UpstreamIDFromString("prepared_query:query") extApiUID = NewUpstreamIDFromServiceName(apiA) + extDBUID = NewUpstreamIDFromServiceName(db) ) // TODO(peering): NewUpstreamIDFromServiceName should take a PeerName extApiUID.Peer = "peer-a" + extDBUID.Peer = "peer-a" const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul" @@ -2509,6 +2521,253 @@ func TestState_WatchesAndUpdates(t *testing.T) { }, }, }, + "transparent-proxy-initial-with-peers": { + ns: structs.NodeService{ + Kind: structs.ServiceKindConnectProxy, + ID: "api-proxy", + Service: "api-proxy", + Address: "10.0.1.1", + Proxy: structs.ConnectProxyConfig{ + DestinationServiceName: "api", + Mode: structs.ProxyModeTransparent, + Upstreams: structs.Upstreams{ + { + DestinationName: "api-a", + DestinationPeer: "peer-a", + }, + }, + }, + }, + sourceDC: "dc1", + stages: []verificationStage{ + { + requiredWatches: map[string]verifyWatchRequest{ + peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("api"), + peeredUpstreamsID: genVerifyPartitionSpecificRequest(acl.DefaultEnterpriseMeta().PartitionOrDefault(), "dc1"), + meshConfigEntryID: genVerifyMeshConfigWatch("dc1"), + rootsWatchID: genVerifyDCSpecificWatch("dc1"), + leafWatchID: genVerifyLeafWatch("api", "dc1"), + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid") + require.True(t, snap.MeshGateway.isEmpty()) + require.True(t, snap.IngressGateway.isEmpty()) + require.True(t, snap.TerminatingGateway.isEmpty()) + + require.False(t, snap.ConnectProxy.isEmpty()) + + // This is explicitly defined from proxy config + expectUpstreams := map[UpstreamID]*structs.Upstream{ + extApiUID: { + DestinationName: "api-a", + DestinationNamespace: structs.IntentionDefaultNamespace, + DestinationPartition: structs.IntentionDefaultNamespace, + DestinationPeer: "peer-a", + }, + } + require.Equal(t, expectUpstreams, snap.ConnectProxy.UpstreamConfig) + }, + }, + { + // Initial events + events: []UpdateEvent{ + rootWatchEvent(), + { + CorrelationID: leafWatchID, + Result: issuedCert, + Err: nil, + }, + { + CorrelationID: intentionsWatchID, + Result: TestIntentions(), + Err: nil, + }, + { + CorrelationID: peeringTrustBundlesWatchID, + Result: peerTrustBundles, + }, + { + CorrelationID: peeredUpstreamsID, + Result: &structs.IndexedPeeredServiceList{ + Services: []structs.PeeredServiceName{ + { + ServiceName: apiA, + Peer: "peer-a", + }, + { + // This service is dynamic (not from static config) + ServiceName: db, + Peer: "peer-a", + }, + }, + }, + }, + { + CorrelationID: meshConfigEntryID, + Result: &structs.ConfigEntryResponse{ + Entry: nil, // no explicit config + }, + Err: nil, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.Leaf()) + require.True(t, snap.MeshGateway.isEmpty()) + require.True(t, snap.IngressGateway.isEmpty()) + require.True(t, snap.TerminatingGateway.isEmpty()) + require.True(t, snap.ConnectProxy.MeshConfigSet) + require.Nil(t, snap.ConnectProxy.MeshConfig) + + // Check PeeredUpstream is populated + expect := map[UpstreamID]struct{}{ + extDBUID: {}, + extApiUID: {}, + } + require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams) + + require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID)) + _, ok := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID) + require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint") + + require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID)) + _, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID) + require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint") + + require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a")) + _, ok = snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a") + require.False(t, ok, "expected initialized but empty PeerTrustBundle") + }, + }, + { + // Peered upstream will have set up 3 more watches + requiredWatches: map[string]verifyWatchRequest{ + upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true), + upstreamPeerWatchIDPrefix + extDBUID.String(): genVerifyServiceSpecificPeeredRequest("db", "", "dc1", "peer-a", true), + peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"), + }, + events: []UpdateEvent{ + { + CorrelationID: peerTrustBundleIDPrefix + "peer-a", + Result: &pbpeering.TrustBundleReadResponse{ + Bundle: peerTrustBundles.Bundles[0], + }, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + require.Equal(t, indexedRoots, snap.Roots) + require.Equal(t, issuedCert, snap.Leaf()) + require.True(t, snap.MeshGateway.isEmpty()) + require.True(t, snap.IngressGateway.isEmpty()) + require.True(t, snap.TerminatingGateway.isEmpty()) + require.True(t, snap.ConnectProxy.MeshConfigSet) + require.Nil(t, snap.ConnectProxy.MeshConfig) + + // Check PeeredUpstream is populated + expect := map[UpstreamID]struct{}{ + extDBUID: {}, + extApiUID: {}, + } + require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams) + + // Expect two entries (DB and api-a) + require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len()) + + // db does not have endpoints yet + ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID) + require.Nil(t, ep) + + // Expect a trust bundle + ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a") + require.True(t, ok) + prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb) + + // Sanity check that local upstream maps are not populated + require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID]) + require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID]) + require.Empty(t, snap.ConnectProxy.PassthroughIndices) + }, + }, + { + // Add another instance of "api-a" service + events: []UpdateEvent{ + { + CorrelationID: upstreamPeerWatchIDPrefix + extDBUID.String(), + Result: &structs.IndexedCheckServiceNodes{ + Nodes: structs.CheckServiceNodes{ + { + Node: &structs.Node{ + Node: "node1", + Address: "127.0.0.1", + PeerName: "peer-a", + }, + Service: &structs.NodeService{ + ID: "db", + Service: "db", + PeerName: "peer-a", + Connect: structs.ServiceConnect{}, + }, + }, + }, + }, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + + // Check PeeredUpstream is populated + expect := map[UpstreamID]struct{}{ + extApiUID: {}, + extDBUID: {}, + } + require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams) + + // Expect two entries (api-a, db) + require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len()) + + // db has an endpoint now + ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID) + require.NotNil(t, ep) + require.Len(t, ep, 1) + + // Expect a trust bundle + ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a") + require.True(t, ok) + prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb) + + // Sanity check that local upstream maps are not populated + require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID]) + require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID]) + require.Empty(t, snap.ConnectProxy.PassthroughIndices) + }, + }, + { + // Empty list of peered upstreams should clean up map keys + events: []UpdateEvent{ + { + CorrelationID: peeredUpstreamsID, + Result: &structs.IndexedPeeredServiceList{ + Services: []structs.PeeredServiceName{}, + }, + }, + }, + verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) { + require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid") + + require.Empty(t, snap.ConnectProxy.PeeredUpstreams) + + // db endpoint should have been cleaned up + require.False(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID)) + + // Expect only api-a endpoint + require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len()) + require.Equal(t, 1, snap.ConnectProxy.UpstreamPeerTrustBundles.Len()) + }, + }, + }, + }, "connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault), "connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal), "connect-proxy-with-peers": { @@ -2564,10 +2823,15 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedGateways, 0, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 0, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles) - require.Len(t, snap.ConnectProxy.UpstreamPeerTrustBundles, 0, "%+v", snap.ConnectProxy.UpstreamPeerTrustBundles) + // watch initialized + require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a")) + _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a") + require.False(t, ok) // but no data - require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints) + // watch initialized + require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID)) + _, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID) + require.False(t, ok) // but no data require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) @@ -2655,11 +2919,13 @@ func TestState_WatchesAndUpdates(t *testing.T) { require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways) require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints) - require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles) - require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.UpstreamPeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles) + tb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a") + require.True(t, ok) + prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], tb) - require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints) - require.NotNil(t, snap.ConnectProxy.PeerUpstreamEndpoints[extApiUID]) + require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len()) + ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID) + require.NotNil(t, ep) require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks) require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints) diff --git a/agent/proxycfg/testing.go b/agent/proxycfg/testing.go index c30435e374..744c17e182 100644 --- a/agent/proxycfg/testing.go +++ b/agent/proxycfg/testing.go @@ -745,6 +745,7 @@ func testConfigSnapshotFixture( IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{}, InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{}, LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{}, + PeeredUpstreams: &noopDataSource[*structs.PartitionSpecificRequest]{}, PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{}, ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{}, ServiceList: &noopDataSource[*structs.DCSpecificRequest]{}, @@ -971,6 +972,7 @@ type TestDataSources struct { IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList] InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways] LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert] + PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList] PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse] ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse] ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList] @@ -994,6 +996,7 @@ func (t *TestDataSources) ToDataSources() DataSources { IntentionUpstreams: t.IntentionUpstreams, InternalServiceDump: t.InternalServiceDump, LeafCertificate: t.LeafCertificate, + PeeredUpstreams: t.PeeredUpstreams, PreparedQuery: t.PreparedQuery, ResolvedServiceConfig: t.ResolvedServiceConfig, ServiceList: t.ServiceList, diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go index 1f1251c42c..4f41d7908e 100644 --- a/agent/proxycfg/upstreams.go +++ b/agent/proxycfg/upstreams.go @@ -103,19 +103,15 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv resp.Nodes, ) if len(filteredNodes) > 0 { - upstreamsSnapshot.PeerUpstreamEndpoints[uid] = filteredNodes - upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{} + if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, filteredNodes); set { + upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{} + } } else { - upstreamsSnapshot.PeerUpstreamEndpoints[uid] = resp.Nodes - delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid) + if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, resp.Nodes); set { + delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid) + } } - if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent { - return nil - } - - s.logger.Warn("skipping transparent proxy update for peered upstream") - case strings.HasPrefix(u.CorrelationID, "upstream-target:"): resp, ok := u.Result.(*structs.IndexedCheckServiceNodes) if !ok { @@ -157,6 +153,10 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv // Make sure to use an external address when crossing partition or DC boundaries. isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault()) + // If node is peered it must be remote + if node.Node.PeerOrEmpty() != "" { + isRemote = true + } csnIdx, addr, _ := node.BestAddress(isRemote) existing := upstreamsSnapshot.PassthroughIndices[addr] diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index b00fcfeeb6..0d5c147762 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -729,11 +729,12 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService( }, } } else { + ep, _ := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid) configureClusterWithHostnames( s.Logger, c, "", /*TODO:make configurable?*/ - cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid], + ep, true, /*isRemote*/ false, /*onlyPassing*/ ) @@ -743,7 +744,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService( rootPEMs := cfgSnap.RootPEMs() if uid.Peer != "" { - rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs() + tbs, _ := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer) + rootPEMs = tbs.ConcatenatedRootPEMs() } // Enable TLS upstream with the configured client certificate. @@ -1077,13 +1079,9 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain( } if configureTLS { - rootPEMs := cfgSnap.RootPEMs() - if uid.Peer != "" { - rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs() - } commonTLSContext := makeCommonTLSContext( cfgSnap.Leaf(), - rootPEMs, + cfgSnap.RootPEMs(), makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()), ) diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 53e550132b..d305a85bdf 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -47,7 +47,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. // TODO: this estimate is wrong resources := make([]proto.Message, 0, len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+ - len(cfgSnap.ConnectProxy.PeerUpstreamEndpoints)+ + cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+ len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) // NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go @@ -110,7 +110,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg. continue } - endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid] + endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid) if ok { la := makeLoadAssignment( clusterName,