mirror of
https://github.com/status-im/consul.git
synced 2025-01-19 18:19:53 +00:00
444 lines
14 KiB
Go
444 lines
14 KiB
Go
package proxycfg
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/mitchellh/mapstructure"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/agent/connect"
|
|
|
|
cachetype "github.com/hashicorp/consul/agent/cache-types"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
type handlerUpstreams struct {
|
|
handlerState
|
|
}
|
|
|
|
func (s *handlerUpstreams) handleUpdateUpstreams(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error {
|
|
if u.Err != nil {
|
|
return fmt.Errorf("error filling agent cache: %v", u.Err)
|
|
}
|
|
|
|
upstreamsSnapshot := &snap.ConnectProxy.ConfigSnapshotUpstreams
|
|
if snap.Kind == structs.ServiceKindIngressGateway {
|
|
upstreamsSnapshot = &snap.IngressGateway.ConfigSnapshotUpstreams
|
|
}
|
|
|
|
switch {
|
|
case u.CorrelationID == leafWatchID:
|
|
leaf, ok := u.Result.(*structs.IssuedCert)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
upstreamsSnapshot.Leaf = leaf
|
|
|
|
case strings.HasPrefix(u.CorrelationID, "discovery-chain:"):
|
|
resp, ok := u.Result.(*structs.DiscoveryChainResponse)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
svc := strings.TrimPrefix(u.CorrelationID, "discovery-chain:")
|
|
|
|
switch snap.Kind {
|
|
case structs.ServiceKindIngressGateway:
|
|
if _, ok := snap.IngressGateway.UpstreamsSet[svc]; !ok {
|
|
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
|
|
// The associated watch was likely cancelled.
|
|
delete(upstreamsSnapshot.DiscoveryChain, svc)
|
|
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc)
|
|
return nil
|
|
}
|
|
|
|
case structs.ServiceKindConnectProxy:
|
|
explicit := snap.ConnectProxy.UpstreamConfig[svc].HasLocalPortOrSocket()
|
|
if _, implicit := snap.ConnectProxy.IntentionUpstreams[svc]; !implicit && !explicit {
|
|
// Discovery chain is not associated with a known explicit or implicit upstream so it is purged/skipped.
|
|
// The associated watch was likely cancelled.
|
|
delete(upstreamsSnapshot.DiscoveryChain, svc)
|
|
s.logger.Trace("discovery-chain watch fired for unknown upstream", "upstream", svc)
|
|
return nil
|
|
}
|
|
default:
|
|
return fmt.Errorf("discovery-chain watch fired for unsupported kind: %s", snap.Kind)
|
|
}
|
|
|
|
upstreamsSnapshot.DiscoveryChain[svc] = resp.Chain
|
|
|
|
if err := s.resetWatchesFromChain(ctx, svc, resp.Chain, upstreamsSnapshot); err != nil {
|
|
return err
|
|
}
|
|
|
|
case strings.HasPrefix(u.CorrelationID, "upstream-target:"):
|
|
resp, ok := u.Result.(*structs.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
correlationID := strings.TrimPrefix(u.CorrelationID, "upstream-target:")
|
|
targetID, svc, ok := removeColonPrefix(correlationID)
|
|
if !ok {
|
|
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
|
}
|
|
|
|
if _, ok := upstreamsSnapshot.WatchedUpstreamEndpoints[svc]; !ok {
|
|
upstreamsSnapshot.WatchedUpstreamEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
|
|
}
|
|
upstreamsSnapshot.WatchedUpstreamEndpoints[svc][targetID] = resp.Nodes
|
|
|
|
var passthroughAddrs map[string]ServicePassthroughAddrs
|
|
|
|
for _, node := range resp.Nodes {
|
|
if snap.Proxy.Mode == structs.ProxyModeTransparent && node.Service.Proxy.TransparentProxy.DialedDirectly {
|
|
if passthroughAddrs == nil {
|
|
passthroughAddrs = make(map[string]ServicePassthroughAddrs)
|
|
}
|
|
|
|
svc := node.Service.CompoundServiceName()
|
|
|
|
// Overwrite the name if it's a connect proxy (as opposed to Connect native).
|
|
// We don't reference the proxy name directly for things like SNI, but rather the name
|
|
// of the destination. The enterprise meta of a proxy will always be the same as that of
|
|
// the destination service, so that remains intact.
|
|
if node.Service.Kind == structs.ServiceKindConnectProxy {
|
|
dst := node.Service.Proxy.DestinationServiceName
|
|
if dst == "" {
|
|
dst = node.Service.Proxy.DestinationServiceID
|
|
}
|
|
svc.Name = dst
|
|
}
|
|
|
|
sni := connect.ServiceSNI(svc.Name, "", svc.NamespaceOrDefault(), svc.PartitionOrDefault(), snap.Datacenter, snap.Roots.TrustDomain)
|
|
|
|
spiffeID := connect.SpiffeIDService{
|
|
Host: snap.Roots.TrustDomain,
|
|
Partition: svc.PartitionOrDefault(),
|
|
Namespace: svc.NamespaceOrDefault(),
|
|
Datacenter: snap.Datacenter,
|
|
Service: svc.Name,
|
|
}
|
|
|
|
if _, ok := upstreamsSnapshot.PassthroughUpstreams[svc.String()]; !ok {
|
|
upstreamsSnapshot.PassthroughUpstreams[svc.String()] = ServicePassthroughAddrs{
|
|
SNI: sni,
|
|
SpiffeID: spiffeID,
|
|
|
|
// Stored in a set because it's possible for these to be duplicated
|
|
// when the upstream-target is targeted by multiple discovery chains.
|
|
Addrs: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Make sure to use an external address when crossing partitions.
|
|
isRemote := !structs.EqualPartitions(svc.PartitionOrDefault(), s.proxyID.PartitionOrDefault())
|
|
addr, _ := node.BestAddress(isRemote)
|
|
|
|
upstreamsSnapshot.PassthroughUpstreams[svc.String()].Addrs[addr] = struct{}{}
|
|
}
|
|
}
|
|
|
|
case strings.HasPrefix(u.CorrelationID, "mesh-gateway:"):
|
|
resp, ok := u.Result.(*structs.IndexedNodesWithGateways)
|
|
if !ok {
|
|
return fmt.Errorf("invalid type for response: %T", u.Result)
|
|
}
|
|
correlationID := strings.TrimPrefix(u.CorrelationID, "mesh-gateway:")
|
|
key, svc, ok := removeColonPrefix(correlationID)
|
|
if !ok {
|
|
return fmt.Errorf("invalid correlation id %q", u.CorrelationID)
|
|
}
|
|
if _, ok = upstreamsSnapshot.WatchedGatewayEndpoints[svc]; !ok {
|
|
upstreamsSnapshot.WatchedGatewayEndpoints[svc] = make(map[string]structs.CheckServiceNodes)
|
|
}
|
|
upstreamsSnapshot.WatchedGatewayEndpoints[svc][key] = resp.Nodes
|
|
|
|
default:
|
|
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func removeColonPrefix(s string) (string, string, bool) {
|
|
idx := strings.Index(s, ":")
|
|
if idx == -1 {
|
|
return "", "", false
|
|
}
|
|
return s[0:idx], s[idx+1:], true
|
|
}
|
|
|
|
func (s *handlerUpstreams) resetWatchesFromChain(
|
|
ctx context.Context,
|
|
id string,
|
|
chain *structs.CompiledDiscoveryChain,
|
|
snap *ConfigSnapshotUpstreams,
|
|
) error {
|
|
s.logger.Trace("resetting watches for discovery chain", "id", id)
|
|
if chain == nil {
|
|
return fmt.Errorf("not possible to arrive here with no discovery chain")
|
|
}
|
|
|
|
// Initialize relevant sub maps.
|
|
if _, ok := snap.WatchedUpstreams[id]; !ok {
|
|
snap.WatchedUpstreams[id] = make(map[string]context.CancelFunc)
|
|
}
|
|
if _, ok := snap.WatchedUpstreamEndpoints[id]; !ok {
|
|
snap.WatchedUpstreamEndpoints[id] = make(map[string]structs.CheckServiceNodes)
|
|
}
|
|
if _, ok := snap.WatchedGateways[id]; !ok {
|
|
snap.WatchedGateways[id] = make(map[string]context.CancelFunc)
|
|
}
|
|
if _, ok := snap.WatchedGatewayEndpoints[id]; !ok {
|
|
snap.WatchedGatewayEndpoints[id] = make(map[string]structs.CheckServiceNodes)
|
|
}
|
|
|
|
// We could invalidate this selectively based on a hash of the relevant
|
|
// resolver information, but for now just reset anything about this
|
|
// upstream when the chain changes in any way.
|
|
//
|
|
// TODO(rb): content hash based add/remove
|
|
for targetID, cancelFn := range snap.WatchedUpstreams[id] {
|
|
s.logger.Trace("stopping watch of target",
|
|
"upstream", id,
|
|
"chain", chain.ServiceName,
|
|
"target", targetID,
|
|
)
|
|
delete(snap.WatchedUpstreams[id], targetID)
|
|
delete(snap.WatchedUpstreamEndpoints[id], targetID)
|
|
cancelFn()
|
|
}
|
|
|
|
var (
|
|
watchedChainEndpoints bool
|
|
needGateways = make(map[string]struct{})
|
|
)
|
|
|
|
chainID := chain.ID()
|
|
for _, target := range chain.Targets {
|
|
if target.ID == chainID {
|
|
watchedChainEndpoints = true
|
|
}
|
|
|
|
opts := targetWatchOpts{
|
|
upstreamID: id,
|
|
chainID: target.ID,
|
|
service: target.Service,
|
|
filter: target.Subset.Filter,
|
|
datacenter: target.Datacenter,
|
|
entMeta: target.GetEnterpriseMetadata(),
|
|
}
|
|
err := s.watchUpstreamTarget(ctx, snap, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to watch target %q for upstream %q", target.ID, id)
|
|
}
|
|
|
|
// We'll get endpoints from the gateway query, but the health still has
|
|
// to come from the backing service query.
|
|
var gk GatewayKey
|
|
|
|
switch target.MeshGateway.Mode {
|
|
case structs.MeshGatewayModeRemote:
|
|
gk = GatewayKey{
|
|
Partition: target.Partition,
|
|
Datacenter: target.Datacenter,
|
|
}
|
|
case structs.MeshGatewayModeLocal:
|
|
gk = GatewayKey{
|
|
Partition: s.source.NodePartitionOrDefault(),
|
|
Datacenter: s.source.Datacenter,
|
|
}
|
|
}
|
|
if s.source.Datacenter != target.Datacenter || s.proxyID.PartitionOrDefault() != target.Partition {
|
|
needGateways[gk.String()] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// If the discovery chain's targets do not lead to watching all endpoints
|
|
// for the upstream, then create a separate watch for those too.
|
|
// This is needed in transparent mode because if there is some service A that
|
|
// redirects to service B, the dialing proxy needs to associate A's virtual IP
|
|
// with A's discovery chain.
|
|
//
|
|
// Outside of transparent mode we only watch the chain target, B,
|
|
// since A is a virtual service and traffic will not be sent to it.
|
|
if !watchedChainEndpoints && s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
|
chainEntMeta := structs.NewEnterpriseMetaWithPartition(chain.Partition, chain.Namespace)
|
|
|
|
opts := targetWatchOpts{
|
|
upstreamID: id,
|
|
chainID: chainID,
|
|
service: chain.ServiceName,
|
|
filter: "",
|
|
datacenter: chain.Datacenter,
|
|
entMeta: &chainEntMeta,
|
|
}
|
|
err := s.watchUpstreamTarget(ctx, snap, opts)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to watch target %q for upstream %q", chainID, id)
|
|
}
|
|
}
|
|
|
|
for key := range needGateways {
|
|
if _, ok := snap.WatchedGateways[id][key]; ok {
|
|
continue
|
|
}
|
|
gwKey := gatewayKeyFromString(key)
|
|
|
|
s.logger.Trace("initializing watch of mesh gateway",
|
|
"upstream", id,
|
|
"chain", chain.ServiceName,
|
|
"datacenter", gwKey.Datacenter,
|
|
"partition", gwKey.Partition,
|
|
)
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
opts := gatewayWatchOpts{
|
|
notifier: s.cache,
|
|
notifyCh: s.ch,
|
|
source: *s.source,
|
|
token: s.token,
|
|
key: gwKey,
|
|
upstreamID: id,
|
|
}
|
|
err := watchMeshGateway(ctx, opts)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
|
|
snap.WatchedGateways[id][key] = cancel
|
|
}
|
|
|
|
for key, cancelFn := range snap.WatchedGateways[id] {
|
|
if _, ok := needGateways[key]; ok {
|
|
continue
|
|
}
|
|
gwKey := gatewayKeyFromString(key)
|
|
|
|
s.logger.Trace("stopping watch of mesh gateway",
|
|
"upstream", id,
|
|
"chain", chain.ServiceName,
|
|
"datacenter", gwKey.Datacenter,
|
|
"partition", gwKey.Partition,
|
|
)
|
|
delete(snap.WatchedGateways[id], key)
|
|
delete(snap.WatchedGatewayEndpoints[id], key)
|
|
cancelFn()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
type targetWatchOpts struct {
|
|
upstreamID string
|
|
chainID string
|
|
service string
|
|
filter string
|
|
datacenter string
|
|
entMeta *structs.EnterpriseMeta
|
|
}
|
|
|
|
func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
|
|
s.logger.Trace("initializing watch of target",
|
|
"upstream", opts.upstreamID,
|
|
"chain", opts.service,
|
|
"target", opts.chainID,
|
|
)
|
|
|
|
var finalMeta structs.EnterpriseMeta
|
|
finalMeta.Merge(opts.entMeta)
|
|
|
|
correlationID := "upstream-target:" + opts.chainID + ":" + opts.upstreamID
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
err := s.health.Notify(ctx, structs.ServiceSpecificRequest{
|
|
Datacenter: opts.datacenter,
|
|
QueryOptions: structs.QueryOptions{
|
|
Token: s.token,
|
|
Filter: opts.filter,
|
|
},
|
|
ServiceName: opts.service,
|
|
Connect: true,
|
|
// Note that Identifier doesn't type-prefix for service any more as it's
|
|
// the default and makes metrics and other things much cleaner. It's
|
|
// simpler for us if we have the type to make things unambiguous.
|
|
Source: *s.source,
|
|
EnterpriseMeta: finalMeta,
|
|
}, correlationID, s.ch)
|
|
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
snap.WatchedUpstreams[opts.upstreamID][opts.chainID] = cancel
|
|
|
|
return nil
|
|
}
|
|
|
|
type discoveryChainWatchOpts struct {
|
|
id string
|
|
name string
|
|
namespace string
|
|
partition string
|
|
datacenter string
|
|
cfg reducedUpstreamConfig
|
|
meshGateway structs.MeshGatewayConfig
|
|
}
|
|
|
|
func (s *handlerUpstreams) watchDiscoveryChain(ctx context.Context, snap *ConfigSnapshot, opts discoveryChainWatchOpts) error {
|
|
if _, ok := snap.ConnectProxy.WatchedDiscoveryChains[opts.id]; ok {
|
|
return nil
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
err := s.cache.Notify(ctx, cachetype.CompiledDiscoveryChainName, &structs.DiscoveryChainRequest{
|
|
Datacenter: s.source.Datacenter,
|
|
QueryOptions: structs.QueryOptions{Token: s.token},
|
|
Name: opts.name,
|
|
EvaluateInDatacenter: opts.datacenter,
|
|
EvaluateInNamespace: opts.namespace,
|
|
EvaluateInPartition: opts.partition,
|
|
OverrideProtocol: opts.cfg.Protocol,
|
|
OverrideConnectTimeout: opts.cfg.ConnectTimeout(),
|
|
OverrideMeshGateway: opts.meshGateway,
|
|
}, "discovery-chain:"+opts.id, s.ch)
|
|
if err != nil {
|
|
cancel()
|
|
return err
|
|
}
|
|
|
|
switch s.kind {
|
|
case structs.ServiceKindIngressGateway:
|
|
snap.IngressGateway.WatchedDiscoveryChains[opts.id] = cancel
|
|
case structs.ServiceKindConnectProxy:
|
|
snap.ConnectProxy.WatchedDiscoveryChains[opts.id] = cancel
|
|
default:
|
|
cancel()
|
|
return fmt.Errorf("unsupported kind %s", s.kind)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// reducedUpstreamConfig represents the basic opaque config values that are now
|
|
// managed with the discovery chain but for backwards compatibility reasons
|
|
// should still affect how the proxy is configured.
|
|
//
|
|
// The full-blown config is agent/xds.UpstreamConfig
|
|
type reducedUpstreamConfig struct {
|
|
Protocol string `mapstructure:"protocol"`
|
|
ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"`
|
|
}
|
|
|
|
func (c *reducedUpstreamConfig) ConnectTimeout() time.Duration {
|
|
return time.Duration(c.ConnectTimeoutMs) * time.Millisecond
|
|
}
|
|
|
|
func parseReducedUpstreamConfig(m map[string]interface{}) (reducedUpstreamConfig, error) {
|
|
var cfg reducedUpstreamConfig
|
|
err := mapstructure.WeakDecode(m, &cfg)
|
|
return cfg, err
|
|
}
|