mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 12:40:17 +00:00
Merge pull request #9490 from hashicorp/dnephin/proxycfg-state-3
proxycfg: move each handler into a seprate file
This commit is contained in:
commit
4c37c225a5
351
agent/proxycfg/connect_proxy.go
Normal file
351
agent/proxycfg/connect_proxy.go
Normal file
@ -0,0 +1,351 @@
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
type handlerConnectProxy struct {
|
||||
handlerState
|
||||
}
|
||||
|
||||
// initialize sets up the watches needed based on current proxy registration
|
||||
// state.
|
||||
func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
||||
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
||||
snap.ConnectProxy.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
|
||||
snap.ConnectProxy.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.WatchedGateways = make(map[string]map[string]context.CancelFunc)
|
||||
snap.ConnectProxy.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.WatchedServiceChecks = make(map[structs.ServiceID][]structs.CheckType)
|
||||
snap.ConnectProxy.PreparedQueryEndpoints = make(map[string]structs.CheckServiceNodes)
|
||||
snap.ConnectProxy.UpstreamConfig = make(map[string]*structs.Upstream)
|
||||
snap.ConnectProxy.PassthroughUpstreams = make(map[string]ServicePassthroughAddrs)
|
||||
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, rootsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch the leaf cert
|
||||
err = s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
Token: s.token,
|
||||
Service: s.proxyCfg.DestinationServiceName,
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, leafWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch for intention updates
|
||||
err = s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Match: &structs.IntentionQueryMatch{
|
||||
Type: structs.IntentionMatchDestination,
|
||||
Entries: []structs.IntentionMatchEntry{
|
||||
{
|
||||
Namespace: s.proxyID.NamespaceOrDefault(),
|
||||
Name: s.proxyCfg.DestinationServiceName,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, intentionsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch for service check updates
|
||||
err = s.cache.Notify(ctx, cachetype.ServiceHTTPChecksName, &cachetype.ServiceHTTPChecksRequest{
|
||||
ServiceID: s.proxyCfg.DestinationServiceID,
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, svcChecksWatchIDPrefix+structs.ServiceIDString(s.proxyCfg.DestinationServiceID, &s.proxyID.EnterpriseMeta), s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// default the namespace to the namespace of this proxy service
|
||||
currentNamespace := s.proxyID.NamespaceOrDefault()
|
||||
|
||||
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
||||
// When in transparent proxy we will infer upstreams from intentions with this source
|
||||
err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: s.proxyCfg.DestinationServiceName,
|
||||
EnterpriseMeta: structs.NewEnterpriseMeta(s.proxyID.NamespaceOrEmpty()),
|
||||
}, intentionUpstreamsID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
||||
Kind: structs.MeshConfig,
|
||||
Name: structs.MeshConfigMesh,
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}, meshConfigEntryID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
||||
// Watch for updates to service endpoints for all upstreams
|
||||
for i := range s.proxyCfg.Upstreams {
|
||||
u := s.proxyCfg.Upstreams[i]
|
||||
|
||||
// Store defaults keyed under wildcard so they can be applied to centrally configured upstreams
|
||||
if u.DestinationName == structs.WildcardSpecifier {
|
||||
snap.ConnectProxy.UpstreamConfig[u.DestinationID().String()] = &u
|
||||
continue
|
||||
}
|
||||
|
||||
// This can be true if the upstream is a synthetic entry populated from centralized upstream config.
|
||||
// Watches should not be created for them.
|
||||
if u.CentrallyConfigured {
|
||||
continue
|
||||
}
|
||||
snap.ConnectProxy.UpstreamConfig[u.Identifier()] = &u
|
||||
|
||||
dc := s.source.Datacenter
|
||||
if u.Datacenter != "" {
|
||||
dc = u.Datacenter
|
||||
}
|
||||
if s.proxyCfg.Mode == structs.ProxyModeTransparent && (dc == "" || dc == s.source.Datacenter) {
|
||||
// In transparent proxy mode, watches for upstreams in the local DC are handled by the IntentionUpstreams watch.
|
||||
continue
|
||||
}
|
||||
|
||||
ns := currentNamespace
|
||||
if u.DestinationNamespace != "" {
|
||||
ns = u.DestinationNamespace
|
||||
}
|
||||
|
||||
cfg, err := parseReducedUpstreamConfig(u.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. We'll fall back on
|
||||
// the plain discovery chain if there is an error so it's safe to
|
||||
// continue.
|
||||
s.logger.Warn("failed to parse upstream config",
|
||||
"upstream", u.Identifier(),
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
switch u.DestinationType {
|
||||
case structs.UpstreamDestTypePreparedQuery:
|
||||
err = s.cache.Notify(ctx, cachetype.PreparedQueryName, &structs.PreparedQueryExecuteRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
|
||||
QueryIDOrName: u.DestinationName,
|
||||
Connect: true,
|
||||
Source: *s.source,
|
||||
}, "upstream:"+u.Identifier(), s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
case structs.UpstreamDestTypeService:
|
||||
fallthrough
|
||||
|
||||
case "": // Treat unset as the default Service type
|
||||
err = s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Name: u.DestinationName,
|
||||
EvaluateInDatacenter: dc,
|
||||
EvaluateInNamespace: ns,
|
||||
OverrideMeshGateway: s.proxyCfg.MeshGateway.OverlayWith(u.MeshGateway),
|
||||
OverrideProtocol: cfg.Protocol,
|
||||
OverrideConnectTimeout: cfg.ConnectTimeout(),
|
||||
}, "discovery-chain:"+u.Identifier(), s.ch)
|
||||
if err != nil {
|
||||
return snap, fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
||||
}
|
||||
|
||||
default:
|
||||
return snap, fmt.Errorf("unknown upstream type: %q", u.DestinationType)
|
||||
}
|
||||
}
|
||||
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (s *handlerConnectProxy) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case u.CorrelationID == rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
case u.CorrelationID == intentionsWatchID:
|
||||
resp, ok := u.Result.(*structs.IndexedIntentionMatches)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
if len(resp.Matches) > 0 {
|
||||
// RPC supports matching multiple services at once but we only ever
|
||||
// query with the one service we represent currently so just pick
|
||||
// the one result set up.
|
||||
snap.ConnectProxy.Intentions = resp.Matches[0]
|
||||
}
|
||||
snap.ConnectProxy.IntentionsSet = true
|
||||
|
||||
case u.CorrelationID == intentionUpstreamsID:
|
||||
resp, ok := u.Result.(*structs.IndexedServiceList)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response %T", u.Result)
|
||||
}
|
||||
|
||||
seenServices := make(map[string]struct{})
|
||||
for _, svc := range resp.Services {
|
||||
seenServices[svc.String()] = struct{}{}
|
||||
|
||||
cfgMap := make(map[string]interface{})
|
||||
u, ok := snap.ConnectProxy.UpstreamConfig[svc.String()]
|
||||
if ok {
|
||||
cfgMap = u.Config
|
||||
} else {
|
||||
// Use the centralized upstream defaults if they exist and there isn't specific configuration for this upstream
|
||||
// This is only relevant to upstreams from intentions because for explicit upstreams the defaulting is handled
|
||||
// by the ResolveServiceConfig endpoint.
|
||||
wildcardSID := structs.NewServiceID(structs.WildcardSpecifier, structs.WildcardEnterpriseMeta())
|
||||
defaults, ok := snap.ConnectProxy.UpstreamConfig[wildcardSID.String()]
|
||||
if ok {
|
||||
u = defaults
|
||||
cfgMap = defaults.Config
|
||||
snap.ConnectProxy.UpstreamConfig[svc.String()] = defaults
|
||||
}
|
||||
}
|
||||
|
||||
cfg, err := parseReducedUpstreamConfig(cfgMap)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. We'll fall back on
|
||||
// the plain discovery chain if there is an error so it's safe to
|
||||
// continue.
|
||||
s.logger.Warn("failed to parse upstream config",
|
||||
"upstream", u.Identifier(),
|
||||
"error", err,
|
||||
)
|
||||
}
|
||||
|
||||
meshGateway := s.proxyCfg.MeshGateway
|
||||
if u != nil {
|
||||
meshGateway = meshGateway.OverlayWith(u.MeshGateway)
|
||||
}
|
||||
watchOpts := discoveryChainWatchOpts{
|
||||
id: svc.String(),
|
||||
name: svc.Name,
|
||||
namespace: svc.NamespaceOrDefault(),
|
||||
datacenter: s.source.Datacenter,
|
||||
cfg: cfg,
|
||||
meshGateway: meshGateway,
|
||||
}
|
||||
up := &handlerUpstreams{handlerState: s.handlerState}
|
||||
err = up.watchDiscoveryChain(ctx, snap, watchOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", svc.String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up data from services that were not in the update
|
||||
for sn := range snap.ConnectProxy.WatchedUpstreams {
|
||||
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
if _, ok := seenServices[sn]; !ok {
|
||||
delete(snap.ConnectProxy.WatchedUpstreams, sn)
|
||||
}
|
||||
}
|
||||
for sn := range snap.ConnectProxy.WatchedUpstreamEndpoints {
|
||||
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
if _, ok := seenServices[sn]; !ok {
|
||||
delete(snap.ConnectProxy.WatchedUpstreamEndpoints, sn)
|
||||
}
|
||||
}
|
||||
for sn := range snap.ConnectProxy.WatchedGateways {
|
||||
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
if _, ok := seenServices[sn]; !ok {
|
||||
delete(snap.ConnectProxy.WatchedGateways, sn)
|
||||
}
|
||||
}
|
||||
for sn := range snap.ConnectProxy.WatchedGatewayEndpoints {
|
||||
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
if _, ok := seenServices[sn]; !ok {
|
||||
delete(snap.ConnectProxy.WatchedGatewayEndpoints, sn)
|
||||
}
|
||||
}
|
||||
for sn, cancelFn := range snap.ConnectProxy.WatchedDiscoveryChains {
|
||||
if upstream, ok := snap.ConnectProxy.UpstreamConfig[sn]; ok && upstream.Datacenter != "" && upstream.Datacenter != s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
if _, ok := seenServices[sn]; !ok {
|
||||
cancelFn()
|
||||
delete(snap.ConnectProxy.WatchedDiscoveryChains, sn)
|
||||
delete(snap.ConnectProxy.DiscoveryChain, sn)
|
||||
}
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "upstream:"+preparedQueryIDPrefix):
|
||||
resp, ok := u.Result.(*structs.PreparedQueryExecuteResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
pq := strings.TrimPrefix(u.CorrelationID, "upstream:")
|
||||
snap.ConnectProxy.PreparedQueryEndpoints[pq] = resp.Nodes
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, svcChecksWatchIDPrefix):
|
||||
resp, ok := u.Result.([]structs.CheckType)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for service checks response: %T, want: []structs.CheckType", u.Result)
|
||||
}
|
||||
svcID := structs.ServiceIDFromString(strings.TrimPrefix(u.CorrelationID, svcChecksWatchIDPrefix))
|
||||
snap.ConnectProxy.WatchedServiceChecks[svcID] = resp
|
||||
|
||||
case u.CorrelationID == meshConfigEntryID:
|
||||
resp, ok := u.Result.(*structs.ConfigEntryResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
if resp.Entry != nil {
|
||||
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
|
||||
}
|
||||
snap.ConnectProxy.MeshConfig = meshConf
|
||||
} else {
|
||||
snap.ConnectProxy.MeshConfig = nil
|
||||
}
|
||||
snap.ConnectProxy.MeshConfigSet = true
|
||||
|
||||
default:
|
||||
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
|
||||
}
|
||||
return nil
|
||||
}
|
222
agent/proxycfg/ingress_gateway.go
Normal file
222
agent/proxycfg/ingress_gateway.go
Normal file
@ -0,0 +1,222 @@
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
type handlerIngressGateway struct {
|
||||
handlerState
|
||||
}
|
||||
|
||||
func (s *handlerIngressGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
||||
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, rootsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch this ingress gateway's config entry
|
||||
err = s.cache.Notify(ctx, cachetype.ConfigEntryName, &structs.ConfigEntryQuery{
|
||||
Kind: structs.IngressGateway,
|
||||
Name: s.service,
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, gatewayConfigWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch the ingress-gateway's list of upstreams
|
||||
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: s.service,
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, gatewayServicesWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
snap.IngressGateway.WatchedDiscoveryChains = make(map[string]context.CancelFunc)
|
||||
snap.IngressGateway.DiscoveryChain = make(map[string]*structs.CompiledDiscoveryChain)
|
||||
snap.IngressGateway.WatchedUpstreams = make(map[string]map[string]context.CancelFunc)
|
||||
snap.IngressGateway.WatchedUpstreamEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
||||
snap.IngressGateway.WatchedGateways = make(map[string]map[string]context.CancelFunc)
|
||||
snap.IngressGateway.WatchedGatewayEndpoints = make(map[string]map[string]structs.CheckServiceNodes)
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (s *handlerIngressGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
switch {
|
||||
case u.CorrelationID == rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
case u.CorrelationID == gatewayConfigWatchID:
|
||||
resp, ok := u.Result.(*structs.ConfigEntryResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
gatewayConf, ok := resp.Entry.(*structs.IngressGatewayConfigEntry)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
|
||||
}
|
||||
|
||||
snap.IngressGateway.TLSEnabled = gatewayConf.TLS.Enabled
|
||||
snap.IngressGateway.TLSSet = true
|
||||
|
||||
if err := s.watchIngressLeafCert(ctx, snap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case u.CorrelationID == gatewayServicesWatchID:
|
||||
services, ok := u.Result.(*structs.IndexedGatewayServices)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
// Update our upstreams and watches.
|
||||
var hosts []string
|
||||
watchedSvcs := make(map[string]struct{})
|
||||
upstreamsMap := make(map[IngressListenerKey]structs.Upstreams)
|
||||
for _, service := range services.Services {
|
||||
u := makeUpstream(service)
|
||||
|
||||
watchOpts := discoveryChainWatchOpts{
|
||||
id: u.Identifier(),
|
||||
name: u.DestinationName,
|
||||
namespace: u.DestinationNamespace,
|
||||
datacenter: s.source.Datacenter,
|
||||
}
|
||||
up := &handlerUpstreams{handlerState: s.handlerState}
|
||||
err := up.watchDiscoveryChain(ctx, snap, watchOpts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch discovery chain for %s: %v", u.Identifier(), err)
|
||||
}
|
||||
watchedSvcs[u.Identifier()] = struct{}{}
|
||||
|
||||
hosts = append(hosts, service.Hosts...)
|
||||
|
||||
id := IngressListenerKey{Protocol: service.Protocol, Port: service.Port}
|
||||
upstreamsMap[id] = append(upstreamsMap[id], u)
|
||||
}
|
||||
|
||||
snap.IngressGateway.Upstreams = upstreamsMap
|
||||
snap.IngressGateway.Hosts = hosts
|
||||
snap.IngressGateway.HostsSet = true
|
||||
|
||||
for id, cancelFn := range snap.IngressGateway.WatchedDiscoveryChains {
|
||||
if _, ok := watchedSvcs[id]; !ok {
|
||||
cancelFn()
|
||||
delete(snap.IngressGateway.WatchedDiscoveryChains, id)
|
||||
}
|
||||
}
|
||||
|
||||
if err := s.watchIngressLeafCert(ctx, snap); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
default:
|
||||
return (*handlerUpstreams)(s).handleUpdateUpstreams(ctx, u, snap)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Note: Ingress gateways are always bound to ports and never unix sockets.
|
||||
// This means LocalBindPort is the only possibility
|
||||
func makeUpstream(g *structs.GatewayService) structs.Upstream {
|
||||
upstream := structs.Upstream{
|
||||
DestinationName: g.Service.Name,
|
||||
DestinationNamespace: g.Service.NamespaceOrDefault(),
|
||||
LocalBindPort: g.Port,
|
||||
IngressHosts: g.Hosts,
|
||||
// Pass the protocol that was configured on the ingress listener in order
|
||||
// to force that protocol on the Envoy listener.
|
||||
Config: map[string]interface{}{
|
||||
"protocol": g.Protocol,
|
||||
},
|
||||
}
|
||||
|
||||
return upstream
|
||||
}
|
||||
|
||||
func (s *handlerIngressGateway) watchIngressLeafCert(ctx context.Context, snap *ConfigSnapshot) error {
|
||||
if !snap.IngressGateway.TLSSet || !snap.IngressGateway.HostsSet {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Watch the leaf cert
|
||||
if snap.IngressGateway.LeafCertWatchCancel != nil {
|
||||
snap.IngressGateway.LeafCertWatchCancel()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
Token: s.token,
|
||||
Service: s.service,
|
||||
DNSSAN: s.generateIngressDNSSANs(snap),
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, leafWatchID, s.ch)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.IngressGateway.LeafCertWatchCancel = cancel
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *handlerIngressGateway) generateIngressDNSSANs(snap *ConfigSnapshot) []string {
|
||||
// Update our leaf cert watch with wildcard entries for our DNS domains as well as any
|
||||
// configured custom hostnames from the service.
|
||||
if !snap.IngressGateway.TLSEnabled {
|
||||
return nil
|
||||
}
|
||||
|
||||
var dnsNames []string
|
||||
namespaces := make(map[string]struct{})
|
||||
for _, upstreams := range snap.IngressGateway.Upstreams {
|
||||
for _, u := range upstreams {
|
||||
namespaces[u.DestinationNamespace] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
for ns := range namespaces {
|
||||
// The default namespace is special cased in DNS resolution, so special
|
||||
// case it here.
|
||||
if ns == structs.IntentionDefaultNamespace {
|
||||
ns = ""
|
||||
} else {
|
||||
ns = ns + "."
|
||||
}
|
||||
|
||||
dnsNames = append(dnsNames, fmt.Sprintf("*.ingress.%s%s", ns, s.dnsConfig.Domain))
|
||||
dnsNames = append(dnsNames, fmt.Sprintf("*.ingress.%s%s.%s", ns, s.source.Datacenter, s.dnsConfig.Domain))
|
||||
if s.dnsConfig.AltDomain != "" {
|
||||
dnsNames = append(dnsNames, fmt.Sprintf("*.ingress.%s%s", ns, s.dnsConfig.AltDomain))
|
||||
dnsNames = append(dnsNames, fmt.Sprintf("*.ingress.%s%s.%s", ns, s.source.Datacenter, s.dnsConfig.AltDomain))
|
||||
}
|
||||
}
|
||||
|
||||
dnsNames = append(dnsNames, snap.IngressGateway.Hosts...)
|
||||
|
||||
return dnsNames
|
||||
}
|
308
agent/proxycfg/mesh_gateway.go
Normal file
308
agent/proxycfg/mesh_gateway.go
Normal file
@ -0,0 +1,308 @@
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/logging"
|
||||
)
|
||||
|
||||
type handlerMeshGateway struct {
|
||||
handlerState
|
||||
}
|
||||
|
||||
// initialize sets up the watches needed based on the current mesh gateway registration
|
||||
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
||||
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, rootsWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch for all services
|
||||
err = s.cache.Notify(ctx, cachetype.CatalogServiceListName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
|
||||
}, serviceListWatchID, s.ch)
|
||||
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
if s.meta[structs.MetaWANFederationKey] == "1" {
|
||||
// Conveniently we can just use this service meta attribute in one
|
||||
// place here to set the machinery in motion and leave the conditional
|
||||
// behavior out of the rest of the package.
|
||||
err = s.cache.Notify(ctx, cachetype.FederationStateListMeshGatewaysName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, federationStateListGatewaysWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
err = s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: structs.ConsulServiceName,
|
||||
}, consulServerListWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
}
|
||||
|
||||
// Eventually we will have to watch connect enable instances for each service as well as the
|
||||
// destination services themselves but those notifications will be setup later. However we
|
||||
// cannot setup those watches until we know what the services are. from the service list
|
||||
// watch above
|
||||
|
||||
err = s.cache.Notify(ctx, cachetype.CatalogDatacentersName, &structs.DatacentersRequest{
|
||||
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: 30 * time.Second},
|
||||
}, datacentersWatchID, s.ch)
|
||||
if err != nil {
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Once we start getting notified about the datacenters we will setup watches on the
|
||||
// gateways within those other datacenters. We cannot do that here because we don't
|
||||
// know what they are yet.
|
||||
|
||||
// Watch service-resolvers so we can setup service subset clusters
|
||||
err = s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Kind: structs.ServiceResolver,
|
||||
EnterpriseMeta: *structs.WildcardEnterpriseMeta(),
|
||||
}, serviceResolversWatchID, s.ch)
|
||||
if err != nil {
|
||||
s.logger.Named(logging.MeshGateway).
|
||||
Error("failed to register watch for service-resolver config entries", "error", err)
|
||||
return snap, err
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.MeshGateway.WatchedDatacenters = make(map[string]context.CancelFunc)
|
||||
snap.MeshGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
||||
snap.MeshGateway.GatewayGroups = make(map[string]structs.CheckServiceNodes)
|
||||
snap.MeshGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
||||
snap.MeshGateway.HostnameDatacenters = make(map[string]structs.CheckServiceNodes)
|
||||
// there is no need to initialize the map of service resolvers as we
|
||||
// fully rebuild it every time we get updates
|
||||
return snap, err
|
||||
}
|
||||
|
||||
func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
meshLogger := s.logger.Named(logging.MeshGateway)
|
||||
|
||||
switch u.CorrelationID {
|
||||
case rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
case federationStateListGatewaysWatchID:
|
||||
dcIndexedNodes, ok := u.Result.(*structs.DatacenterIndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.MeshGateway.FedStateGateways = dcIndexedNodes.DatacenterNodes
|
||||
|
||||
for dc, nodes := range dcIndexedNodes.DatacenterNodes {
|
||||
snap.MeshGateway.HostnameDatacenters[dc] = hostnameEndpoints(
|
||||
s.logger.Named(logging.MeshGateway), snap.Datacenter, nodes)
|
||||
}
|
||||
|
||||
for dc := range snap.MeshGateway.HostnameDatacenters {
|
||||
if _, ok := dcIndexedNodes.DatacenterNodes[dc]; !ok {
|
||||
delete(snap.MeshGateway.HostnameDatacenters, dc)
|
||||
}
|
||||
}
|
||||
|
||||
case serviceListWatchID:
|
||||
services, ok := u.Result.(*structs.IndexedServiceList)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
svcMap := make(map[structs.ServiceName]struct{})
|
||||
for _, svc := range services.Services {
|
||||
// Make sure to add every service to this map, we use it to cancel
|
||||
// watches below.
|
||||
svcMap[svc] = struct{}{}
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedServices[svc]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: svc.Name,
|
||||
Connect: true,
|
||||
EnterpriseMeta: svc.EnterpriseMeta,
|
||||
}, fmt.Sprintf("connect-service:%s", svc.String()), s.ch)
|
||||
|
||||
if err != nil {
|
||||
meshLogger.Error("failed to register watch for connect-service",
|
||||
"service", svc.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.MeshGateway.WatchedServices[svc] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
for sid, cancelFn := range snap.MeshGateway.WatchedServices {
|
||||
if _, ok := svcMap[sid]; !ok {
|
||||
meshLogger.Debug("canceling watch for service", "service", sid.String())
|
||||
// TODO (gateways) Should the sid also be deleted from snap.MeshGateway.ServiceGroups?
|
||||
// Do those endpoints get cleaned up some other way?
|
||||
delete(snap.MeshGateway.WatchedServices, sid)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedServicesSet = true
|
||||
case datacentersWatchID:
|
||||
datacentersRaw, ok := u.Result.(*[]string)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
if datacentersRaw == nil {
|
||||
return fmt.Errorf("invalid response with a nil datacenter list")
|
||||
}
|
||||
|
||||
datacenters := *datacentersRaw
|
||||
|
||||
for _, dc := range datacenters {
|
||||
if dc == s.source.Datacenter {
|
||||
continue
|
||||
}
|
||||
|
||||
if _, ok := snap.MeshGateway.WatchedDatacenters[dc]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceKind: structs.ServiceKindMeshGateway,
|
||||
UseServiceKind: true,
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}, fmt.Sprintf("mesh-gateway:%s", dc), s.ch)
|
||||
|
||||
if err != nil {
|
||||
meshLogger.Error("failed to register watch for mesh-gateway",
|
||||
"datacenter", dc,
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
snap.MeshGateway.WatchedDatacenters[dc] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
for dc, cancelFn := range snap.MeshGateway.WatchedDatacenters {
|
||||
found := false
|
||||
for _, dcCurrent := range datacenters {
|
||||
if dcCurrent == dc {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
delete(snap.MeshGateway.WatchedDatacenters, dc)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
case serviceResolversWatchID:
|
||||
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
resolvers := make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
||||
for _, entry := range configEntries.Entries {
|
||||
if resolver, ok := entry.(*structs.ServiceResolverConfigEntry); ok {
|
||||
resolvers[structs.NewServiceName(resolver.Name, &resolver.EnterpriseMeta)] = resolver
|
||||
}
|
||||
}
|
||||
snap.MeshGateway.ServiceResolvers = resolvers
|
||||
|
||||
case consulServerListWatchID:
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
// Do some initial sanity checks to avoid doing something dumb.
|
||||
for _, csn := range resp.Nodes {
|
||||
if csn.Service.Service != structs.ConsulServiceName {
|
||||
return fmt.Errorf("expected service name %q but got %q",
|
||||
structs.ConsulServiceName, csn.Service.Service)
|
||||
}
|
||||
if csn.Node.Datacenter != snap.Datacenter {
|
||||
return fmt.Errorf("expected datacenter %q but got %q",
|
||||
snap.Datacenter, csn.Node.Datacenter)
|
||||
}
|
||||
}
|
||||
|
||||
snap.MeshGateway.ConsulServers = resp.Nodes
|
||||
|
||||
default:
|
||||
switch {
|
||||
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, "connect-service:"))
|
||||
|
||||
if len(resp.Nodes) > 0 {
|
||||
snap.MeshGateway.ServiceGroups[sn] = resp.Nodes
|
||||
} else if _, ok := snap.MeshGateway.ServiceGroups[sn]; ok {
|
||||
delete(snap.MeshGateway.ServiceGroups, sn)
|
||||
}
|
||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
dc := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
delete(snap.MeshGateway.GatewayGroups, dc)
|
||||
delete(snap.MeshGateway.HostnameDatacenters, dc)
|
||||
|
||||
if len(resp.Nodes) > 0 {
|
||||
snap.MeshGateway.GatewayGroups[dc] = resp.Nodes
|
||||
snap.MeshGateway.HostnameDatacenters[dc] = hostnameEndpoints(
|
||||
s.logger.Named(logging.MeshGateway), snap.Datacenter, resp.Nodes)
|
||||
}
|
||||
default:
|
||||
// do nothing for now
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
File diff suppressed because it is too large
Load Diff
343
agent/proxycfg/terminating_gateway.go
Normal file
343
agent/proxycfg/terminating_gateway.go
Normal file
@ -0,0 +1,343 @@
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
type handlerTerminatingGateway struct {
|
||||
handlerState
|
||||
}
|
||||
|
||||
// initialize sets up the initial watches needed based on the terminating-gateway registration
|
||||
func (s *handlerTerminatingGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
|
||||
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
|
||||
// Watch for root changes
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Source: *s.source,
|
||||
}, rootsWatchID, s.ch)
|
||||
if err != nil {
|
||||
s.logger.Error("failed to register watch for root changes", "error", err)
|
||||
return snap, err
|
||||
}
|
||||
|
||||
// Watch for the terminating-gateway's linked services
|
||||
err = s.cache.Notify(ctx, cachetype.GatewayServicesName, &structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: s.service,
|
||||
EnterpriseMeta: s.proxyID.EnterpriseMeta,
|
||||
}, gatewayServicesWatchID, s.ch)
|
||||
if err != nil {
|
||||
s.logger.Error("failed to register watch for linked services", "error", err)
|
||||
return snap, err
|
||||
}
|
||||
|
||||
snap.TerminatingGateway.WatchedServices = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.TerminatingGateway.WatchedIntentions = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.TerminatingGateway.Intentions = make(map[structs.ServiceName]structs.Intentions)
|
||||
snap.TerminatingGateway.WatchedLeaves = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.TerminatingGateway.ServiceLeaves = make(map[structs.ServiceName]*structs.IssuedCert)
|
||||
snap.TerminatingGateway.WatchedConfigs = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.TerminatingGateway.ServiceConfigs = make(map[structs.ServiceName]*structs.ServiceConfigResponse)
|
||||
snap.TerminatingGateway.WatchedResolvers = make(map[structs.ServiceName]context.CancelFunc)
|
||||
snap.TerminatingGateway.ServiceResolvers = make(map[structs.ServiceName]*structs.ServiceResolverConfigEntry)
|
||||
snap.TerminatingGateway.ServiceResolversSet = make(map[structs.ServiceName]bool)
|
||||
snap.TerminatingGateway.ServiceGroups = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
||||
snap.TerminatingGateway.GatewayServices = make(map[structs.ServiceName]structs.GatewayService)
|
||||
snap.TerminatingGateway.HostnameServices = make(map[structs.ServiceName]structs.CheckServiceNodes)
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
func (s *handlerTerminatingGateway) handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
logger := s.logger
|
||||
|
||||
switch {
|
||||
case u.CorrelationID == rootsWatchID:
|
||||
roots, ok := u.Result.(*structs.IndexedCARoots)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
snap.Roots = roots
|
||||
|
||||
// Update watches based on the current list of services associated with the terminating-gateway
|
||||
case u.CorrelationID == gatewayServicesWatchID:
|
||||
services, ok := u.Result.(*structs.IndexedGatewayServices)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
svcMap := make(map[structs.ServiceName]struct{})
|
||||
for _, svc := range services.Services {
|
||||
// Make sure to add every service to this map, we use it to cancel watches below.
|
||||
svcMap[svc.Service] = struct{}{}
|
||||
|
||||
// Store the gateway <-> service mapping for TLS origination
|
||||
snap.TerminatingGateway.GatewayServices[svc.Service] = *svc
|
||||
|
||||
// Watch the health endpoint to discover endpoints for the service
|
||||
if _, ok := snap.TerminatingGateway.WatchedServices[svc.Service]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceName: svc.Service.Name,
|
||||
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||
|
||||
// The gateway acts as the service's proxy, so we do NOT want to discover other proxies
|
||||
Connect: false,
|
||||
}, externalServiceIDPrefix+svc.Service.String(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to register watch for external-service",
|
||||
"service", svc.Service.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.TerminatingGateway.WatchedServices[svc.Service] = cancel
|
||||
}
|
||||
|
||||
// Watch intentions with this service as their destination
|
||||
// The gateway will enforce intentions for connections to the service
|
||||
if _, ok := snap.TerminatingGateway.WatchedIntentions[svc.Service]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.IntentionMatchName, &structs.IntentionQueryRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Match: &structs.IntentionQueryMatch{
|
||||
Type: structs.IntentionMatchDestination,
|
||||
Entries: []structs.IntentionMatchEntry{
|
||||
{
|
||||
Namespace: svc.Service.NamespaceOrDefault(),
|
||||
Name: svc.Service.Name,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, serviceIntentionsIDPrefix+svc.Service.String(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to register watch for service-intentions",
|
||||
"service", svc.Service.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.TerminatingGateway.WatchedIntentions[svc.Service] = cancel
|
||||
}
|
||||
|
||||
// Watch leaf certificate for the service
|
||||
// This cert is used to terminate mTLS connections on the service's behalf
|
||||
if _, ok := snap.TerminatingGateway.WatchedLeaves[svc.Service]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.ConnectCALeafName, &cachetype.ConnectCALeafRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
Token: s.token,
|
||||
Service: svc.Service.Name,
|
||||
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||
}, serviceLeafIDPrefix+svc.Service.String(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to register watch for a service-leaf",
|
||||
"service", svc.Service.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.TerminatingGateway.WatchedLeaves[svc.Service] = cancel
|
||||
}
|
||||
|
||||
// Watch service configs for the service.
|
||||
// These are used to determine the protocol for the target service.
|
||||
if _, ok := snap.TerminatingGateway.WatchedConfigs[svc.Service]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.ResolvedServiceConfigName, &structs.ServiceConfigRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Name: svc.Service.Name,
|
||||
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||
}, serviceConfigIDPrefix+svc.Service.String(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to register watch for a resolved service config",
|
||||
"service", svc.Service.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.TerminatingGateway.WatchedConfigs[svc.Service] = cancel
|
||||
}
|
||||
|
||||
// Watch service resolvers for the service
|
||||
// These are used to create clusters and endpoints for the service subsets
|
||||
if _, ok := snap.TerminatingGateway.WatchedResolvers[svc.Service]; !ok {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.ConfigEntriesName, &structs.ConfigEntryQuery{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Kind: structs.ServiceResolver,
|
||||
Name: svc.Service.Name,
|
||||
EnterpriseMeta: svc.Service.EnterpriseMeta,
|
||||
}, serviceResolverIDPrefix+svc.Service.String(), s.ch)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("failed to register watch for a service-resolver",
|
||||
"service", svc.Service.String(),
|
||||
"error", err,
|
||||
)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.TerminatingGateway.WatchedResolvers[svc.Service] = cancel
|
||||
}
|
||||
}
|
||||
|
||||
// Delete gateway service mapping for services that were not in the update
|
||||
for sn := range snap.TerminatingGateway.GatewayServices {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
delete(snap.TerminatingGateway.GatewayServices, sn)
|
||||
}
|
||||
}
|
||||
|
||||
// Clean up services with hostname mapping for services that were not in the update
|
||||
for sn := range snap.TerminatingGateway.HostnameServices {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
delete(snap.TerminatingGateway.HostnameServices, sn)
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel service instance watches for services that were not in the update
|
||||
for sn, cancelFn := range snap.TerminatingGateway.WatchedServices {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
logger.Debug("canceling watch for service", "service", sn.String())
|
||||
delete(snap.TerminatingGateway.WatchedServices, sn)
|
||||
delete(snap.TerminatingGateway.ServiceGroups, sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel leaf cert watches for services that were not in the update
|
||||
for sn, cancelFn := range snap.TerminatingGateway.WatchedLeaves {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
logger.Debug("canceling watch for leaf cert", "service", sn.String())
|
||||
delete(snap.TerminatingGateway.WatchedLeaves, sn)
|
||||
delete(snap.TerminatingGateway.ServiceLeaves, sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel service config watches for services that were not in the update
|
||||
for sn, cancelFn := range snap.TerminatingGateway.WatchedConfigs {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
logger.Debug("canceling watch for resolved service config", "service", sn.String())
|
||||
delete(snap.TerminatingGateway.WatchedConfigs, sn)
|
||||
delete(snap.TerminatingGateway.ServiceConfigs, sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel service-resolver watches for services that were not in the update
|
||||
for sn, cancelFn := range snap.TerminatingGateway.WatchedResolvers {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
logger.Debug("canceling watch for service-resolver", "service", sn.String())
|
||||
delete(snap.TerminatingGateway.WatchedResolvers, sn)
|
||||
delete(snap.TerminatingGateway.ServiceResolvers, sn)
|
||||
delete(snap.TerminatingGateway.ServiceResolversSet, sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel intention watches for services that were not in the update
|
||||
for sn, cancelFn := range snap.TerminatingGateway.WatchedIntentions {
|
||||
if _, ok := svcMap[sn]; !ok {
|
||||
logger.Debug("canceling watch for intention", "service", sn.String())
|
||||
delete(snap.TerminatingGateway.WatchedIntentions, sn)
|
||||
delete(snap.TerminatingGateway.Intentions, sn)
|
||||
cancelFn()
|
||||
}
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, externalServiceIDPrefix):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, externalServiceIDPrefix))
|
||||
delete(snap.TerminatingGateway.ServiceGroups, sn)
|
||||
delete(snap.TerminatingGateway.HostnameServices, sn)
|
||||
|
||||
if len(resp.Nodes) > 0 {
|
||||
snap.TerminatingGateway.ServiceGroups[sn] = resp.Nodes
|
||||
snap.TerminatingGateway.HostnameServices[sn] = hostnameEndpoints(
|
||||
s.logger, snap.Datacenter, resp.Nodes)
|
||||
}
|
||||
|
||||
// Store leaf cert for watched service
|
||||
case strings.HasPrefix(u.CorrelationID, serviceLeafIDPrefix):
|
||||
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceLeafIDPrefix))
|
||||
snap.TerminatingGateway.ServiceLeaves[sn] = leaf
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, serviceConfigIDPrefix):
|
||||
serviceConfig, ok := u.Result.(*structs.ServiceConfigResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceConfigIDPrefix))
|
||||
snap.TerminatingGateway.ServiceConfigs[sn] = serviceConfig
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, serviceResolverIDPrefix):
|
||||
configEntries, ok := u.Result.(*structs.IndexedConfigEntries)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceResolverIDPrefix))
|
||||
// There should only ever be one entry for a service resolver within a namespace
|
||||
if len(configEntries.Entries) == 1 {
|
||||
if resolver, ok := configEntries.Entries[0].(*structs.ServiceResolverConfigEntry); ok {
|
||||
snap.TerminatingGateway.ServiceResolvers[sn] = resolver
|
||||
}
|
||||
}
|
||||
snap.TerminatingGateway.ServiceResolversSet[sn] = true
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, serviceIntentionsIDPrefix):
|
||||
resp, ok := u.Result.(*structs.IndexedIntentionMatches)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
|
||||
sn := structs.ServiceNameFromString(strings.TrimPrefix(u.CorrelationID, serviceIntentionsIDPrefix))
|
||||
|
||||
if len(resp.Matches) > 0 {
|
||||
// RPC supports matching multiple services at once but we only ever
|
||||
// query with the one service we represent currently so just pick
|
||||
// the one result set up.
|
||||
snap.TerminatingGateway.Intentions[sn] = resp.Matches[0]
|
||||
}
|
||||
|
||||
default:
|
||||
// do nothing
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
395
agent/proxycfg/upstreams.go
Normal file
395
agent/proxycfg/upstreams.go
Normal file
@ -0,0 +1,395 @@
|
||||
package proxycfg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/mitchellh/mapstructure"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/connect"
|
||||
|
||||
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
type handlerUpstreams struct {
|
||||
handlerState
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
||||
if u.Err != nil {
|
||||
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
||||
}
|
||||
|
||||
upstreamsSnapshot := &snap.ConnectProxy.ConfigSnapshotUpstreams
|
||||
if snap.Kind == structs.ServiceKindIngressGateway {
|
||||
upstreamsSnapshot = &snap.IngressGateway.ConfigSnapshotUpstreams
|
||||
}
|
||||
|
||||
switch {
|
||||
case u.CorrelationID == leafWatchID:
|
||||
leaf, ok := u.Result.(*structs.IssuedCert)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
upstreamsSnapshot.Leaf = leaf
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
|
||||
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
||||
upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain
|
||||
|
||||
if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
|
||||
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
|
||||
targetID, svc, ok := removeColonPrefix(correlationID)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
||||
}
|
||||
|
||||
if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[svc]; !ok {
|
||||
upstreamsSnapshot.WatchedUpstreamEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
|
||||
}
|
||||
upstreamsSnapshot.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
|
||||
|
||||
var passthroughAddrs map[string]ServicePassthroughAddrs
|
||||
|
||||
for _, node := range resp.Nodes {
|
||||
if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
|
||||
if passthroughAddrs == nil {
|
||||
passthroughAddrs = make(map[string]ServicePassthroughAddrs)
|
||||
}
|
||||
|
||||
svc := node.Service.CompoundServiceName()
|
||||
|
||||
// Overwrite the name if it's a connect proxy (as opposed to Connect native).
|
||||
// We don't reference the proxy name directly for things like SNI, but rather the name
|
||||
// of the destination. The enterprise meta of a proxy will always be the same as that of
|
||||
// the destination service, so that remains intact.
|
||||
if node.Service.Kind == structs.ServiceKindConnectProxy {
|
||||
dst := node.Service.Proxy.DestinationServiceName
|
||||
if dst == "" {
|
||||
dst = node.Service.Proxy.DestinationServiceID
|
||||
}
|
||||
svc.Name = dst
|
||||
}
|
||||
|
||||
sni := connect.ServiceSNI(
|
||||
svc.Name,
|
||||
"",
|
||||
svc.NamespaceOrDefault(),
|
||||
snap.Datacenter,
|
||||
snap.Roots.TrustDomain)
|
||||
|
||||
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svc.String()]; !ok {
|
||||
upstreamsSnapshot.PassthroughUpstreams[svc.String()] = ServicePassthroughAddrs{
|
||||
SNI: sni,
|
||||
|
||||
// Stored in a set because it's possible for these to be duplicated
|
||||
// when the upstream-target is targeted by multiple discovery chains.
|
||||
Addrs: make(map[string]struct{}),
|
||||
}
|
||||
}
|
||||
addr, _ := node.BestAddress(false)
|
||||
upstreamsSnapshot.PassthroughUpstreams[svc.String()].Addrs[addr] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
||||
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid type for response: %T", u.Result)
|
||||
}
|
||||
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
||||
dc, svc, ok := removeColonPrefix(correlationID)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
||||
}
|
||||
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok {
|
||||
upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
|
||||
}
|
||||
upstreamsSnapshot.WatchedGatewayEndpoints[svc][dc] = resp.Nodes
|
||||
default:
|
||||
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeColonPrefix(s string) (string, string, bool) {
|
||||
idx := strings.Index(s, ":")
|
||||
if idx == -1 {
|
||||
return "", "", false
|
||||
}
|
||||
return s[0:idx], s[idx+1:], true
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) resetWatchesFromChain(
|
||||
ctx context.Context,
|
||||
id string,
|
||||
chain *structs.CompiledDiscoveryChain,
|
||||
snap *ConfigSnapshotUpstreams,
|
||||
) error {
|
||||
s.logger.Trace("resetting watches for discovery chain", "id", id)
|
||||
if chain == nil {
|
||||
return fmt.Errorf("not possible to arrive here with no discovery chain")
|
||||
}
|
||||
|
||||
// Initialize relevant sub maps.
|
||||
if _, ok := snap.WatchedUpstreams[id]; !ok {
|
||||
snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
|
||||
}
|
||||
if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok {
|
||||
snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
|
||||
}
|
||||
if _, ok := snap.WatchedGateways[id]; !ok {
|
||||
snap.WatchedGateways[id] = make(map[string]context.CancelFunc)
|
||||
}
|
||||
if _, ok := snap.WatchedGatewayEndpoints[id]; !ok {
|
||||
snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes)
|
||||
}
|
||||
|
||||
// We could invalidate this selectively based on a hash of the relevant
|
||||
// resolver information, but for now just reset anything about this
|
||||
// upstream when the chain changes in any way.
|
||||
//
|
||||
// TODO(rb): content hash based add/remove
|
||||
for targetID, cancelFn := range snap.WatchedUpstreams[id] {
|
||||
s.logger.Trace("stopping watch of target",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"target", targetID,
|
||||
)
|
||||
delete(snap.WatchedUpstreams[id], targetID)
|
||||
delete(snap.WatchedUpstreamEndpoints[id], targetID)
|
||||
cancelFn()
|
||||
}
|
||||
|
||||
var (
|
||||
watchedChainEndpoints bool
|
||||
needGateways = make(map[string]struct{})
|
||||
)
|
||||
|
||||
chainID := chain.ID()
|
||||
for _, target := range chain.Targets {
|
||||
if target.ID == chainID {
|
||||
watchedChainEndpoints = true
|
||||
}
|
||||
|
||||
opts := targetWatchOpts{
|
||||
upstreamID: id,
|
||||
chainID: target.ID,
|
||||
service: target.Service,
|
||||
filter: target.Subset.Filter,
|
||||
datacenter: target.Datacenter,
|
||||
entMeta: target.GetEnterpriseMetadata(),
|
||||
}
|
||||
err := s.watchUpstreamTarget(ctx, snap, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
|
||||
}
|
||||
|
||||
// We'll get endpoints from the gateway query, but the health still has
|
||||
// to come from the backing service query.
|
||||
switch target.MeshGateway.Mode {
|
||||
case structs.MeshGatewayModeRemote:
|
||||
needGateways[target.Datacenter] = struct{}{}
|
||||
case structs.MeshGatewayModeLocal:
|
||||
needGateways[s.source.Datacenter] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// If the discovery chain's targets do not lead to watching all endpoints
|
||||
// for the upstream, then create a separate watch for those too.
|
||||
// This is needed in transparent mode because if there is some service A that
|
||||
// redirects to service B, the dialing proxy needs to associate A's virtual IP
|
||||
// with A's discovery chain.
|
||||
//
|
||||
// Outside of transparent mode we only watch the chain target, B,
|
||||
// since A is a virtual service and traffic will not be sent to it.
|
||||
if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
||||
chainEntMeta := structs.NewEnterpriseMeta(chain.Namespace)
|
||||
|
||||
opts := targetWatchOpts{
|
||||
upstreamID: id,
|
||||
chainID: chainID,
|
||||
service: chain.ServiceName,
|
||||
filter: "",
|
||||
datacenter: chain.Datacenter,
|
||||
entMeta: &chainEntMeta,
|
||||
}
|
||||
err := s.watchUpstreamTarget(ctx, snap, opts)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
|
||||
}
|
||||
}
|
||||
|
||||
for dc := range needGateways {
|
||||
if _, ok := snap.WatchedGateways[id][dc]; ok {
|
||||
continue
|
||||
}
|
||||
|
||||
s.logger.Trace("initializing watch of mesh gateway in datacenter",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"datacenter", dc,
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.watchMeshGateway(ctx, dc, id)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
snap.WatchedGateways[id][dc] = cancel
|
||||
}
|
||||
|
||||
for dc, cancelFn := range snap.WatchedGateways[id] {
|
||||
if _, ok := needGateways[dc]; ok {
|
||||
continue
|
||||
}
|
||||
s.logger.Trace("stopping watch of mesh gateway in datacenter",
|
||||
"upstream", id,
|
||||
"chain", chain.ServiceName,
|
||||
"datacenter", dc,
|
||||
)
|
||||
delete(snap.WatchedGateways[id], dc)
|
||||
delete(snap.WatchedGatewayEndpoints[id], dc)
|
||||
cancelFn()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type targetWatchOpts struct {
|
||||
upstreamID string
|
||||
chainID string
|
||||
service string
|
||||
filter string
|
||||
datacenter string
|
||||
entMeta *structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) watchMeshGateway(ctx context.Context, dc string, upstreamID string) error {
|
||||
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
|
||||
Datacenter: dc,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
ServiceKind: structs.ServiceKindMeshGateway,
|
||||
UseServiceKind: true,
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: *structs.DefaultEnterpriseMeta(),
|
||||
}, "mesh-gateway:"+dc+":"+upstreamID, s.ch)
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
||||
s.logger.Trace("initializing watch of target",
|
||||
"upstream", opts.upstreamID,
|
||||
"chain", opts.service,
|
||||
"target", opts.chainID,
|
||||
)
|
||||
|
||||
var finalMeta structs.EnterpriseMeta
|
||||
finalMeta.Merge(opts.entMeta)
|
||||
|
||||
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
||||
Datacenter: opts.datacenter,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Token: s.token,
|
||||
Filter: opts.filter,
|
||||
},
|
||||
ServiceName: opts.service,
|
||||
Connect: true,
|
||||
// Note that Identifier doesn't type-prefix for service any more as it's
|
||||
// the default and makes metrics and other things much cleaner. It's
|
||||
// simpler for us if we have the type to make things unambiguous.
|
||||
Source: *s.source,
|
||||
EnterpriseMeta: finalMeta,
|
||||
}, correlationID, s.ch)
|
||||
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
snap.WatchedUpstreams[opts.upstreamID][opts.chainID] = cancel
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type discoveryChainWatchOpts struct {
|
||||
id string
|
||||
name string
|
||||
namespace string
|
||||
datacenter string
|
||||
cfg reducedUpstreamConfig
|
||||
meshGateway structs.MeshGatewayConfig
|
||||
}
|
||||
|
||||
func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
||||
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
||||
Datacenter: s.source.Datacenter,
|
||||
QueryOptions: structs.QueryOptions{Token: s.token},
|
||||
Name: opts.name,
|
||||
EvaluateInDatacenter: opts.datacenter,
|
||||
EvaluateInNamespace: opts.namespace,
|
||||
OverrideProtocol: opts.cfg.Protocol,
|
||||
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
|
||||
OverrideMeshGateway: opts.meshGateway,
|
||||
}, "discovery-chain:"+opts.id, s.ch)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
switch s.kind {
|
||||
case structs.ServiceKindIngressGateway:
|
||||
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
|
||||
case structs.ServiceKindConnectProxy:
|
||||
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
|
||||
default:
|
||||
cancel()
|
||||
return fmt.Errorf("unsupported kind %s", s.kind)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reducedUpstreamConfig represents the basic opaque config values that are now
|
||||
// managed with the discovery chain but for backwards compatibility reasons
|
||||
// should still affect how the proxy is configured.
|
||||
//
|
||||
// The full-blown config is agent/xds.UpstreamConfig
|
||||
type reducedUpstreamConfig struct {
|
||||
Protocol string `mapstructure:"protocol"`
|
||||
ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
|
||||
}
|
||||
|
||||
func (c *reducedUpstreamConfig) ConnectTimeout() time.Duration {
|
||||
return time.Duration(c.ConnectTimeoutMs) * time.Millisecond
|
||||
}
|
||||
|
||||
func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig, error) {
|
||||
var cfg reducedUpstreamConfig
|
||||
err := mapstructure.WeakDecode(m, &cfg)
|
||||
return cfg, err
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user