Validate chains are associated with upstreams

Previously we could get into a state where discovery chain entries were
not cleaned up after the associated watch was cancelled. These changes
add handling for that case where stray chain references are encountered.
This commit is contained in:
freddygv 2021-12-13 15:30:49 -07:00
parent 70d6358426
commit c5c290c503
4 changed files with 34 additions and 5 deletions

View File

@ -78,13 +78,21 @@ func (s *ResourceGenerator) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.C
} }
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] chainEndpoints, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok { if !ok {
// this should not happen // this should not happen
return nil, fmt.Errorf("no endpoint map for upstream %q", id) return nil, fmt.Errorf("no endpoint map for upstream %q", id)
} }
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, cfgSnap.ConnectProxy.UpstreamConfig[id], chain, chainEndpoints, cfgSnap) upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(id, upstreamCfg, chain, chainEndpoints, cfgSnap)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -48,11 +48,19 @@ func (s *ResourceGenerator) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.
len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints)) len(cfgSnap.ConnectProxy.PreparedQueryEndpoints)+len(cfgSnap.ConnectProxy.WatchedUpstreamEndpoints))
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
es := s.endpointsFromDiscoveryChain( es := s.endpointsFromDiscoveryChain(
id, id,
chain, chain,
cfgSnap.Locality, cfgSnap.Locality,
cfgSnap.ConnectProxy.UpstreamConfig[id], upstreamCfg,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id], cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id], cfgSnap.ConnectProxy.WatchedGatewayEndpoints[id],
) )

View File

@ -95,6 +95,13 @@ func (s *ResourceGenerator) listenersFromSnapshotConnectProxy(cfgSnap *proxycfg.
for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain { for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id] upstreamCfg := cfgSnap.ConnectProxy.UpstreamConfig[id]
explicit := upstreamCfg.HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
cfg := s.getAndModifyUpstreamConfigForListener(id, upstreamCfg, chain) cfg := s.getAndModifyUpstreamConfigForListener(id, upstreamCfg, chain)
// If escape hatch is present, create a listener from it and move on to the next // If escape hatch is present, create a listener from it and move on to the next

View File

@ -28,7 +28,7 @@ func (s *ResourceGenerator) routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot)
switch cfgSnap.Kind { switch cfgSnap.Kind {
case structs.ServiceKindConnectProxy: case structs.ServiceKindConnectProxy:
return s.routesForConnectProxy(cfgSnap.ConnectProxy.DiscoveryChain) return s.routesForConnectProxy(cfgSnap)
case structs.ServiceKindIngressGateway: case structs.ServiceKindIngressGateway:
return s.routesForIngressGateway( return s.routesForIngressGateway(
cfgSnap.IngressGateway.Listeners, cfgSnap.IngressGateway.Listeners,
@ -46,13 +46,19 @@ func (s *ResourceGenerator) routesFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot)
// routesFromSnapshotConnectProxy returns the xDS API representation of the // routesFromSnapshotConnectProxy returns the xDS API representation of the
// "routes" in the snapshot. // "routes" in the snapshot.
func (s *ResourceGenerator) routesForConnectProxy(chains map[string]*structs.CompiledDiscoveryChain) ([]proto.Message, error) { func (s *ResourceGenerator) routesForConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) {
var resources []proto.Message var resources []proto.Message
for id, chain := range chains { for id, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
if chain.IsDefault() { if chain.IsDefault() {
continue continue
} }
explicit := cfgSnap.ConnectProxy.UpstreamConfig[id].HasLocalPortOrSocket()
if _, implicit := cfgSnap.ConnectProxy.IntentionUpstreams[id]; !implicit && !explicit {
// Discovery chain is not associated with a known explicit or implicit upstream so it is skipped.
continue
}
virtualHost, err := makeUpstreamRouteForDiscoveryChain(id, chain, []string{"*"}) virtualHost, err := makeUpstreamRouteForDiscoveryChain(id, chain, []string{"*"})
if err != nil { if err != nil {
return nil, err return nil, err