mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 14:24:39 +00:00
97ec5279aa
Fix local mesh gateway with peering discovery chains. Prior to this patch, discovery chains with peers would not properly honor the mesh gateway mode for two reasons. 1. An incorrect target upstream ID was used to lookup the mesh gateway mode. To fix this, the parent upstream uid is now used instead of the discovery-chain-target-uid to find the intended mesh gateway mode. 2. The watch for local mesh gateways was never initialized for discovery chains. To fix this, the discovery chains are now scanned, and a local GW watch is spawned if: the mesh gateway mode is local and the target is a peering connection.
623 lines
22 KiB
Go
623 lines
22 KiB
Go
package proxycfg
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
)
|
|
|
|
type handlerConnectProxy struct {
|
|
handlerState
|
|
}
|
|
|
|
// initialize sets up the watches needed based on current proxy registration
|
|
// state.
|
|
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
|
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
|
snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
|
|
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]()
|
|
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.WatchedLocalGWEndpoints = watch.NewMap[string, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
|
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.DestinationsUpstream = watch.NewMap[UpstreamID, *structs.ServiceConfigEntry]()
|
|
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
|
|
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
|
|
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
|
|
snap.ConnectProxy.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.DestinationGateways = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{})
|
|
|
|
// Watch for root changes
|
|
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Source: *s.source,
|
|
}, rootsWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
err = s.dataSources.TrustBundleList.Notify(ctx, &cachetype.TrustBundleListRequest{
|
|
Request: &pbpeering.TrustBundleListByServiceRequest{
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
Namespace: s.proxyID.NamespaceOrDefault(),
|
|
Partition: s.proxyID.PartitionOrDefault(),
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
}, peeringTrustBundlesWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch the leaf cert
|
|
err = s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
Token: s.token,
|
|
Service: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, leafWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch for intention updates
|
|
err = s.dataSources.Intentions.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
}, intentionsWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Get information about the entire service mesh.
|
|
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
|
|
Kind: structs.MeshConfig,
|
|
Name: structs.MeshConfigMesh,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(s.proxyID.PartitionOrDefault()),
|
|
}, meshConfigEntryID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch for service check updates
|
|
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{
|
|
ServiceID: s.proxyCfg.DestinationServiceID,
|
|
NodeName: s.source.Node,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
|
// When in transparent proxy we will infer upstreams from intentions with this source
|
|
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, intentionUpstreamsID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
err = s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Datacenter: s.source.Datacenter,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, peeredUpstreamsID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
// We also infer upstreams from destinations (egress points)
|
|
err = s.dataSources.IntentionUpstreamsDestination.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, intentionUpstreamsDestinationID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
}
|
|
|
|
// Watch for updates to service endpoints for all upstreams
|
|
for i := range s.proxyCfg.Upstreams {
|
|
u := s.proxyCfg.Upstreams[i]
|
|
|
|
uid := NewUpstreamID(&u)
|
|
|
|
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
|
|
if u.DestinationName == structs.WildcardSpecifier {
|
|
snap.ConnectProxy.UpstreamConfig[uid] = &u
|
|
continue
|
|
}
|
|
|
|
snap.ConnectProxy.UpstreamConfig[uid] = &u
|
|
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
|
|
// Watches should not be created for them.
|
|
if u.CentrallyConfigured {
|
|
continue
|
|
}
|
|
|
|
dc := s.source.Datacenter
|
|
if u.Datacenter != "" {
|
|
dc = u.Datacenter
|
|
}
|
|
|
|
// Default the partition and namespace to the namespace of this proxy service.
|
|
partition := s.proxyID.PartitionOrDefault()
|
|
if u.DestinationPartition != "" {
|
|
partition = u.DestinationPartition
|
|
}
|
|
ns := s.proxyID.NamespaceOrDefault()
|
|
if u.DestinationNamespace != "" {
|
|
ns = u.DestinationNamespace
|
|
}
|
|
|
|
cfg, err := parseReducedUpstreamConfig(u.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. We'll fall back on
|
|
// the plain discovery chain if there is an error so it's safe to
|
|
// continue.
|
|
s.logger.Warn("failed to parse upstream config",
|
|
"upstream", uid.String(),
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
switch u.DestinationType {
|
|
case structs.UpstreamDestTypePreparedQuery:
|
|
err := s.dataSources.PreparedQuery.Notify(ctx, &structs.PreparedQueryExecuteRequest{
|
|
Datacenter: dc,
|
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
|
QueryIDOrName: u.DestinationName,
|
|
Connect: true,
|
|
Source: *s.source,
|
|
}, "upstream:"+uid.String(), s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
case structs.UpstreamDestTypeService:
|
|
fallthrough
|
|
|
|
case "":
|
|
if u.DestinationPeer != "" {
|
|
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, NewUpstreamID(&u), u.MeshGateway.Mode, dc)
|
|
if err != nil {
|
|
return snap, fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
|
}
|
|
continue
|
|
}
|
|
|
|
err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Name: u.DestinationName,
|
|
EvaluateInDatacenter: dc,
|
|
EvaluateInNamespace: ns,
|
|
EvaluateInPartition: partition,
|
|
OverrideMeshGateway: u.MeshGateway,
|
|
OverrideProtocol: cfg.Protocol,
|
|
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
|
}, "discovery-chain:"+uid.String(), s.ch)
|
|
if err != nil {
|
|
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err)
|
|
}
|
|
|
|
default:
|
|
return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
|
}
|
|
}
|
|
|
|
return snap, nil
|
|
}
|
|
|
|
func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
|
ctx context.Context,
|
|
snapConnectProxy configSnapshotConnectProxy,
|
|
uid UpstreamID,
|
|
mgwMode structs.MeshGatewayMode,
|
|
dc string,
|
|
) error {
|
|
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
|
|
|
// NOTE: An upstream that points to a peer by definition will
|
|
// only ever watch a single catalog query, so a map key of just
|
|
// "UID" is sufficient to cover the peer data watches here.
|
|
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
PeerName: uid.Peer,
|
|
Datacenter: dc,
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: s.token,
|
|
},
|
|
ServiceName: uid.Name,
|
|
Connect: true,
|
|
Source: *s.source,
|
|
EnterpriseMeta: uid.EnterpriseMeta,
|
|
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
|
|
}
|
|
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
|
|
|
|
// Check whether a watch for this peer exists to avoid duplicates.
|
|
if ok := snapConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
|
|
peerCtx, cancel := context.WithCancel(ctx)
|
|
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
|
|
Request: &pbpeering.TrustBundleReadRequest{
|
|
Name: uid.Peer,
|
|
Partition: uid.PartitionOrDefault(),
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
|
}
|
|
|
|
snapConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
|
}
|
|
|
|
// If a peered upstream is set to local mesh gw mode,
|
|
// set up a watch for them.
|
|
if mgwMode == structs.MeshGatewayModeLocal {
|
|
up := &handlerUpstreams{handlerState: s.handlerState}
|
|
up.setupWatchForLocalGWEndpoints(ctx, &snapConnectProxy.ConfigSnapshotUpstreams)
|
|
} else if mgwMode == structs.MeshGatewayModeNone {
|
|
s.logger.Warn(fmt.Sprintf("invalid mesh gateway mode 'none', defaulting to 'remote' for %q", uid))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
|
|
if u.Err != nil {
|
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
|
}
|
|
|
|
switch {
|
|
case u.CorrelationID == rootsWatchID:
|
|
roots, ok := u.Result.(*structs.IndexedCARoots)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
snap.Roots = roots
|
|
|
|
case u.CorrelationID == peeringTrustBundlesWatchID:
|
|
resp, ok := u.Result.(*pbpeering.TrustBundleListByServiceResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
if len(resp.Bundles) > 0 {
|
|
snap.ConnectProxy.InboundPeerTrustBundles = resp.Bundles
|
|
}
|
|
snap.ConnectProxy.InboundPeerTrustBundlesSet = true
|
|
|
|
case u.CorrelationID == intentionsWatchID:
|
|
resp, ok := u.Result.(structs.Intentions)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
snap.ConnectProxy.Intentions = resp
|
|
snap.ConnectProxy.IntentionsSet = true
|
|
|
|
case u.CorrelationID == peeredUpstreamsID:
|
|
resp, ok := u.Result.(*structs.IndexedPeeredServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, psn := range resp.Services {
|
|
uid := NewUpstreamIDFromPeeredServiceName(psn)
|
|
|
|
if _, ok := seenUpstreams[uid]; ok {
|
|
continue
|
|
}
|
|
seenUpstreams[uid] = struct{}{}
|
|
|
|
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, uid, s.proxyCfg.MeshGateway.Mode, s.source.Datacenter)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to setup watches for peered upstream %q: %w", uid.String(), err)
|
|
}
|
|
}
|
|
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
|
|
|
|
//
|
|
// Clean up data
|
|
//
|
|
|
|
peeredChainTargets := make(map[UpstreamID]struct{})
|
|
for _, discoChain := range snap.ConnectProxy.DiscoveryChain {
|
|
for _, target := range discoChain.Targets {
|
|
if target.Peer == "" {
|
|
continue
|
|
}
|
|
uid := NewUpstreamIDFromTargetID(target.ID)
|
|
peeredChainTargets[uid] = struct{}{}
|
|
}
|
|
}
|
|
|
|
validPeerNames := make(map[string]struct{})
|
|
|
|
// Iterate through all known endpoints and remove references to upstream IDs that weren't in the update
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool {
|
|
// Peered upstream is explicitly defined in upstream config
|
|
if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
// Peered upstream came from dynamic source of imported services
|
|
if _, ok := seenUpstreams[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
// Peered upstream came from a discovery chain target
|
|
if _, ok := peeredChainTargets[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid)
|
|
return true
|
|
})
|
|
|
|
// Iterate through all known trust bundles and remove references to any unseen peer names
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool {
|
|
if _, ok := validPeerNames[peerName]; !ok {
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName)
|
|
}
|
|
return true
|
|
})
|
|
|
|
case u.CorrelationID == intentionUpstreamsID:
|
|
resp, ok := u.Result.(*structs.IndexedServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, svc := range resp.Services {
|
|
uid := NewUpstreamIDFromServiceName(svc)
|
|
|
|
seenUpstreams[uid] = struct{}{}
|
|
|
|
cfgMap := make(map[string]interface{})
|
|
u, ok := snap.ConnectProxy.UpstreamConfig[uid]
|
|
if ok {
|
|
cfgMap = u.Config
|
|
} else {
|
|
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
|
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
|
// by the ResolveServiceConfig endpoint.
|
|
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace())
|
|
wildcardUID := NewUpstreamIDFromServiceID(wildcardSID)
|
|
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID]
|
|
if ok {
|
|
u = defaults
|
|
cfgMap = defaults.Config
|
|
snap.ConnectProxy.UpstreamConfig[uid] = defaults
|
|
}
|
|
}
|
|
|
|
cfg, err := parseReducedUpstreamConfig(cfgMap)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. We'll fall back on
|
|
// the plain discovery chain if there is an error so it's safe to
|
|
// continue.
|
|
s.logger.Warn("failed to parse upstream config",
|
|
"upstream", uid,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
meshGateway := s.proxyCfg.MeshGateway
|
|
if u != nil {
|
|
meshGateway = u.MeshGateway
|
|
}
|
|
watchOpts := discoveryChainWatchOpts{
|
|
id: NewUpstreamIDFromServiceName(svc),
|
|
name: svc.Name,
|
|
namespace: svc.NamespaceOrDefault(),
|
|
partition: svc.PartitionOrDefault(),
|
|
datacenter: s.source.Datacenter,
|
|
cfg: cfg,
|
|
meshGateway: meshGateway,
|
|
}
|
|
up := &handlerUpstreams{handlerState: s.handlerState}
|
|
err = up.watchDiscoveryChain(ctx, snap, watchOpts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err)
|
|
}
|
|
}
|
|
snap.ConnectProxy.IntentionUpstreams = seenUpstreams
|
|
|
|
// Clean up data from services that were not in the update
|
|
for uid, targets := range snap.ConnectProxy.WatchedUpstreams {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
for targetID, cancelFn := range targets {
|
|
cancelFn()
|
|
|
|
targetUID := NewUpstreamIDFromTargetID(targetID)
|
|
if targetUID.Peer != "" {
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(targetUID)
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
|
|
}
|
|
}
|
|
delete(snap.ConnectProxy.WatchedUpstreams, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid)
|
|
}
|
|
}
|
|
for uid, cancelMap := range snap.ConnectProxy.WatchedGateways {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
for _, cancelFn := range cancelMap {
|
|
cancelFn()
|
|
}
|
|
delete(snap.ConnectProxy.WatchedGateways, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.WatchedGatewayEndpoints {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid)
|
|
}
|
|
}
|
|
for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
cancelFn()
|
|
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.PassthroughUpstreams {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.PassthroughUpstreams, uid)
|
|
}
|
|
}
|
|
for addr, indexed := range snap.ConnectProxy.PassthroughIndices {
|
|
if _, ok := seenUpstreams[indexed.upstreamID]; !ok {
|
|
delete(snap.ConnectProxy.PassthroughIndices, addr)
|
|
}
|
|
}
|
|
|
|
// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
|
|
// There have been situations where a discovery watch was cancelled, then fired.
|
|
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up
|
|
// since there was no known watch for it.
|
|
for uid := range snap.ConnectProxy.DiscoveryChain {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.DiscoveryChain, uid)
|
|
}
|
|
}
|
|
case u.CorrelationID == intentionUpstreamsDestinationID:
|
|
resp, ok := u.Result.(*structs.IndexedServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, svc := range resp.Services {
|
|
uid := NewUpstreamIDFromServiceName(svc)
|
|
seenUpstreams[uid] = struct{}{}
|
|
{
|
|
childCtx, cancel := context.WithCancel(ctx)
|
|
err := s.dataSources.ConfigEntry.Notify(childCtx, &structs.ConfigEntryQuery{
|
|
Kind: structs.ServiceDefaults,
|
|
Name: svc.Name,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: svc.EnterpriseMeta,
|
|
}, DestinationConfigEntryID+svc.String(), s.ch)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
snap.ConnectProxy.DestinationsUpstream.InitWatch(uid, cancel)
|
|
}
|
|
{
|
|
childCtx, cancel := context.WithCancel(ctx)
|
|
err := s.dataSources.ServiceGateways.Notify(childCtx, &structs.ServiceSpecificRequest{
|
|
ServiceName: svc.Name,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: svc.EnterpriseMeta,
|
|
ServiceKind: structs.ServiceKindTerminatingGateway,
|
|
}, DestinationGatewayID+svc.String(), s.ch)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
snap.ConnectProxy.DestinationGateways.InitWatch(uid, cancel)
|
|
}
|
|
}
|
|
|
|
snap.ConnectProxy.DestinationsUpstream.ForEachKey(func(uid UpstreamID) bool {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
snap.ConnectProxy.DestinationsUpstream.CancelWatch(uid)
|
|
}
|
|
return true
|
|
})
|
|
|
|
snap.ConnectProxy.DestinationGateways.ForEachKey(func(uid UpstreamID) bool {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
snap.ConnectProxy.DestinationGateways.CancelWatch(uid)
|
|
}
|
|
return true
|
|
})
|
|
case strings.HasPrefix(u.CorrelationID, DestinationConfigEntryID):
|
|
resp, ok := u.Result.(*structs.ConfigEntryResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
pq := strings.TrimPrefix(u.CorrelationID, DestinationConfigEntryID)
|
|
uid := UpstreamIDFromString(pq)
|
|
serviceConf, ok := resp.Entry.(*structs.ServiceConfigEntry)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for service default: %T", resp.Entry.GetName())
|
|
}
|
|
|
|
snap.ConnectProxy.DestinationsUpstream.Set(uid, serviceConf)
|
|
case strings.HasPrefix(u.CorrelationID, DestinationGatewayID):
|
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
pq := strings.TrimPrefix(u.CorrelationID, DestinationGatewayID)
|
|
uid := UpstreamIDFromString(pq)
|
|
snap.ConnectProxy.DestinationGateways.Set(uid, resp.Nodes)
|
|
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
|
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
|
|
uid := UpstreamIDFromString(pq)
|
|
snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes
|
|
|
|
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
|
|
resp, ok := u.Result.([]structs.CheckType)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
|
|
}
|
|
svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
|
|
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
|
|
|
|
default:
|
|
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
|
|
}
|
|
return nil
|
|
}
|