Fix local mesh gateway with peering discovery chains. (#15690)

Fix local mesh gateway with peering discovery chains.

Prior to this patch, discovery chains with peers would not
properly honor the mesh gateway mode for two reasons.

1. An incorrect target upstream ID was used to lookup the
mesh gateway mode. To fix this, the parent upstream uid is
now used instead of the discovery-chain-target-uid to find
the intended mesh gateway mode.

2. The watch for local mesh gateways was never initialized
for discovery chains. To fix this, the discovery chains are
now scanned, and a local GW watch is spawned if: the mesh
gateway mode is local and the target is a peering connection.
This commit is contained in:
Derek Menteer 2022-12-07 13:07:42 -06:00 committed by GitHub
parent 1c5d54cb29
commit 97ec5279aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 67 additions and 24 deletions

3
.changelog/15690.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
connect: Fix peering failovers ignoring local mesh gateway configuration.
```

View File

@ -275,23 +275,8 @@ func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
// If a peered upstream is set to local mesh gw mode, // If a peered upstream is set to local mesh gw mode,
// set up a watch for them. // set up a watch for them.
if mgwMode == structs.MeshGatewayModeLocal { if mgwMode == structs.MeshGatewayModeLocal {
gk := GatewayKey{ up := &handlerUpstreams{handlerState: s.handlerState}
Partition: s.source.NodePartitionOrDefault(), up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
Datacenter: s.source.Datacenter,
}
if !snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
opts := gatewayWatchOpts{
internalServiceDump: s.dataSources.InternalServiceDump,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gk,
}
if err := watchMeshGateway(ctx, opts); err != nil {
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
}
snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
}
} else if mgwMode == structs.MeshGatewayModeNone { } else if mgwMode == structs.MeshGatewayModeNone {
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid)) s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
} }

View File

@ -766,6 +766,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.ConnectProxy.IntentionsSet) require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
require.True(t, snap.ConnectProxy.MeshConfigSet) require.True(t, snap.ConnectProxy.MeshConfigSet)
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)
}
}, },
} }
@ -799,6 +805,12 @@ func TestState_WatchesAndUpdates(t *testing.T) {
require.True(t, snap.ConnectProxy.IntentionsSet) require.True(t, snap.ConnectProxy.IntentionsSet)
require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions) require.Equal(t, ixnMatch, snap.ConnectProxy.Intentions)
if meshGatewayProxyConfigValue == structs.MeshGatewayModeLocal {
require.True(t, snap.ConnectProxy.WatchedLocalGWEndpoints.IsWatched("dc1"))
_, ok := snap.ConnectProxy.WatchedLocalGWEndpoints.Get("dc1")
require.False(t, ok)
}
}, },
} }

View File

@ -326,6 +326,10 @@ func (s *handlerUpstreams) resetWatchesFromChain(
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition { if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
needGateways[gk.String()] = struct{}{} needGateways[gk.String()] = struct{}{}
} }
// Register a local gateway watch if any targets are pointing to a peer and require a mode of local.
if target.Peer != "" && target.MeshGateway.Mode == structs.MeshGatewayModeLocal {
s.setupWatchForLocalGWEndpoints(ctx, snap)
}
} }
// If the discovery chain's targets do not lead to watching all endpoints // If the discovery chain's targets do not lead to watching all endpoints
@ -548,3 +552,30 @@ func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig
err := mapstructure.WeakDecode(m, &cfg) err := mapstructure.WeakDecode(m, &cfg)
return cfg, err return cfg, err
} }
func (s *handlerUpstreams) setupWatchForLocalGWEndpoints(
ctx context.Context,
upstreams *ConfigSnapshotUpstreams,
) error {
gk := GatewayKey{
Partition: s.proxyID.PartitionOrDefault(),
Datacenter: s.source.Datacenter,
}
// If the watch is already initialized, do nothing.
if upstreams.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
return nil
}
opts := gatewayWatchOpts{
internalServiceDump: s.dataSources.InternalServiceDump,
notifyCh: s.ch,
source: *s.source,
token: s.token,
key: gk,
}
if err := watchMeshGateway(ctx, opts); err != nil {
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
}
upstreams.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
return nil
}

View File

@ -92,7 +92,7 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
// upstream in clusters.go so that the sets of endpoints generated matches // upstream in clusters.go so that the sets of endpoints generated matches
// the sets of clusters. // the sets of clusters.
for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() { for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
_, skip := getUpstream(uid) upstream, skip := getUpstream(uid)
if skip { if skip {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped. // Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue continue
@ -107,7 +107,11 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
clusterName := generatePeeredClusterName(uid, tbs) clusterName := generatePeeredClusterName(uid, tbs)
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid) mgwMode := structs.MeshGatewayModeDefault
if upstream != nil {
mgwMode = upstream.MeshGateway.Mode
}
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, clusterName, uid, mgwMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -538,7 +542,12 @@ func makePipeEndpoint(path string) *envoy_endpoint_v3.LbEndpoint {
} }
} }
func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *proxycfg.ConfigSnapshot, clusterName string, uid proxycfg.UpstreamID) (*envoy_endpoint_v3.ClusterLoadAssignment, error) { func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(
cfgSnap *proxycfg.ConfigSnapshot,
clusterName string,
uid proxycfg.UpstreamID,
upstreamGatewayMode structs.MeshGatewayMode,
) (*envoy_endpoint_v3.ClusterLoadAssignment, error) {
var la *envoy_endpoint_v3.ClusterLoadAssignment var la *envoy_endpoint_v3.ClusterLoadAssignment
upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams() upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()
@ -546,11 +555,9 @@ func (s *ResourceGenerator) makeUpstreamLoadAssignmentForPeerService(cfgSnap *pr
return la, err return la, err
} }
upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]
// If an upstream is configured with local mesh gw mode, we make a load assignment // If an upstream is configured with local mesh gw mode, we make a load assignment
// from the gateway endpoints instead of those of the upstreams. // from the gateway endpoints instead of those of the upstreams.
if upstream != nil && upstream.MeshGateway.Mode == structs.MeshGatewayModeLocal { if upstreamGatewayMode == structs.MeshGatewayModeLocal {
localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String()) localGw, ok := cfgSnap.ConnectProxy.WatchedLocalGWEndpoints.Get(cfgSnap.Locality.String())
if !ok { if !ok {
// local GW is not ready; return early // local GW is not ready; return early
@ -643,6 +650,11 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
} }
} }
mgwMode := structs.MeshGatewayModeDefault
if upstream := cfgSnap.ConnectProxy.UpstreamConfig[uid]; upstream != nil {
mgwMode = upstream.MeshGateway.Mode
}
// Find all resolver nodes. // Find all resolver nodes.
for _, node := range chain.Nodes { for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver { if node.Type != structs.DiscoveryGraphNodeTypeResolver {
@ -682,7 +694,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain(
s.Logger.Debug("generating endpoints for", "cluster", targetOpt.clusterName) s.Logger.Debug("generating endpoints for", "cluster", targetOpt.clusterName)
targetUID := proxycfg.NewUpstreamIDFromTargetID(targetOpt.targetID) targetUID := proxycfg.NewUpstreamIDFromTargetID(targetOpt.targetID)
if targetUID.Peer != "" { if targetUID.Peer != "" {
loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID) loadAssignment, err := s.makeUpstreamLoadAssignmentForPeerService(cfgSnap, targetOpt.clusterName, targetUID, mgwMode)
if err != nil { if err != nil {
return nil, err return nil, err
} }