Use new maps for proxycfg peered data

Chris S. Kim 2022-07-13 12:14:57 -04:00 committed by Chris S. Kim
parent 7f32cba735
commit 02cff2394d
12 changed files with 430 additions and 53 deletions

View File

@ -4113,6 +4113,8 @@ func (a *Agent) registerCache() {
a.cache.RegisterType(cachetype.TrustBundleListName, &cachetype.TrustBundles{Client: a.rpcClientPeering})
a.cache.RegisterType(cachetype.PeeredUpstreamsName, &cachetype.PeeredUpstreams{RPC: a})
a.registerEntCache()
}

View File

@ -6,6 +6,7 @@ import (
"strings"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering"
)
@ -22,8 +23,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = make(map[string]context.CancelFunc)
snap.ConnectProxy.UpstreamPeerTrustBundles = make(map[string]*pbpeering.PeeringTrustBundle)
snap.ConnectProxy.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]()
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
@ -31,7 +31,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
snap.ConnectProxy.PeerUpstreamEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
snap.ConnectProxy.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
snap.ConnectProxy.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{})
// Watch for root changes
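The fields above switch from plain Go maps (plus separate maps of cancel funcs) to the generic watch.Map from agent/proxycfg/internal/watch, so each entry carries its own watch cancellation. The implementation of that type is not shown in this diff; the following is a minimal sketch whose method set and behavior are inferred only from the calls made elsewhere in this commit, not taken from the actual package.

package watch

import "context"

// Map pairs each key with an optional cancellable watch and the data that
// watch has produced. A key can be watched (via InitWatch) before any data
// arrives, which is why Get reports presence separately from IsWatched.
type Map[K comparable, V any] struct {
	M map[K]watched[V]
}

type watched[V any] struct {
	val    *V
	cancel context.CancelFunc
}

func NewMap[K comparable, V any]() Map[K, V] {
	return Map[K, V]{M: make(map[K]watched[V])}
}

// InitWatch registers a key along with the CancelFunc for its watch; no data yet.
func (m Map[K, V]) InitWatch(key K, cancel context.CancelFunc) {
	m.M[key] = watched[V]{cancel: cancel}
}

// IsWatched reports whether the key has an active watch.
func (m Map[K, V]) IsWatched(key K) bool {
	_, ok := m.M[key]
	return ok
}

// Set stores data for a key and reports whether it was stored, i.e. whether
// the key still has an active watch; updates for cancelled watches are dropped.
func (m Map[K, V]) Set(key K, value V) bool {
	w, ok := m.M[key]
	if !ok {
		return false
	}
	w.val = &value
	m.M[key] = w
	return true
}

// Get returns the stored data and whether any data has been set for the key.
func (m Map[K, V]) Get(key K) (V, bool) {
	var zero V
	w, ok := m.M[key]
	if !ok || w.val == nil {
		return zero, false
	}
	return *w.val, true
}

// CancelWatch stops the key's watch and forgets the key entirely.
func (m Map[K, V]) CancelWatch(key K) {
	if w, ok := m.M[key]; ok {
		if w.cancel != nil {
			w.cancel()
		}
		delete(m.M, key)
	}
}

// ForEachKey calls f for every watched key until f returns false.
func (m Map[K, V]) ForEachKey(f func(K) bool) {
	for k := range m.M {
		if !f(k) {
			return
		}
	}
}

// Len counts watched keys, whether or not they have data yet.
func (m Map[K, V]) Len() int { return len(m.M) }

Under these assumptions, Set returning a bool is what lets the upstreams handler discard endpoint updates for watches that have already been cancelled, and CancelWatch replaces the bookkeeping previously done through the separate WatchedUpstreamPeerTrustBundles map.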
@ -108,6 +108,14 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
if err != nil {
return snap, err
}
err = s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{
QueryOptions: structs.QueryOptions{Token: s.token},
Datacenter: s.source.Datacenter,
EnterpriseMeta: s.proxyID.EnterpriseMeta,
}, peeredUpstreamsID, s.ch)
if err != nil {
return snap, err
}
}
// Watch for updates to service endpoints for all upstreams
@ -134,7 +142,8 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
dc = u.Datacenter
}
if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) {
// In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
// In transparent proxy mode, watches for upstreams in the local DC
// are handled by the IntentionUpstreams and PeeredUpstreams watch.
continue
}
@ -183,8 +192,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
// TODO(peering): We'll need to track a CancelFunc for this
// once the tproxy support lands.
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
PeerName: uid.Peer,
Datacenter: dc,
@ -204,7 +212,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
}
// Check whether a watch for this peer exists to avoid duplicates.
if _, ok := snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer]; !ok {
if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
peerCtx, cancel := context.WithCancel(ctx)
if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{
Name: uid.Peer,
@ -214,7 +222,7 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
return snap, fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
}
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles[uid.Peer] = cancel
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
}
continue
}
@ -262,7 +270,7 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
}
peer := strings.TrimPrefix(u.CorrelationID, peerTrustBundleIDPrefix)
if resp.Bundle != nil {
snap.ConnectProxy.UpstreamPeerTrustBundles[peer] = resp.Bundle
snap.ConnectProxy.UpstreamPeerTrustBundles.Set(peer, resp.Bundle)
}
case u.CorrelationID == peeringTrustBundlesWatchID:
@ -283,6 +291,90 @@ func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, s
snap.ConnectProxy.Intentions = resp
snap.ConnectProxy.IntentionsSet = true
case u.CorrelationID == peeredUpstreamsID:
resp, ok := u.Result.(*structs.IndexedPeeredServiceList)
if !ok {
return fmt.Errorf("invalid type for response %T", u.Result)
}
seenUpstreams := make(map[UpstreamID]struct{})
for _, psn := range resp.Services {
uid := NewUpstreamIDFromPeeredServiceName(psn)
if _, ok := seenUpstreams[uid]; ok {
continue
}
seenUpstreams[uid] = struct{}{}
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
hctx, hcancel := context.WithCancel(ctx)
err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{
PeerName: uid.Peer,
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{
Token: s.token,
},
ServiceName: psn.ServiceName.Name,
Connect: true,
// Note that Identifier doesn't type-prefix for service any more as it's
// the default and makes metrics and other things much cleaner. It's
// simpler for us if we have the type to make things unambiguous.
Source: *s.source,
EnterpriseMeta: uid.EnterpriseMeta,
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
if err != nil {
hcancel()
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
}
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel)
// Check whether a watch for this peer exists to avoid duplicates.
if _, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
peerCtx, cancel := context.WithCancel(ctx)
if err := s.dataSources.TrustBundle.Notify(peerCtx, &pbpeering.TrustBundleReadRequest{
Name: uid.Peer,
Partition: uid.PartitionOrDefault(),
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
cancel()
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
}
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
}
}
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
//
// Clean up data
//
validPeerNames := make(map[string]struct{})
// Iterate through all known endpoints and remove references to upstream IDs that weren't in the update
snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool {
// Peered upstream is explicitly defined in upstream config
if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok {
validPeerNames[uid.Peer] = struct{}{}
return true
}
// Peered upstream came from dynamic source of imported services
if _, ok := seenUpstreams[uid]; ok {
validPeerNames[uid.Peer] = struct{}{}
return true
}
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid)
return true
})
// Iterate through all known trust bundles and remove references to any unseen peer names
snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool {
if _, ok := validPeerNames[peerName]; !ok {
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName)
}
return true
})
case u.CorrelationID == intentionUpstreamsID:
resp, ok := u.Result.(*structs.IndexedServiceList)
if !ok {

View File

@ -69,6 +69,8 @@ type DataSources struct {
// notification channel.
LeafCertificate LeafCertificate
// PeeredUpstreams provides imported-service upstream updates on a
// notification channel.
PeeredUpstreams PeeredUpstreams
// PreparedQuery provides updates about the results of a prepared query.
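This hunk only adds the struct field; the PeeredUpstreams source type is defined alongside the other sources in this file and is not shown here. Judging from how the connect-proxy handler invokes it (Notify with a *structs.PartitionSpecificRequest and the peered-upstreams correlation ID), its shape is presumably something like the sketch below; the exact signature is an assumption rather than verbatim source.

// Assumed shape of the new data source, inferred from
// s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{...}, peeredUpstreamsID, s.ch).
type PeeredUpstreams interface {
	Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
}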

View File

@ -11,6 +11,7 @@ import (
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/discoverychain"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbpeering"
@ -230,8 +231,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](),
PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](),
PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
@ -291,8 +292,8 @@ func TestManager_BasicLifecycle(t *testing.T) {
},
PassthroughUpstreams: map[UpstreamID]map[string]map[string]struct{}{},
PassthroughIndices: map[string]indexedTarget{},
UpstreamPeerTrustBundles: map[string]*pbpeering.PeeringTrustBundle{},
PeerUpstreamEndpoints: map[UpstreamID]structs.CheckServiceNodes{},
UpstreamPeerTrustBundles: watch.NewMap[PeerName, *pbpeering.PeeringTrustBundle](),
PeerUpstreamEndpoints: watch.NewMap[UpstreamID, structs.CheckServiceNodes](),
PeerUpstreamEndpointsUseHostnames: map[UpstreamID]struct{}{},
},
PreparedQueryEndpoints: map[UpstreamID]structs.CheckServiceNodes{},

View File

@ -7,6 +7,8 @@ import (
"github.com/hashicorp/consul/agent/structs"
)
type PeerName = string
type UpstreamID struct {
Type string
Name string
@ -32,7 +34,16 @@ func NewUpstreamID(u *structs.Upstream) UpstreamID {
return id
}
// TODO(peering): confirm we don't need peername here
func NewUpstreamIDFromPeeredServiceName(psn structs.PeeredServiceName) UpstreamID {
id := UpstreamID{
Name: psn.ServiceName.Name,
EnterpriseMeta: psn.ServiceName.EnterpriseMeta,
Peer: psn.Peer,
}
id.normalize()
return id
}
func NewUpstreamIDFromServiceName(sn structs.ServiceName) UpstreamID {
id := UpstreamID{
Name: sn.Name,
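For illustration only (the concrete values here are hypothetical, not from the diff): an imported service from a peer becomes an UpstreamID keyed by both service name and peer, and the handler uses that ID's string form when building the upstream-peer health watch correlation ID.

// Hypothetical example: service "db" imported from peer "peer-a".
psn := structs.PeeredServiceName{
	Peer:        "peer-a",
	ServiceName: structs.NewServiceName("db", nil),
}
uid := NewUpstreamIDFromPeeredServiceName(psn)
// uid.Name == "db", uid.Peer == "peer-a"
// The watch is then registered with correlation ID:
//   upstreamPeerWatchIDPrefix + uid.String()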

View File

@ -9,6 +9,7 @@ import (
"github.com/mitchellh/copystructure"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/proto/pbpeering"
@ -44,13 +45,9 @@ type ConfigSnapshotUpstreams struct {
// endpoints of an upstream.
WatchedUpstreamEndpoints map[UpstreamID]map[string]structs.CheckServiceNodes
// WatchedUpstreamPeerTrustBundles is a map of (PeerName -> CancelFunc) in order to cancel
// watches for peer trust bundles any time the list of upstream peers changes.
WatchedUpstreamPeerTrustBundles map[string]context.CancelFunc
// UpstreamPeerTrustBundles is a map of (PeerName -> PeeringTrustBundle).
// It is used to store trust bundles for upstream TLS transport sockets.
UpstreamPeerTrustBundles map[string]*pbpeering.PeeringTrustBundle
UpstreamPeerTrustBundles watch.Map[PeerName, *pbpeering.PeeringTrustBundle]
// WatchedGateways is a map of UpstreamID -> (map of GatewayKey.String() ->
// CancelFunc) in order to cancel watches for mesh gateways
@ -80,10 +77,16 @@ type ConfigSnapshotUpstreams struct {
// This list only applies to proxies registered in 'transparent' mode.
IntentionUpstreams map[UpstreamID]struct{}
// PeeredUpstreams is a set of all upstream targets in a local partition.
//
// This list only applies to proxies registered in 'transparent' mode.
PeeredUpstreams map[UpstreamID]struct{}
// PeerUpstreamEndpoints is a map of UpstreamID -> (set of IP addresses)
// and used to determine the backing endpoints of an upstream in another
// peer.
PeerUpstreamEndpoints map[UpstreamID]structs.CheckServiceNodes
PeerUpstreamEndpoints watch.Map[UpstreamID, structs.CheckServiceNodes]
PeerUpstreamEndpointsUseHostnames map[UpstreamID]struct{}
}
@ -152,8 +155,7 @@ func (c *configSnapshotConnectProxy) isEmpty() bool {
len(c.WatchedDiscoveryChains) == 0 &&
len(c.WatchedUpstreams) == 0 &&
len(c.WatchedUpstreamEndpoints) == 0 &&
len(c.WatchedUpstreamPeerTrustBundles) == 0 &&
len(c.UpstreamPeerTrustBundles) == 0 &&
c.UpstreamPeerTrustBundles.Len() == 0 &&
len(c.WatchedGateways) == 0 &&
len(c.WatchedGatewayEndpoints) == 0 &&
len(c.WatchedServiceChecks) == 0 &&
@ -161,9 +163,10 @@ func (c *configSnapshotConnectProxy) isEmpty() bool {
len(c.UpstreamConfig) == 0 &&
len(c.PassthroughUpstreams) == 0 &&
len(c.IntentionUpstreams) == 0 &&
len(c.PeeredUpstreams) == 0 &&
!c.InboundPeerTrustBundlesSet &&
!c.MeshConfigSet &&
len(c.PeerUpstreamEndpoints) == 0 &&
c.PeerUpstreamEndpoints.Len() == 0 &&
len(c.PeerUpstreamEndpointsUseHostnames) == 0
}
@ -715,7 +718,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
snap.ConnectProxy.WatchedUpstreams = nil
snap.ConnectProxy.WatchedGateways = nil
snap.ConnectProxy.WatchedDiscoveryChains = nil
snap.ConnectProxy.WatchedUpstreamPeerTrustBundles = nil
case structs.ServiceKindTerminatingGateway:
snap.TerminatingGateway.WatchedServices = nil
snap.TerminatingGateway.WatchedIntentions = nil
@ -730,7 +732,6 @@ func (s *ConfigSnapshot) Clone() (*ConfigSnapshot, error) {
snap.IngressGateway.WatchedUpstreams = nil
snap.IngressGateway.WatchedGateways = nil
snap.IngressGateway.WatchedDiscoveryChains = nil
snap.IngressGateway.WatchedUpstreamPeerTrustBundles = nil
// only ingress-gateway
snap.IngressGateway.LeafCertWatchCancel = nil
}
@ -803,7 +804,7 @@ func (s *ConfigSnapshot) MeshConfigTLSOutgoing() *structs.MeshDirectionalTLSConf
}
func (u *ConfigSnapshotUpstreams) UpstreamPeerMeta(uid UpstreamID) structs.PeeringServiceMeta {
nodes := u.PeerUpstreamEndpoints[uid]
nodes, _ := u.PeerUpstreamEndpoints.Get(uid)
if len(nodes) == 0 {
return structs.PeeringServiceMeta{}
}
@ -833,7 +834,7 @@ func (u *ConfigSnapshotUpstreams) PeeredUpstreamIDs() []UpstreamID {
continue
}
if _, ok := u.UpstreamPeerTrustBundles[uid.Peer]; uid.Peer != "" && !ok {
if _, ok := u.UpstreamPeerTrustBundles.Get(uid.Peer); !ok {
// The trust bundle for this upstream is not available yet, skip for now.
continue
}

View File

@ -36,6 +36,7 @@ const (
serviceResolverIDPrefix = "service-resolver:"
serviceIntentionsIDPrefix = "service-intentions:"
intentionUpstreamsID = "intention-upstreams"
peeredUpstreamsID = "peered-upstreams"
upstreamPeerWatchIDPrefix = "upstream-peer:"
exportedServiceListWatchID = "exported-service-list"
meshConfigEntryID = "mesh"

View File

@ -131,6 +131,7 @@ func recordWatches(sc *stateConfig) *watchRecorder {
IntentionUpstreams: typedWatchRecorder[*structs.ServiceSpecificRequest]{wr},
InternalServiceDump: typedWatchRecorder[*structs.ServiceDumpRequest]{wr},
LeafCertificate: typedWatchRecorder[*cachetype.ConnectCALeafRequest]{wr},
PeeredUpstreams: typedWatchRecorder[*structs.PartitionSpecificRequest]{wr},
PreparedQuery: typedWatchRecorder[*structs.PreparedQueryExecuteRequest]{wr},
ResolvedServiceConfig: typedWatchRecorder[*structs.ServiceConfigRequest]{wr},
ServiceList: typedWatchRecorder[*structs.DCSpecificRequest]{wr},
@ -321,6 +322,15 @@ func genVerifyServiceSpecificPeeredRequest(expectedService, expectedFilter, expe
}
}
func genVerifyPartitionSpecificRequest(expectedPartition, expectedDatacenter string) verifyWatchRequest {
return func(t testing.TB, request any) {
reqReal, ok := request.(*structs.PartitionSpecificRequest)
require.True(t, ok)
require.Equal(t, expectedDatacenter, reqReal.Datacenter)
require.Equal(t, expectedPartition, reqReal.PartitionOrDefault())
}
}
func genVerifyGatewayServiceWatch(expectedService, expectedDatacenter string) verifyWatchRequest {
return genVerifyServiceSpecificRequest(expectedService, "", expectedDatacenter, false)
}
@ -404,9 +414,11 @@ func TestState_WatchesAndUpdates(t *testing.T) {
dbUID = NewUpstreamIDFromServiceName(db)
pqUID = UpstreamIDFromString("prepared_query:query")
extApiUID = NewUpstreamIDFromServiceName(apiA)
extDBUID = NewUpstreamIDFromServiceName(db)
)
// TODO(peering): NewUpstreamIDFromServiceName should take a PeerName
extApiUID.Peer = "peer-a"
extDBUID.Peer = "peer-a"
const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"
@ -2509,6 +2521,253 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"transparent-proxy-initial-with-peers": {
ns: structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "api-proxy",
Service: "api-proxy",
Address: "10.0.1.1",
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "api",
Mode: structs.ProxyModeTransparent,
Upstreams: structs.Upstreams{
{
DestinationName: "api-a",
DestinationPeer: "peer-a",
},
},
},
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatch("api"),
peeredUpstreamsID: genVerifyPartitionSpecificRequest(acl.DefaultEnterpriseMeta().PartitionOrDefault(), "dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
leafWatchID: genVerifyLeafWatch("api", "dc1"),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "proxy without roots/leaf/intentions is not valid")
require.True(t, snap.MeshGateway.isEmpty())
require.True(t, snap.IngressGateway.isEmpty())
require.True(t, snap.TerminatingGateway.isEmpty())
require.False(t, snap.ConnectProxy.isEmpty())
// This is explicitly defined from proxy config
expectUpstreams := map[UpstreamID]*structs.Upstream{
extApiUID: {
DestinationName: "api-a",
DestinationNamespace: structs.IntentionDefaultNamespace,
DestinationPartition: structs.IntentionDefaultNamespace,
DestinationPeer: "peer-a",
},
}
require.Equal(t, expectUpstreams, snap.ConnectProxy.UpstreamConfig)
},
},
{
// Initial events
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: leafWatchID,
Result: issuedCert,
Err: nil,
},
{
CorrelationID: intentionsWatchID,
Result: TestIntentions(),
Err: nil,
},
{
CorrelationID: peeringTrustBundlesWatchID,
Result: peerTrustBundles,
},
{
CorrelationID: peeredUpstreamsID,
Result: &structs.IndexedPeeredServiceList{
Services: []structs.PeeredServiceName{
{
ServiceName: apiA,
Peer: "peer-a",
},
{
// This service is dynamic (not from static config)
ServiceName: db,
Peer: "peer-a",
},
},
},
},
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: nil, // no explicit config
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.True(t, snap.MeshGateway.isEmpty())
require.True(t, snap.IngressGateway.isEmpty())
require.True(t, snap.TerminatingGateway.isEmpty())
require.True(t, snap.ConnectProxy.MeshConfigSet)
require.Nil(t, snap.ConnectProxy.MeshConfig)
// Check PeeredUpstreams is populated
expect := map[UpstreamID]struct{}{
extDBUID: {},
extApiUID: {},
}
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID))
_, ok := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint")
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID))
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
require.False(t, ok, "expected initialized but empty PeerUpstreamEndpoint")
require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a"))
_, ok = snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.False(t, ok, "expected initialized but empty PeerTrustBundle")
},
},
{
// Peered upstream will have set up 3 more watches
requiredWatches: map[string]verifyWatchRequest{
upstreamPeerWatchIDPrefix + extApiUID.String(): genVerifyServiceSpecificPeeredRequest("api-a", "", "dc1", "peer-a", true),
upstreamPeerWatchIDPrefix + extDBUID.String(): genVerifyServiceSpecificPeeredRequest("db", "", "dc1", "peer-a", true),
peerTrustBundleIDPrefix + "peer-a": genVerifyTrustBundleReadWatch("peer-a"),
},
events: []UpdateEvent{
{
CorrelationID: peerTrustBundleIDPrefix + "peer-a",
Result: &pbpeering.TrustBundleReadResponse{
Bundle: peerTrustBundles.Bundles[0],
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Equal(t, indexedRoots, snap.Roots)
require.Equal(t, issuedCert, snap.Leaf())
require.True(t, snap.MeshGateway.isEmpty())
require.True(t, snap.IngressGateway.isEmpty())
require.True(t, snap.TerminatingGateway.isEmpty())
require.True(t, snap.ConnectProxy.MeshConfigSet)
require.Nil(t, snap.ConnectProxy.MeshConfig)
// Check PeeredUpstreams is populated
expect := map[UpstreamID]struct{}{
extDBUID: {},
extApiUID: {},
}
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
// Expect two entries (DB and api-a)
require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
// db does not have endpoints yet
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
require.Nil(t, ep)
// Expect a trust bundle
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.True(t, ok)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
// Sanity check that local upstream maps are not populated
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
},
},
{
// Receive endpoints for the peered db upstream
events: []UpdateEvent{
{
CorrelationID: upstreamPeerWatchIDPrefix + extDBUID.String(),
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Node: "node1",
Address: "127.0.0.1",
PeerName: "peer-a",
},
Service: &structs.NodeService{
ID: "db",
Service: "db",
PeerName: "peer-a",
Connect: structs.ServiceConnect{},
},
},
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
// Check PeeredUpstreams is populated
expect := map[UpstreamID]struct{}{
extApiUID: {},
extDBUID: {},
}
require.Equal(t, expect, snap.ConnectProxy.PeeredUpstreams)
// Expect two entries (api-a, db)
require.Equal(t, 2, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
// db has an endpoint now
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extDBUID)
require.NotNil(t, ep)
require.Len(t, ep, 1)
// Expect a trust bundle
ptb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.True(t, ok)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], ptb)
// Sanity check that local upstream maps are not populated
require.Empty(t, snap.ConnectProxy.WatchedUpstreamEndpoints[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughUpstreams[extDBUID])
require.Empty(t, snap.ConnectProxy.PassthroughIndices)
},
},
{
// Empty list of peered upstreams should clean up map keys
events: []UpdateEvent{
{
CorrelationID: peeredUpstreamsID,
Result: &structs.IndexedPeeredServiceList{
Services: []structs.PeeredServiceName{},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid(), "proxy with roots/leaf/intentions is valid")
require.Empty(t, snap.ConnectProxy.PeeredUpstreams)
// db endpoint should have been cleaned up
require.False(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extDBUID))
// Expect only api-a endpoint
require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
require.Equal(t, 1, snap.ConnectProxy.UpstreamPeerTrustBundles.Len())
},
},
},
},
"connect-proxy": newConnectProxyCase(structs.MeshGatewayModeDefault),
"connect-proxy-mesh-gateway-local": newConnectProxyCase(structs.MeshGatewayModeLocal),
"connect-proxy-with-peers": {
@ -2564,10 +2823,15 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 0, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 0, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
require.Len(t, snap.ConnectProxy.UpstreamPeerTrustBundles, 0, "%+v", snap.ConnectProxy.UpstreamPeerTrustBundles)
// watch initialized
require.True(t, snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched("peer-a"))
_, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.False(t, ok) // but no data
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 0, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
// watch initialized
require.True(t, snap.ConnectProxy.PeerUpstreamEndpoints.IsWatched(extApiUID))
_, ok = snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
require.False(t, ok) // but no data
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)
@ -2655,11 +2919,13 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.Len(t, snap.ConnectProxy.WatchedGateways, 1, "%+v", snap.ConnectProxy.WatchedGateways)
require.Len(t, snap.ConnectProxy.WatchedGatewayEndpoints, 1, "%+v", snap.ConnectProxy.WatchedGatewayEndpoints)
require.Contains(t, snap.ConnectProxy.WatchedUpstreamPeerTrustBundles, "peer-a", "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
require.Equal(t, peerTrustBundles.Bundles[0], snap.ConnectProxy.UpstreamPeerTrustBundles["peer-a"], "%+v", snap.ConnectProxy.WatchedUpstreamPeerTrustBundles)
tb, ok := snap.ConnectProxy.UpstreamPeerTrustBundles.Get("peer-a")
require.True(t, ok)
prototest.AssertDeepEqual(t, peerTrustBundles.Bundles[0], tb)
require.Len(t, snap.ConnectProxy.PeerUpstreamEndpoints, 1, "%+v", snap.ConnectProxy.PeerUpstreamEndpoints)
require.NotNil(t, snap.ConnectProxy.PeerUpstreamEndpoints[extApiUID])
require.Equal(t, 1, snap.ConnectProxy.PeerUpstreamEndpoints.Len())
ep, _ := snap.ConnectProxy.PeerUpstreamEndpoints.Get(extApiUID)
require.NotNil(t, ep)
require.Len(t, snap.ConnectProxy.WatchedServiceChecks, 0, "%+v", snap.ConnectProxy.WatchedServiceChecks)
require.Len(t, snap.ConnectProxy.PreparedQueryEndpoints, 0, "%+v", snap.ConnectProxy.PreparedQueryEndpoints)

View File

@ -745,6 +745,7 @@ func testConfigSnapshotFixture(
IntentionUpstreams: &noopDataSource[*structs.ServiceSpecificRequest]{},
InternalServiceDump: &noopDataSource[*structs.ServiceDumpRequest]{},
LeafCertificate: &noopDataSource[*cachetype.ConnectCALeafRequest]{},
PeeredUpstreams: &noopDataSource[*structs.PartitionSpecificRequest]{},
PreparedQuery: &noopDataSource[*structs.PreparedQueryExecuteRequest]{},
ResolvedServiceConfig: &noopDataSource[*structs.ServiceConfigRequest]{},
ServiceList: &noopDataSource[*structs.DCSpecificRequest]{},
@ -971,6 +972,7 @@ type TestDataSources struct {
IntentionUpstreams *TestDataSource[*structs.ServiceSpecificRequest, *structs.IndexedServiceList]
InternalServiceDump *TestDataSource[*structs.ServiceDumpRequest, *structs.IndexedNodesWithGateways]
LeafCertificate *TestDataSource[*cachetype.ConnectCALeafRequest, *structs.IssuedCert]
PeeredUpstreams *TestDataSource[*structs.PartitionSpecificRequest, *structs.IndexedPeeredServiceList]
PreparedQuery *TestDataSource[*structs.PreparedQueryExecuteRequest, *structs.PreparedQueryExecuteResponse]
ResolvedServiceConfig *TestDataSource[*structs.ServiceConfigRequest, *structs.ServiceConfigResponse]
ServiceList *TestDataSource[*structs.DCSpecificRequest, *structs.IndexedServiceList]
@ -994,6 +996,7 @@ func (t *TestDataSources) ToDataSources() DataSources {
IntentionUpstreams: t.IntentionUpstreams,
InternalServiceDump: t.InternalServiceDump,
LeafCertificate: t.LeafCertificate,
PeeredUpstreams: t.PeeredUpstreams,
PreparedQuery: t.PreparedQuery,
ResolvedServiceConfig: t.ResolvedServiceConfig,
ServiceList: t.ServiceList,

View File

@ -103,19 +103,15 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
resp.Nodes,
)
if len(filteredNodes) > 0 {
upstreamsSnapshot.PeerUpstreamEndpoints[uid] = filteredNodes
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, filteredNodes); set {
upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames[uid] = struct{}{}
}
} else {
upstreamsSnapshot.PeerUpstreamEndpoints[uid] = resp.Nodes
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
if set := upstreamsSnapshot.PeerUpstreamEndpoints.Set(uid, resp.Nodes); set {
delete(upstreamsSnapshot.PeerUpstreamEndpointsUseHostnames, uid)
}
}
if s.kind != structs.ServiceKindConnectProxy || s.proxyCfg.Mode != structs.ProxyModeTransparent {
return nil
}
s.logger.Warn("skipping transparent proxy update for peered upstream")
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
if !ok {
@ -157,6 +153,10 @@ func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u UpdateEv
// Make sure to use an external address when crossing partition or DC boundaries.
isRemote := !snap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
// If node is peered it must be remote
if node.Node.PeerOrEmpty() != "" {
isRemote = true
}
csnIdx, addr, _ := node.BestAddress(isRemote)
existing := upstreamsSnapshot.PassthroughIndices[addr]

View File

@ -729,11 +729,12 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
},
}
} else {
ep, _ := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
configureClusterWithHostnames(
s.Logger,
c,
"", /*TODO:make configurable?*/
cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid],
ep,
true, /*isRemote*/
false, /*onlyPassing*/
)
@ -743,7 +744,8 @@ func (s *ResourceGenerator) makeUpstreamClusterForPeerService(
rootPEMs := cfgSnap.RootPEMs()
if uid.Peer != "" {
rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs()
tbs, _ := cfgSnap.ConnectProxy.UpstreamPeerTrustBundles.Get(uid.Peer)
rootPEMs = tbs.ConcatenatedRootPEMs()
}
// Enable TLS upstream with the configured client certificate.
@ -1077,13 +1079,9 @@ func (s *ResourceGenerator) makeUpstreamClustersForDiscoveryChain(
}
if configureTLS {
rootPEMs := cfgSnap.RootPEMs()
if uid.Peer != "" {
rootPEMs = cfgSnap.ConnectProxy.UpstreamPeerTrustBundles[uid.Peer].ConcatenatedRootPEMs()
}
commonTLSContext := makeCommonTLSContext(
cfgSnap.Leaf(),
rootPEMs,
cfgSnap.RootPEMs(),
makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing()),
)

View File

@ -47,7 +47,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
// TODO: this estimate is wrong
resources := make([]proto.Message, 0,
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+
len(cfgSnap.ConnectProxy.PeerUpstreamEndpoints)+
cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Len()+
len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in clusters.go
@ -110,7 +110,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
continue
}
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints[uid]
endpoints, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
if ok {
la := makeLoadAssignment(
clusterName,