mirror of
https://github.com/status-im/consul.git
synced 2025-01-20 18:50:04 +00:00
345191a0df
Adds another datasource for proxycfg.HTTPChecks, for use on server agents. Typically these checks are performed by local client agents and there is no equivalent of this in agentless (where servers configure consul-dataplane proxies). Hence, the data source is mostly a no-op on servers but in the case where the service is present within the local state, it delegates to the cache data source.
672 lines
24 KiB
Go
672 lines
24 KiB
Go
package proxycfg
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
)
|
|
|
|
type handlerConnectProxy struct {
|
|
handlerState
|
|
}
|
|
|
|
// initialize sets up the watches needed based on current proxy registration
|
|
// state.
|
|
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
|
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
|
snap.ConnectProxy.DiscoveryChain = make(map[UpstreamID]*structs.CompiledDiscoveryChain)
|
|
snap.ConnectProxy.WatchedDiscoveryChains = make(map[UpstreamID]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedUpstreams = make(map[UpstreamID]map[string]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles = watch.NewMap[string, *pbpeering.PeeringTrustBundle]()
|
|
snap.ConnectProxy.WatchedGateways = make(map[UpstreamID]map[string]context.CancelFunc)
|
|
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[UpstreamID]map[string]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.WatchedLocalGWEndpoints = watch.NewMap[string, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
|
snap.ConnectProxy.PreparedQueryEndpoints = make(map[UpstreamID]structs.CheckServiceNodes)
|
|
snap.ConnectProxy.DestinationsUpstream = watch.NewMap[UpstreamID, *structs.ServiceConfigEntry]()
|
|
snap.ConnectProxy.UpstreamConfig = make(map[UpstreamID]*structs.Upstream)
|
|
snap.ConnectProxy.PassthroughUpstreams = make(map[UpstreamID]map[string]map[string]struct{})
|
|
snap.ConnectProxy.PassthroughIndices = make(map[string]indexedTarget)
|
|
snap.ConnectProxy.PeerUpstreamEndpoints = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.DestinationGateways = watch.NewMap[UpstreamID, structs.CheckServiceNodes]()
|
|
snap.ConnectProxy.PeerUpstreamEndpointsUseHostnames = make(map[UpstreamID]struct{})
|
|
|
|
// Watch for root changes
|
|
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Source: *s.source,
|
|
}, rootsWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
err = s.dataSources.TrustBundleList.Notify(ctx, &cachetype.TrustBundleListRequest{
|
|
Request: &pbpeering.TrustBundleListByServiceRequest{
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
Namespace: s.proxyID.NamespaceOrDefault(),
|
|
Partition: s.proxyID.PartitionOrDefault(),
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
}, peeringTrustBundlesWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch the leaf cert
|
|
err = s.dataSources.LeafCertificate.Notify(ctx, &cachetype.ConnectCALeafRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
Token: s.token,
|
|
Service: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, leafWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch for intention updates
|
|
err = s.dataSources.Intentions.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
}, intentionsWatchID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Get information about the entire service mesh.
|
|
err = s.dataSources.ConfigEntry.Notify(ctx, &structs.ConfigEntryQuery{
|
|
Kind: structs.MeshConfig,
|
|
Name: structs.MeshConfigMesh,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(s.proxyID.PartitionOrDefault()),
|
|
}, meshConfigEntryID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
// Watch for service check updates
|
|
err = s.dataSources.HTTPChecks.Notify(ctx, &cachetype.ServiceHTTPChecksRequest{
|
|
ServiceID: s.proxyCfg.DestinationServiceID,
|
|
NodeName: s.source.Node,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
|
// When in transparent proxy we will infer upstreams from intentions with this source
|
|
err := s.dataSources.IntentionUpstreams.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, intentionUpstreamsID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
err = s.dataSources.PeeredUpstreams.Notify(ctx, &structs.PartitionSpecificRequest{
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Datacenter: s.source.Datacenter,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, peeredUpstreamsID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
// We also infer upstreams from destinations (egress points)
|
|
err = s.dataSources.IntentionUpstreamsDestination.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
ServiceName: s.proxyCfg.DestinationServiceName,
|
|
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
|
}, intentionUpstreamsDestinationID, s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
}
|
|
|
|
// Watch for updates to service endpoints for all upstreams
|
|
for i := range s.proxyCfg.Upstreams {
|
|
u := s.proxyCfg.Upstreams[i]
|
|
|
|
uid := NewUpstreamID(&u)
|
|
|
|
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
|
|
if u.DestinationName == structs.WildcardSpecifier {
|
|
snap.ConnectProxy.UpstreamConfig[uid] = &u
|
|
continue
|
|
}
|
|
|
|
snap.ConnectProxy.UpstreamConfig[uid] = &u
|
|
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
|
|
// Watches should not be created for them.
|
|
if u.CentrallyConfigured {
|
|
continue
|
|
}
|
|
|
|
dc := s.source.Datacenter
|
|
if u.Datacenter != "" {
|
|
dc = u.Datacenter
|
|
}
|
|
|
|
// Default the partition and namespace to the namespace of this proxy service.
|
|
partition := s.proxyID.PartitionOrDefault()
|
|
if u.DestinationPartition != "" {
|
|
partition = u.DestinationPartition
|
|
}
|
|
ns := s.proxyID.NamespaceOrDefault()
|
|
if u.DestinationNamespace != "" {
|
|
ns = u.DestinationNamespace
|
|
}
|
|
|
|
cfg, err := parseReducedUpstreamConfig(u.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. We'll fall back on
|
|
// the plain discovery chain if there is an error so it's safe to
|
|
// continue.
|
|
s.logger.Warn("failed to parse upstream config",
|
|
"upstream", uid.String(),
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
switch u.DestinationType {
|
|
case structs.UpstreamDestTypePreparedQuery:
|
|
err := s.dataSources.PreparedQuery.Notify(ctx, &structs.PreparedQueryExecuteRequest{
|
|
Datacenter: dc,
|
|
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
|
QueryIDOrName: u.DestinationName,
|
|
Connect: true,
|
|
Source: *s.source,
|
|
}, "upstream:"+uid.String(), s.ch)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
|
|
case structs.UpstreamDestTypeService:
|
|
fallthrough
|
|
|
|
case "":
|
|
if u.DestinationPeer != "" {
|
|
err := s.setupWatchesForPeeredUpstream(ctx, snap.ConnectProxy, u, dc)
|
|
if err != nil {
|
|
return snap, err
|
|
}
|
|
continue
|
|
}
|
|
|
|
err := s.dataSources.CompiledDiscoveryChain.Notify(ctx, &structs.DiscoveryChainRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Name: u.DestinationName,
|
|
EvaluateInDatacenter: dc,
|
|
EvaluateInNamespace: ns,
|
|
EvaluateInPartition: partition,
|
|
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
|
|
OverrideProtocol: cfg.Protocol,
|
|
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
|
}, "discovery-chain:"+uid.String(), s.ch)
|
|
if err != nil {
|
|
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", uid.String(), err)
|
|
}
|
|
|
|
default:
|
|
return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
|
}
|
|
}
|
|
|
|
return snap, nil
|
|
}
|
|
|
|
func (s *handlerConnectProxy) setupWatchesForPeeredUpstream(
|
|
ctx context.Context,
|
|
snapConnectProxy configSnapshotConnectProxy,
|
|
u structs.Upstream,
|
|
dc string,
|
|
) error {
|
|
uid := NewUpstreamID(&u)
|
|
|
|
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
|
|
|
// NOTE: An upstream that points to a peer by definition will
|
|
// only ever watch a single catalog query, so a map key of just
|
|
// "UID" is sufficient to cover the peer data watches here.
|
|
snapConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, nil)
|
|
err := s.dataSources.Health.Notify(ctx, &structs.ServiceSpecificRequest{
|
|
PeerName: uid.Peer,
|
|
Datacenter: dc,
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: s.token,
|
|
},
|
|
ServiceName: u.DestinationName,
|
|
Connect: true,
|
|
Source: *s.source,
|
|
EnterpriseMeta: uid.EnterpriseMeta,
|
|
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Check whether a watch for this peer exists to avoid duplicates.
|
|
if ok := snapConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
|
|
peerCtx, cancel := context.WithCancel(ctx)
|
|
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
|
|
Request: &pbpeering.TrustBundleReadRequest{
|
|
Name: uid.Peer,
|
|
Partition: uid.PartitionOrDefault(),
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
|
}
|
|
|
|
snapConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
|
}
|
|
|
|
// If a peered upstream is set to local mesh gw mode,
|
|
// set up a watch for them.
|
|
if u.MeshGateway.Mode == structs.MeshGatewayModeLocal {
|
|
gk := GatewayKey{
|
|
Partition: s.source.NodePartitionOrDefault(),
|
|
Datacenter: s.source.Datacenter,
|
|
}
|
|
if !snapConnectProxy.WatchedLocalGWEndpoints.IsWatched(gk.String()) {
|
|
opts := gatewayWatchOpts{
|
|
internalServiceDump: s.dataSources.InternalServiceDump,
|
|
notifyCh: s.ch,
|
|
source: *s.source,
|
|
token: s.token,
|
|
key: gk,
|
|
}
|
|
if err := watchMeshGateway(ctx, opts); err != nil {
|
|
return fmt.Errorf("error while watching for local mesh gateway: %w", err)
|
|
}
|
|
snapConnectProxy.WatchedLocalGWEndpoints.InitWatch(gk.String(), nil)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u UpdateEvent, snap *ConfigSnapshot) error {
|
|
if u.Err != nil {
|
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
|
}
|
|
|
|
switch {
|
|
case u.CorrelationID == rootsWatchID:
|
|
roots, ok := u.Result.(*structs.IndexedCARoots)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
snap.Roots = roots
|
|
|
|
case u.CorrelationID == peeringTrustBundlesWatchID:
|
|
resp, ok := u.Result.(*pbpeering.TrustBundleListByServiceResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
if len(resp.Bundles) > 0 {
|
|
snap.ConnectProxy.InboundPeerTrustBundles = resp.Bundles
|
|
}
|
|
snap.ConnectProxy.InboundPeerTrustBundlesSet = true
|
|
|
|
case u.CorrelationID == intentionsWatchID:
|
|
resp, ok := u.Result.(structs.Intentions)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
snap.ConnectProxy.Intentions = resp
|
|
snap.ConnectProxy.IntentionsSet = true
|
|
|
|
case u.CorrelationID == peeredUpstreamsID:
|
|
resp, ok := u.Result.(*structs.IndexedPeeredServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, psn := range resp.Services {
|
|
uid := NewUpstreamIDFromPeeredServiceName(psn)
|
|
|
|
if _, ok := seenUpstreams[uid]; ok {
|
|
continue
|
|
}
|
|
seenUpstreams[uid] = struct{}{}
|
|
|
|
s.logger.Trace("initializing watch of peered upstream", "upstream", uid)
|
|
|
|
hctx, hcancel := context.WithCancel(ctx)
|
|
err := s.dataSources.Health.Notify(hctx, &structs.ServiceSpecificRequest{
|
|
PeerName: uid.Peer,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: s.token,
|
|
},
|
|
ServiceName: psn.ServiceName.Name,
|
|
Connect: true,
|
|
// Note that Identifier doesn't type-prefix for service any more as it's
|
|
// the default and makes metrics and other things much cleaner. It's
|
|
// simpler for us if we have the type to make things unambiguous.
|
|
Source: *s.source,
|
|
EnterpriseMeta: uid.EnterpriseMeta,
|
|
}, upstreamPeerWatchIDPrefix+uid.String(), s.ch)
|
|
if err != nil {
|
|
hcancel()
|
|
return fmt.Errorf("failed to watch health for %s: %v", uid, err)
|
|
}
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.InitWatch(uid, hcancel)
|
|
|
|
// Check whether a watch for this peer exists to avoid duplicates.
|
|
if ok := snap.ConnectProxy.UpstreamPeerTrustBundles.IsWatched(uid.Peer); !ok {
|
|
peerCtx, cancel := context.WithCancel(ctx)
|
|
if err := s.dataSources.TrustBundle.Notify(peerCtx, &cachetype.TrustBundleReadRequest{
|
|
Request: &pbpeering.TrustBundleReadRequest{
|
|
Name: uid.Peer,
|
|
Partition: uid.PartitionOrDefault(),
|
|
},
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
}, peerTrustBundleIDPrefix+uid.Peer, s.ch); err != nil {
|
|
cancel()
|
|
return fmt.Errorf("error while watching trust bundle for peer %q: %w", uid.Peer, err)
|
|
}
|
|
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.InitWatch(uid.Peer, cancel)
|
|
}
|
|
}
|
|
snap.ConnectProxy.PeeredUpstreams = seenUpstreams
|
|
|
|
//
|
|
// Clean up data
|
|
//
|
|
|
|
peeredChainTargets := make(map[UpstreamID]struct{})
|
|
for _, discoChain := range snap.ConnectProxy.DiscoveryChain {
|
|
for _, target := range discoChain.Targets {
|
|
if target.Peer == "" {
|
|
continue
|
|
}
|
|
uid := NewUpstreamIDFromTargetID(target.ID)
|
|
peeredChainTargets[uid] = struct{}{}
|
|
}
|
|
}
|
|
|
|
validPeerNames := make(map[string]struct{})
|
|
|
|
// Iterate through all known endpoints and remove references to upstream IDs that weren't in the update
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.ForEachKey(func(uid UpstreamID) bool {
|
|
// Peered upstream is explicitly defined in upstream config
|
|
if _, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
// Peered upstream came from dynamic source of imported services
|
|
if _, ok := seenUpstreams[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
// Peered upstream came from a discovery chain target
|
|
if _, ok := peeredChainTargets[uid]; ok {
|
|
validPeerNames[uid.Peer] = struct{}{}
|
|
return true
|
|
}
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(uid)
|
|
return true
|
|
})
|
|
|
|
// Iterate through all known trust bundles and remove references to any unseen peer names
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.ForEachKey(func(peerName PeerName) bool {
|
|
if _, ok := validPeerNames[peerName]; !ok {
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(peerName)
|
|
}
|
|
return true
|
|
})
|
|
|
|
case u.CorrelationID == intentionUpstreamsID:
|
|
resp, ok := u.Result.(*structs.IndexedServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, svc := range resp.Services {
|
|
uid := NewUpstreamIDFromServiceName(svc)
|
|
|
|
seenUpstreams[uid] = struct{}{}
|
|
|
|
cfgMap := make(map[string]interface{})
|
|
u, ok := snap.ConnectProxy.UpstreamConfig[uid]
|
|
if ok {
|
|
cfgMap = u.Config
|
|
} else {
|
|
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
|
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
|
// by the ResolveServiceConfig endpoint.
|
|
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, s.proxyID.WithWildcardNamespace())
|
|
wildcardUID := NewUpstreamIDFromServiceID(wildcardSID)
|
|
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardUID]
|
|
if ok {
|
|
u = defaults
|
|
cfgMap = defaults.Config
|
|
snap.ConnectProxy.UpstreamConfig[uid] = defaults
|
|
}
|
|
}
|
|
|
|
cfg, err := parseReducedUpstreamConfig(cfgMap)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. We'll fall back on
|
|
// the plain discovery chain if there is an error so it's safe to
|
|
// continue.
|
|
s.logger.Warn("failed to parse upstream config",
|
|
"upstream", uid,
|
|
"error", err,
|
|
)
|
|
}
|
|
|
|
meshGateway := s.proxyCfg.MeshGateway
|
|
if u != nil {
|
|
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
|
|
}
|
|
watchOpts := discoveryChainWatchOpts{
|
|
id: NewUpstreamIDFromServiceName(svc),
|
|
name: svc.Name,
|
|
namespace: svc.NamespaceOrDefault(),
|
|
partition: svc.PartitionOrDefault(),
|
|
datacenter: s.source.Datacenter,
|
|
cfg: cfg,
|
|
meshGateway: meshGateway,
|
|
}
|
|
up := &handlerUpstreams{handlerState: s.handlerState}
|
|
err = up.watchDiscoveryChain(ctx, snap, watchOpts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to watch discovery chain for %s: %v", uid, err)
|
|
}
|
|
}
|
|
snap.ConnectProxy.IntentionUpstreams = seenUpstreams
|
|
|
|
// Clean up data from services that were not in the update
|
|
for uid, targets := range snap.ConnectProxy.WatchedUpstreams {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
for targetID, cancelFn := range targets {
|
|
cancelFn()
|
|
|
|
targetUID := NewUpstreamIDFromTargetID(targetID)
|
|
if targetUID.Peer != "" {
|
|
snap.ConnectProxy.PeerUpstreamEndpoints.CancelWatch(targetUID)
|
|
snap.ConnectProxy.UpstreamPeerTrustBundles.CancelWatch(targetUID.Peer)
|
|
}
|
|
}
|
|
delete(snap.ConnectProxy.WatchedUpstreams, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.WatchedUpstreamEndpoints {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, uid)
|
|
}
|
|
}
|
|
for uid, cancelMap := range snap.ConnectProxy.WatchedGateways {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
for _, cancelFn := range cancelMap {
|
|
cancelFn()
|
|
}
|
|
delete(snap.ConnectProxy.WatchedGateways, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.WatchedGatewayEndpoints {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.WatchedGatewayEndpoints, uid)
|
|
}
|
|
}
|
|
for uid, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
cancelFn()
|
|
delete(snap.ConnectProxy.WatchedDiscoveryChains, uid)
|
|
}
|
|
}
|
|
for uid := range snap.ConnectProxy.PassthroughUpstreams {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.PassthroughUpstreams, uid)
|
|
}
|
|
}
|
|
for addr, indexed := range snap.ConnectProxy.PassthroughIndices {
|
|
if _, ok := seenUpstreams[indexed.upstreamID]; !ok {
|
|
delete(snap.ConnectProxy.PassthroughIndices, addr)
|
|
}
|
|
}
|
|
|
|
// These entries are intentionally handled separately from the WatchedDiscoveryChains above.
|
|
// There have been situations where a discovery watch was cancelled, then fired.
|
|
// That update event then re-populated the DiscoveryChain map entry, which wouldn't get cleaned up
|
|
// since there was no known watch for it.
|
|
for uid := range snap.ConnectProxy.DiscoveryChain {
|
|
if upstream, ok := snap.ConnectProxy.UpstreamConfig[uid]; ok && !upstream.CentrallyConfigured {
|
|
continue
|
|
}
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
delete(snap.ConnectProxy.DiscoveryChain, uid)
|
|
}
|
|
}
|
|
case u.CorrelationID == intentionUpstreamsDestinationID:
|
|
resp, ok := u.Result.(*structs.IndexedServiceList)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response %T", u.Result)
|
|
}
|
|
seenUpstreams := make(map[UpstreamID]struct{})
|
|
for _, svc := range resp.Services {
|
|
uid := NewUpstreamIDFromServiceName(svc)
|
|
seenUpstreams[uid] = struct{}{}
|
|
{
|
|
childCtx, cancel := context.WithCancel(ctx)
|
|
err := s.dataSources.ConfigEntry.Notify(childCtx, &structs.ConfigEntryQuery{
|
|
Kind: structs.ServiceDefaults,
|
|
Name: svc.Name,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: svc.EnterpriseMeta,
|
|
}, DestinationConfigEntryID+svc.String(), s.ch)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
snap.ConnectProxy.DestinationsUpstream.InitWatch(uid, cancel)
|
|
}
|
|
{
|
|
childCtx, cancel := context.WithCancel(ctx)
|
|
err := s.dataSources.ServiceGateways.Notify(childCtx, &structs.ServiceSpecificRequest{
|
|
ServiceName: svc.Name,
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
EnterpriseMeta: svc.EnterpriseMeta,
|
|
ServiceKind: structs.ServiceKindTerminatingGateway,
|
|
}, DestinationGatewayID+svc.String(), s.ch)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
snap.ConnectProxy.DestinationGateways.InitWatch(uid, cancel)
|
|
}
|
|
}
|
|
|
|
snap.ConnectProxy.DestinationsUpstream.ForEachKey(func(uid UpstreamID) bool {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
snap.ConnectProxy.DestinationsUpstream.CancelWatch(uid)
|
|
}
|
|
return true
|
|
})
|
|
|
|
snap.ConnectProxy.DestinationGateways.ForEachKey(func(uid UpstreamID) bool {
|
|
if _, ok := seenUpstreams[uid]; !ok {
|
|
snap.ConnectProxy.DestinationGateways.CancelWatch(uid)
|
|
}
|
|
return true
|
|
})
|
|
case strings.HasPrefix(u.CorrelationID, DestinationConfigEntryID):
|
|
resp, ok := u.Result.(*structs.ConfigEntryResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
pq := strings.TrimPrefix(u.CorrelationID, DestinationConfigEntryID)
|
|
uid := UpstreamIDFromString(pq)
|
|
serviceConf, ok := resp.Entry.(*structs.ServiceConfigEntry)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for service default: %T", resp.Entry.GetName())
|
|
}
|
|
|
|
snap.ConnectProxy.DestinationsUpstream.Set(uid, serviceConf)
|
|
case strings.HasPrefix(u.CorrelationID, DestinationGatewayID):
|
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
|
|
pq := strings.TrimPrefix(u.CorrelationID, DestinationGatewayID)
|
|
uid := UpstreamIDFromString(pq)
|
|
snap.ConnectProxy.DestinationGateways.Set(uid, resp.Nodes)
|
|
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
|
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
|
|
uid := UpstreamIDFromString(pq)
|
|
snap.ConnectProxy.PreparedQueryEndpoints[uid] = resp.Nodes
|
|
|
|
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
|
|
resp, ok := u.Result.([]structs.CheckType)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
|
|
}
|
|
svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
|
|
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
|
|
|
|
default:
|
|
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
|
|
}
|
|
return nil
|
|
}
|