// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package proxystateconverter

import (
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/go-uuid"

	envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/wrapperspb"

	"github.com/hashicorp/consul/agent/connect"
	"github.com/hashicorp/consul/agent/proxycfg"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/agent/xds/config"
	"github.com/hashicorp/consul/agent/xds/naming"
	"github.com/hashicorp/consul/agent/xds/response"
	"github.com/hashicorp/consul/envoyextensions/xdscommon"
	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/proto-public/pbmesh/v2beta1/pbproxystate"
	"github.com/hashicorp/consul/proto/private/pbpeering"
)

const (
	meshGatewayExportedClusterNamePrefix = "exported~"
)

type namedCluster struct {
	name    string
	cluster *pbproxystate.Cluster
}

// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
func (s *Converter) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot) error {
	if cfgSnap == nil {
		return errors.New("nil config given")
	}

	switch cfgSnap.Kind {
	case structs.ServiceKindConnectProxy:
		return s.clustersFromSnapshotConnectProxy(cfgSnap)
	// TODO(proxystate): Terminating Gateways will be added in the future.
	//case structs.ServiceKindTerminatingGateway:
	//	err := s.clustersFromSnapshotTerminatingGateway(cfgSnap)
	//	if err != nil {
	//		return err
	//	}
	//	return nil
	// TODO(proxystate): Mesh Gateways will be added in the future.
	//case structs.ServiceKindMeshGateway:
	//	err := s.clustersFromSnapshotMeshGateway(cfgSnap)
	//	if err != nil {
	//		return err
	//	}
	//	return nil
	// TODO(proxystate): Ingress Gateways will be added in the future.
	//case structs.ServiceKindIngressGateway:
	//	err := s.clustersFromSnapshotIngressGateway(cfgSnap)
	//	if err != nil {
	//		return err
	//	}
	//	return nil
	// TODO(proxystate): API Gateways will be added in the future.
	//case structs.ServiceKindAPIGateway:
	//	res, err := s.clustersFromSnapshotAPIGateway(cfgSnap)
	//	if err != nil {
	//		return err
	//	}
	//	return nil
	default:
		return fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
	}
}

// clustersFromSnapshotConnectProxy returns the xDS API representation of the "clusters"
// (upstreams) in the snapshot.
func (s *Converter) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot) error {
	// This is the map of clusters we add to. It will be empty to start.
	clusters := s.proxyState.Clusters
	var err error

	// Include the "app" cluster for the public listener
	appCluster, err := s.makeAppCluster(cfgSnap, xdscommon.LocalAppClusterName, "", cfgSnap.Proxy.LocalServicePort)
	if err != nil {
		return err
	}
	clusters[appCluster.name] = appCluster.cluster

	if cfgSnap.Proxy.Mode == structs.ProxyModeTransparent {
		passthroughs, err := s.makePassthroughClusters(cfgSnap)
		if err != nil {
			return fmt.Errorf("failed to make passthrough clusters for transparent proxy: %v", err)
		}
		for clusterName, cluster := range passthroughs {
			clusters[clusterName] = cluster
		}
	}

	// NOTE: Any time we skip a chain below we MUST also skip that discovery chain in endpoints.go
	// so that the sets of endpoints generated match the sets of clusters.
	for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
		upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
		if skip {
			continue
		}

		upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(
			uid,
			upstream,
			chain,
			cfgSnap,
			false,
		)
		if err != nil {
			return err
		}

		for name, cluster := range upstreamClusters {
			clusters[name] = cluster
		}
	}

	// TODO(proxystate): peering will be added in the future.
	//// NOTE: Any time we skip an upstream below we MUST also skip that same
	//// upstream in endpoints.go so that the sets of endpoints generated matches
	//// the sets of clusters.
	//for _, uid := range cfgSnap.ConnectProxy.PeeredUpstreamIDs() {
	//	upstream, skip := cfgSnap.ConnectProxy.GetUpstream(uid, &cfgSnap.ProxyID.EnterpriseMeta)
	//	if skip {
	//		continue
	//	}
	//
	//	peerMeta, found := cfgSnap.ConnectProxy.UpstreamPeerMeta(uid)
	//	if !found {
	//		s.Logger.Warn("failed to fetch upstream peering metadata for cluster", "uid", uid)
	//	}
	//	cfg := s.getAndModifyUpstreamConfigForPeeredListener(uid, upstream, peerMeta)
	//
	//	upstreamCluster, err := s.makeUpstreamClusterForPeerService(uid, cfg, peerMeta, cfgSnap)
	//	if err != nil {
	//		return nil, err
	//	}
	//	clusters = append(clusters, upstreamCluster)
	//}

	// TODO(proxystate): L7 Intentions and JWT Auth will be added in the future.
	//// add clusters for jwt-providers
	//for _, prov := range cfgSnap.JWTProviders {
	//	//skip cluster creation for local providers
	//	if prov.JSONWebKeySet == nil || prov.JSONWebKeySet.Remote == nil {
	//		continue
	//	}
	//
	//	cluster, err := makeJWTProviderCluster(prov)
	//	if err != nil {
	//		s.Logger.Warn("failed to make jwt-provider cluster", "provider name", prov.Name, "error", err)
	//		continue
	//	}
	//
	//	clusters[cluster.GetName()] = cluster
	//}

	for _, u := range cfgSnap.Proxy.Upstreams {
		if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
			continue
		}

		upstreamCluster, err := s.makeUpstreamClusterForPreparedQuery(u, cfgSnap)
		if err != nil {
			return err
		}
		clusters[upstreamCluster.name] = upstreamCluster.cluster
	}

	cfgSnap.Proxy.Expose.Finalize()
	paths := cfgSnap.Proxy.Expose.Paths

	// Add service health checks to the list of paths to create clusters for if needed
	if cfgSnap.Proxy.Expose.Checks {
		psid := structs.NewServiceID(cfgSnap.Proxy.DestinationServiceID, &cfgSnap.ProxyID.EnterpriseMeta)
		for _, check := range cfgSnap.ConnectProxy.WatchedServiceChecks[psid] {
			p, err := parseCheckPath(check)
			if err != nil {
				s.Logger.Warn("failed to create cluster for", "check", check.CheckID, "error", err)
				continue
			}
			paths = append(paths, p)
		}
	}

	// Create a new cluster if we need to expose a port that is different from the service port
	for _, path := range paths {
		if path.LocalPathPort == cfgSnap.Proxy.LocalServicePort {
			continue
		}
		c, err := s.makeAppCluster(cfgSnap, makeExposeClusterName(path.LocalPathPort), path.Protocol, path.LocalPathPort)
		if err != nil {
			s.Logger.Warn("failed to make local cluster", "path", path.Path, "error", err)
			continue
		}
		clusters[c.name] = c.cluster
	}

	return nil
}

// TODO(proxystate): L7 Intentions and JWT Auth will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeJWTProviderCluster
// func makeJWKSDiscoveryClusterType
// func makeJWTCertValidationContext
// func parseJWTRemoteURL

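// makeExposeClusterName returns the name of the local cluster created for an exposed
// path, e.g. "exposed_cluster_8080" for local path port 8080.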
func makeExposeClusterName(destinationPort int) string {
	return fmt.Sprintf("exposed_cluster_%d", destinationPort)
}

// In transparent proxy mode there are potentially multiple passthrough clusters added.
// The first is for destinations outside of Consul's catalog. This is for a plain TCP proxy.
// All of these use Envoy's ORIGINAL_DST listener filter, which forwards to the original
// destination address (before the iptables redirection).
// The rest are for destinations inside the mesh, which require certificates for mTLS.
func (s *Converter) makePassthroughClusters(cfgSnap *proxycfg.ConfigSnapshot) (map[string]*pbproxystate.Cluster, error) {
	// This size is an upper bound.
	clusters := make(map[string]*pbproxystate.Cluster, 0)
	if meshConf := cfgSnap.MeshConfig(); meshConf == nil ||
		!meshConf.TransparentProxy.MeshDestinationsOnly {

		clusters[naming.OriginalDestinationClusterName] = &pbproxystate.Cluster{
			Group: &pbproxystate.Cluster_EndpointGroup{
				EndpointGroup: &pbproxystate.EndpointGroup{
					Group: &pbproxystate.EndpointGroup_Passthrough{
						Passthrough: &pbproxystate.PassthroughEndpointGroup{
							Config: &pbproxystate.PassthroughEndpointGroupConfig{
								ConnectTimeout: durationpb.New(5 * time.Second),
							},
						},
					},
				},
			},
		}
	}

	for uid, chain := range cfgSnap.ConnectProxy.DiscoveryChain {
		targetMap, ok := cfgSnap.ConnectProxy.PassthroughUpstreams[uid]
		if !ok {
			continue
		}

		for targetID := range targetMap {
			uid := proxycfg.NewUpstreamIDFromTargetID(targetID)

			sni := connect.ServiceSNI(
				uid.Name, "", uid.NamespaceOrDefault(),
				uid.PartitionOrDefault(), cfgSnap.Datacenter,
				cfgSnap.Roots.TrustDomain)

			// Prefixed with passthrough to distinguish from non-passthrough clusters for the same upstream.
			name := "passthrough~" + sni

			c := pbproxystate.Cluster{
				Group: &pbproxystate.Cluster_EndpointGroup{
					EndpointGroup: &pbproxystate.EndpointGroup{
						Group: &pbproxystate.EndpointGroup_Passthrough{
							Passthrough: &pbproxystate.PassthroughEndpointGroup{
								Config: &pbproxystate.PassthroughEndpointGroupConfig{
									ConnectTimeout: durationpb.New(5 * time.Second),
								},
							},
						},
					},
				},
			}

			if discoTarget, ok := chain.Targets[targetID]; ok && discoTarget.ConnectTimeout > 0 {
				c.GetEndpointGroup().GetPassthrough().GetConfig().
					ConnectTimeout = durationpb.New(discoTarget.ConnectTimeout)
			}

			transportSocket, err := s.createOutboundMeshMTLS(cfgSnap, []string{getSpiffeID(cfgSnap, uid)}, sni)
			if err != nil {
				return nil, err
			}
			c.GetEndpointGroup().GetPassthrough().OutboundTls = transportSocket

			clusters[name] = &c
		}
	}

	err := cfgSnap.ConnectProxy.DestinationsUpstream.ForEachKeyE(func(uid proxycfg.UpstreamID) error {
		svcConfig, ok := cfgSnap.ConnectProxy.DestinationsUpstream.Get(uid)
		if !ok || svcConfig.Destination == nil {
			return nil
		}

		// One Cluster per Destination Address
		for _, address := range svcConfig.Destination.Addresses {
			name := clusterNameForDestination(cfgSnap, uid.Name, address, uid.NamespaceOrDefault(), uid.PartitionOrDefault())

			c := &pbproxystate.Cluster{
				AltStatName: name,
				Group: &pbproxystate.Cluster_EndpointGroup{
					EndpointGroup: &pbproxystate.EndpointGroup{
						Group: &pbproxystate.EndpointGroup_Dynamic{
							Dynamic: &pbproxystate.DynamicEndpointGroup{
								Config: &pbproxystate.DynamicEndpointGroupConfig{
									ConnectTimeout: durationpb.New(5 * time.Second),
									// Endpoints are managed separately by EDS
									// Having an empty config enables outlier detection with default config.
									OutlierDetection:      &pbproxystate.OutlierDetection{},
									DisablePanicThreshold: true,
								},
							},
						},
					},
				},
			}
			sni := connect.ServiceSNI(
				uid.Name, "", uid.NamespaceOrDefault(),
				uid.PartitionOrDefault(), cfgSnap.Datacenter,
				cfgSnap.Roots.TrustDomain)
			transportSocket, err := s.createOutboundMeshMTLS(cfgSnap, []string{getSpiffeID(cfgSnap, uid)}, sni)
			if err != nil {
				return err
			}
			c.GetEndpointGroup().GetDynamic().OutboundTls = transportSocket
			clusters[name] = c
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return clusters, nil
}

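// getSpiffeID builds the SPIFFE identity URI for the upstream's service, rooted at the
// snapshot's trust domain (for the default partition this is typically of the form
// spiffe://<trust-domain>/ns/<namespace>/dc/<datacenter>/svc/<service>).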
func getSpiffeID(cfgSnap *proxycfg.ConfigSnapshot, uid proxycfg.UpstreamID) string {
	spiffeIDService := &connect.SpiffeIDService{
		Host:       cfgSnap.Roots.TrustDomain,
		Partition:  uid.PartitionOrDefault(),
		Namespace:  uid.NamespaceOrDefault(),
		Datacenter: cfgSnap.Datacenter,
		Service:    uid.Name,
	}
	return spiffeIDService.URI().String()
}

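// clusterNameForDestination returns the cluster name used for an upstream destination
// address: the SNI computed from the address-specific service name, prefixed with
// "destination.".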
func clusterNameForDestination(cfgSnap *proxycfg.ConfigSnapshot, name string,
	address string, namespace string, partition string) string {
	name = destinationSpecificServiceName(name, address)
	sni := connect.ServiceSNI(name, "", namespace, partition,
		cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)

	// Prefixed with destination to distinguish from non-passthrough clusters
	// for the same upstream.
	return "destination." + sni
}

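// destinationSpecificServiceName prefixes the service name with a sanitized form of the
// destination address, replacing ":" and "." with "-", e.g. "192-168-0-1.api" for
// address "192.168.0.1" and service "api".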
func destinationSpecificServiceName(name string, address string) string {
	address = strings.ReplaceAll(address, ":", "-")
	address = strings.ReplaceAll(address, ".", "-")
	return fmt.Sprintf("%s.%s", address, name)
}

// TODO(proxystate): Mesh Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func clustersFromSnapshotMeshGateway
// func haveVoters

// TODO(proxystate): Peering will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makePeerServerClusters

// TODO(proxystate): Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func clustersFromSnapshotTerminatingGateway

// TODO(proxystate): Mesh Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeGatewayServiceClusters

// TODO(proxystate): Cluster Peering will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeGatewayOutgoingClusterPeeringServiceClusters

// TODO(proxystate): Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeDestinationClusters

// TODO(proxystate): Mesh Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func injectGatewayServiceAddons

// TODO(proxystate): Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func injectGatewayDestinationAddons

// TODO(proxystate): Ingress Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func clustersFromSnapshotIngressGateway

// TODO(proxystate): API Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func clustersFromSnapshotAPIGateway

// TODO(proxystate): Ingress Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func configIngressUpstreamCluster

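// makeAppCluster returns the cluster (and registers its static endpoint) for the local
// application the proxy is fronting, dialed either over a unix socket or over
// localhost:port depending on the proxy registration.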
func (s *Converter) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot, name, pathProtocol string, port int) (*namedCluster, error) {
	var err error
	namedCluster := &namedCluster{}

	cfg, err := config.ParseProxyConfig(cfgSnap.Proxy.Config)
	if err != nil {
		// Don't hard fail on a config typo, just warn. The parse func returns
		// default config if there is an error so it's safe to continue.
		s.Logger.Warn("failed to parse Connect.Proxy.Config", "error", err)
	}

	//// If we have overridden local cluster config try to parse it into an Envoy cluster
	//if cfg.LocalClusterJSON != "" {
	//	return makeClusterFromUserConfig(cfg.LocalClusterJSON)
	//}

	var endpoint *pbproxystate.Endpoint
	if cfgSnap.Proxy.LocalServiceSocketPath != "" {
		endpoint = makeUnixSocketEndpoint(cfgSnap.Proxy.LocalServiceSocketPath)
	} else {
		addr := cfgSnap.Proxy.LocalServiceAddress
		if addr == "" {
			addr = "127.0.0.1"
		}
		endpoint = makeHostPortEndpoint(addr, port)
	}
	s.proxyState.Endpoints[name] = &pbproxystate.Endpoints{
		Endpoints: []*pbproxystate.Endpoint{endpoint},
	}

	namedCluster.name = name
	namedCluster.cluster = &pbproxystate.Cluster{
		Group: &pbproxystate.Cluster_EndpointGroup{
			EndpointGroup: &pbproxystate.EndpointGroup{
				Group: &pbproxystate.EndpointGroup_Static{
					Static: &pbproxystate.StaticEndpointGroup{
						Config: &pbproxystate.StaticEndpointGroupConfig{
							ConnectTimeout: durationpb.New(time.Duration(cfg.LocalConnectTimeoutMs) * time.Millisecond),
						},
					},
				},
			},
		},
	}

	protocol := pathProtocol
	if protocol == "" {
		protocol = cfg.Protocol
	}
	namedCluster.cluster.Protocol = protocolMap[protocol]
	if cfg.MaxInboundConnections > 0 {
		namedCluster.cluster.GetEndpointGroup().GetStatic().GetConfig().
			CircuitBreakers = &pbproxystate.CircuitBreakers{
			UpstreamLimits: &pbproxystate.UpstreamLimits{
				MaxConnections: response.MakeUint32Value(cfg.MaxInboundConnections),
			},
		}
	}

	return namedCluster, err
}

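// makeUpstreamClusterForPeerService returns the cluster name, cluster, and (for
// hostname-addressed peers) the endpoint list for an upstream exported from a cluster
// peer. Peered endpoints are served via EDS unless they are addressed by hostname and no
// local mesh gateway is in use, in which case a DNS endpoint group is built instead.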
func (s *Converter) makeUpstreamClusterForPeerService(
	uid proxycfg.UpstreamID,
	upstreamConfig structs.UpstreamConfig,
	peerMeta structs.PeeringServiceMeta,
	cfgSnap *proxycfg.ConfigSnapshot,
) (string, *pbproxystate.Cluster, *pbproxystate.Endpoints, error) {
	var (
		c   *pbproxystate.Cluster
		e   *pbproxystate.Endpoints
		err error
	)

	// TODO(proxystate): escapeHatches will be implemented in the future
	//if upstreamConfig.EnvoyClusterJSON != "" {
	//	c, err = makeClusterFromUserConfig(upstreamConfig.EnvoyClusterJSON)
	//	if err != nil {
	//		return "", c, e, err
	//	}
	//	// In the happy path don't return yet as we need to inject TLS config still.
	//}

	upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()

	if err != nil {
		return "", c, e, err
	}

	tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(uid.Peer)
	if !ok {
		// this should never happen since we loop through upstreams with
		// set trust bundles
		return "", c, e, fmt.Errorf("trust bundle not ready for peer %s", uid.Peer)
	}

	clusterName := generatePeeredClusterName(uid, tbs)

	outlierDetection := makeOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true)
	// We can't rely on health checks for services on cluster peers because they
	// don't take into account service resolvers, splitters and routers. Setting
	// MaxEjectionPercent to 100% gives outlier detection the power to eject the
	// entire cluster.
	outlierDetection.MaxEjectionPercent = &wrapperspb.UInt32Value{Value: 100}

	s.Logger.Trace("generating cluster for", "cluster", clusterName)
	if c == nil {
		c = &pbproxystate.Cluster{}

		useEDS := true
		if _, ok := cfgSnap.ConnectProxy.PeerUpstreamEndpointsUseHostnames[uid]; ok {
			// If we're using local mesh gw, the fact that upstreams use hostnames doesn't matter.
			// If we're not using local mesh gw, then resort to CDS.
			if upstreamConfig.MeshGateway.Mode != structs.MeshGatewayModeLocal {
				useEDS = false
			}
		}

		// If none of the service instances are addressed by a hostname we
		// provide the endpoint IP addresses via EDS
		if useEDS {
			d := &pbproxystate.DynamicEndpointGroup{
				Config: &pbproxystate.DynamicEndpointGroupConfig{
					UseAltStatName:        false,
					ConnectTimeout:        durationpb.New(time.Duration(upstreamConfig.ConnectTimeoutMs) * time.Millisecond),
					DisablePanicThreshold: true,
					CircuitBreakers: &pbproxystate.CircuitBreakers{
						UpstreamLimits: makeUpstreamLimitsIfNeeded(upstreamConfig.Limits),
					},
					OutlierDetection: outlierDetection,
				},
			}
			c.Group = &pbproxystate.Cluster_EndpointGroup{
				EndpointGroup: &pbproxystate.EndpointGroup{
					Group: &pbproxystate.EndpointGroup_Dynamic{
						Dynamic: d,
					},
				},
			}
			transportSocket := &pbproxystate.TransportSocket{
				ConnectionTls: &pbproxystate.TransportSocket_OutboundMesh{
					OutboundMesh: &pbproxystate.OutboundMeshMTLS{
						ValidationContext: &pbproxystate.MeshOutboundValidationContext{
							SpiffeIds:              peerMeta.SpiffeID,
							TrustBundlePeerNameKey: uid.Peer,
						},
						Sni: peerMeta.PrimarySNI(),
					},
				},
			}
			d.OutboundTls = transportSocket
		} else {
			d := &pbproxystate.DNSEndpointGroup{
				Config: &pbproxystate.DNSEndpointGroupConfig{
					UseAltStatName:        false,
					ConnectTimeout:        durationpb.New(time.Duration(upstreamConfig.ConnectTimeoutMs) * time.Millisecond),
					DisablePanicThreshold: true,
					CircuitBreakers: &pbproxystate.CircuitBreakers{
						UpstreamLimits: makeUpstreamLimitsIfNeeded(upstreamConfig.Limits),
					},
					OutlierDetection: outlierDetection,
				},
			}
			c.Group = &pbproxystate.Cluster_EndpointGroup{
				EndpointGroup: &pbproxystate.EndpointGroup{
					Group: &pbproxystate.EndpointGroup_Dns{
						Dns: d,
					},
				},
			}
			e = &pbproxystate.Endpoints{
				Endpoints: make([]*pbproxystate.Endpoint, 0),
			}

			ep, _ := cfgSnap.ConnectProxy.PeerUpstreamEndpoints.Get(uid)
			configureClusterWithHostnames(
				s.Logger,
				d,
				e,
				"", /*TODO:make configurable?*/
				ep,
				true,  /*isRemote*/
				false, /*onlyPassing*/
			)
			transportSocket := &pbproxystate.TransportSocket{
				ConnectionTls: &pbproxystate.TransportSocket_OutboundMesh{
					OutboundMesh: &pbproxystate.OutboundMeshMTLS{
						ValidationContext: &pbproxystate.MeshOutboundValidationContext{
							SpiffeIds:              peerMeta.SpiffeID,
							TrustBundlePeerNameKey: uid.Peer,
						},
						Sni: peerMeta.PrimarySNI(),
					},
				},
			}
			d.OutboundTls = transportSocket
		}
	}

	return clusterName, c, e, nil
}

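// makeUpstreamClusterForPreparedQuery returns a cluster for a prepared query upstream,
// keyed by the upstream's SNI, with the SPIFFE IDs of every distinct backing service
// allowed in the outbound mTLS validation context.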
func (s *Converter) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*namedCluster, error) {
	var c *pbproxystate.Cluster
	var err error

	uid := proxycfg.NewUpstreamID(&upstream)

	dc := upstream.Datacenter
	if dc == "" {
		dc = cfgSnap.Datacenter
	}
	sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain)

	cfg, _ := structs.ParseUpstreamConfig(upstream.Config)
	// TODO(proxystate): add logger and enable this
	//if err != nil {
	// Don't hard fail on a config typo, just warn. The parse func returns
	// default config if there is an error so it's safe to continue.
	//s.Logger.Warn("failed to parse", "upstream", uid, "error", err)
	//}

	// TODO(proxystate): escapeHatches will be implemented in the future
	//if cfg.EnvoyClusterJSON != "" {
	//	c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
	//	if err != nil {
	//		return c, err
	//	}
	//	// In the happy path don't return yet as we need to inject TLS config still.
	//}

	if c == nil {
		c = &pbproxystate.Cluster{
			Protocol: protocolMap[cfg.Protocol],
			Group: &pbproxystate.Cluster_EndpointGroup{
				EndpointGroup: &pbproxystate.EndpointGroup{
					Group: &pbproxystate.EndpointGroup_Dynamic{
						Dynamic: &pbproxystate.DynamicEndpointGroup{
							Config: &pbproxystate.DynamicEndpointGroupConfig{
								ConnectTimeout: durationpb.New(time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond),
								// Endpoints are managed separately by EDS
								// Having an empty config enables outlier detection with default config.
								OutlierDetection: makeOutlierDetection(cfg.PassiveHealthCheck, nil, true),
								CircuitBreakers: &pbproxystate.CircuitBreakers{
									UpstreamLimits: makeUpstreamLimitsIfNeeded(cfg.Limits),
								},
							},
						},
					},
				},
			},
		}
	}

	endpoints := cfgSnap.ConnectProxy.PreparedQueryEndpoints[uid]
	var (
		spiffeIDs = make([]string, 0)
		seen      = make(map[string]struct{})
	)
	for _, e := range endpoints {
		id := fmt.Sprintf("%s/%s", e.Node.Datacenter, e.Service.CompoundServiceName())
		if _, ok := seen[id]; ok {
			continue
		}
		seen[id] = struct{}{}

		name := e.Service.Proxy.DestinationServiceName
		if e.Service.Connect.Native {
			name = e.Service.Service
		}

		spiffeIDs = append(spiffeIDs, connect.SpiffeIDService{
			Host:       cfgSnap.Roots.TrustDomain,
			Namespace:  e.Service.NamespaceOrDefault(),
			Partition:  e.Service.PartitionOrDefault(),
			Datacenter: e.Node.Datacenter,
			Service:    name,
		}.URI().String())
	}

	transportSocket, err := s.createOutboundMeshMTLS(cfgSnap, spiffeIDs, sni)
	if err != nil {
		return nil, err
	}
	c.GetEndpointGroup().GetDynamic().OutboundTls = transportSocket

	return &namedCluster{name: sni, cluster: c}, nil
}

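// finalizeUpstreamConfig fills in defaults for an upstream config: the protocol falls
// back to the discovery chain's protocol and then "tcp", and the connect timeout falls
// back to the resolver's connect timeout.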
func finalizeUpstreamConfig(cfg structs.UpstreamConfig, chain *structs.CompiledDiscoveryChain, connectTimeout time.Duration) structs.UpstreamConfig {
	if cfg.Protocol == "" {
		cfg.Protocol = chain.Protocol
	}

	if cfg.Protocol == "" {
		cfg.Protocol = "tcp"
	}

	if cfg.ConnectTimeoutMs == 0 {
		cfg.ConnectTimeoutMs = int(connectTimeout / time.Millisecond)
	}
	return cfg
}

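// createOutboundMeshMTLS builds the outbound mTLS transport socket for a mesh upstream:
// it registers the snapshot's leaf certificate under a generated identity key and sets
// the allowed SPIFFE IDs, trust bundle peer name key, and SNI on the validation context.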
func (s *Converter) createOutboundMeshMTLS(cfgSnap *proxycfg.ConfigSnapshot, spiffeIDs []string, sni string) (*pbproxystate.TransportSocket, error) {
	switch cfgSnap.Kind {
	case structs.ServiceKindConnectProxy:
	case structs.ServiceKindMeshGateway:
	default:
		return nil, fmt.Errorf("cannot inject peering trust bundles for kind %q", cfgSnap.Kind)
	}

	// Add all trust bundle peer names, including local.
	trustBundlePeerNames := []string{resource.DefaultPeerName}
	for _, tb := range cfgSnap.PeeringTrustBundles() {
		trustBundlePeerNames = append(trustBundlePeerNames, tb.PeerName)
	}
	// Arbitrary UUID to reference the identity by.
	uuid, err := uuid.GenerateUUID()
	if err != nil {
		return nil, err
	}

	// Create the transport socket
	ts := &pbproxystate.TransportSocket{}

	ts.ConnectionTls = &pbproxystate.TransportSocket_OutboundMesh{
		OutboundMesh: &pbproxystate.OutboundMeshMTLS{
			IdentityKey: uuid,
			ValidationContext: &pbproxystate.MeshOutboundValidationContext{
				TrustBundlePeerNameKey: trustBundlePeerNames[0],
				SpiffeIds:              spiffeIDs,
			},
			Sni: sni,
		},
	}
	s.proxyState.LeafCertificates[uuid] = &pbproxystate.LeafCertificate{
		Cert: cfgSnap.Leaf().CertPEM,
		Key:  cfgSnap.Leaf().PrivateKeyPEM,
	}
	ts.TlsParameters = makeTLSParametersFromProxyTLSConfig(cfgSnap.MeshConfigTLSOutgoing())

	return ts, nil
}

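// makeUpstreamClustersForDiscoveryChain returns one cluster per resolver node in the
// compiled discovery chain, keyed by cluster name. When a node has failover targets, the
// per-target endpoint groups are wrapped in a single failover-group cluster instead of
// being emitted individually.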
func (s *Converter) makeUpstreamClustersForDiscoveryChain(
	uid proxycfg.UpstreamID,
	upstream *structs.Upstream,
	chain *structs.CompiledDiscoveryChain,
	cfgSnap *proxycfg.ConfigSnapshot,
	forMeshGateway bool,
) (map[string]*pbproxystate.Cluster, error) {
	if chain == nil {
		return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", uid)
	}

	if uid.Peer != "" && forMeshGateway {
		return nil, fmt.Errorf("impossible to get a peer discovery chain in a mesh gateway")
	}

	upstreamConfigMap := make(map[string]interface{})
	if upstream != nil {
		upstreamConfigMap = upstream.Config
	}

	upstreamsSnapshot, err := cfgSnap.ToConfigSnapshotUpstreams()

	// Mesh gateways are exempt because upstreamsSnapshot is only used for
	// cluster peering targets and transitive failover/redirects are unsupported.
	if err != nil && !forMeshGateway {
		return nil, err
	}

	rawUpstreamConfig, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap)
	if err != nil {
		// Don't hard fail on a config typo, just warn. The parse func returns
		// default config if there is an error so it's safe to continue.
		s.Logger.Warn("failed to parse", "upstream", uid,
			"error", err)
	}

	// TODO(proxystate): escapeHatches will be implemented in the future
	//var escapeHatchCluster *pbproxystate.Cluster
	//if !forMeshGateway {
	//	if rawUpstreamConfig.EnvoyClusterJSON != "" {
	//		if chain.Default {
	//			// If you haven't done anything to setup the discovery chain, then
	//			// you can use the envoy_cluster_json escape hatch.
	//			escapeHatchCluster = &pbproxystate.Cluster{
	//				EscapeHatchClusterJson: rawUpstreamConfig.EnvoyClusterJSON,
	//			}
	//		} else {
	//			s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configured for",
	//				"discovery chain", chain.ServiceName, "upstream", uid,
	//				"envoy_cluster_json", chain.ServiceName)
	//		}
	//	}
	//}

	out := make(map[string]*pbproxystate.Cluster)
	for _, node := range chain.Nodes {
		switch {
		case node == nil:
			return nil, fmt.Errorf("impossible to process a nil node")
		case node.Type != structs.DiscoveryGraphNodeTypeResolver:
			continue
		case node.Resolver == nil:
			return nil, fmt.Errorf("impossible to process a non-resolver node")
		}
		// These variables are prefixed with primary to avoid shadowing bugs.
		primaryTargetID := node.Resolver.Target
		primaryTarget := chain.Targets[primaryTargetID]
		primaryTargetClusterName := s.getTargetClusterName(upstreamsSnapshot, chain, primaryTargetID, forMeshGateway)
		if primaryTargetClusterName == "" {
			continue
		}
		if forMeshGateway && !cfgSnap.Locality.Matches(primaryTarget.Datacenter, primaryTarget.Partition) {
			s.Logger.Warn("ignoring discovery chain target that crosses a datacenter or partition boundary in a mesh gateway",
				"target", primaryTarget,
				"gatewayLocality", cfgSnap.Locality,
			)
			continue
		}

		upstreamConfig := finalizeUpstreamConfig(rawUpstreamConfig, chain, node.Resolver.ConnectTimeout)

		mappedTargets, err := s.mapDiscoChainTargets(cfgSnap, chain, node, upstreamConfig, forMeshGateway)
		if err != nil {
			return nil, err
		}

		targetGroups, err := mappedTargets.groupedTargets()
		if err != nil {
			return nil, err
		}

		var failoverGroup *pbproxystate.FailoverGroup
		endpointGroups := make([]*pbproxystate.EndpointGroup, 0)
		if mappedTargets.failover {
			// Create a failover group. The endpoint groups that are part of this failover group are created by the loop
			// below.
			failoverGroup = &pbproxystate.FailoverGroup{
				Config: &pbproxystate.FailoverGroupConfig{
					ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
					UseAltStatName: true,
				},
			}
		}

		// Construct the target dynamic endpoint groups. If these are not part of a failover group, they will get added
		// directly to the map of pbproxystate.Cluster; if they are a part of a failover group, they will be added to
		// the failover group.
		for _, groupedTarget := range targetGroups {
			s.Logger.Debug("generating cluster for", "cluster", groupedTarget.ClusterName)
			dynamic := &pbproxystate.DynamicEndpointGroup{
				Config: &pbproxystate.DynamicEndpointGroupConfig{
					UseAltStatName: true,
					ConnectTimeout: durationpb.New(node.Resolver.ConnectTimeout),
					// TODO(peering): make circuit breakers or outlier detection work?
					CircuitBreakers: &pbproxystate.CircuitBreakers{
						UpstreamLimits: makeUpstreamLimitsIfNeeded(upstreamConfig.Limits),
					},
					DisablePanicThreshold: true,
					OutlierDetection:      makeOutlierDetection(upstreamConfig.PassiveHealthCheck, nil, true),
				},
			}
			ti := groupedTarget.Targets[0]
			transportSocket, err := s.createOutboundMeshMTLS(cfgSnap, ti.SpiffeIDs, ti.SNI)
			if err != nil {
				return nil, err
			}
			dynamic.OutboundTls = transportSocket

			var lb *structs.LoadBalancer
			if node.LoadBalancer != nil {
				lb = node.LoadBalancer
			}
			if err := injectLBToCluster(lb, dynamic.Config); err != nil {
				return nil, fmt.Errorf("failed to apply load balancer configuration to cluster %q: %v", groupedTarget.ClusterName, err)
			}

			// TODO: IR: http2 options not currently supported
			//if upstreamConfig.Protocol == "http2" || upstreamConfig.Protocol == "grpc" {
			//	if err := s.setHttp2ProtocolOptions(c); err != nil {
			//		return nil, err
			//	}
			//}

			switch len(groupedTarget.Targets) {
			case 0:
				continue
			case 1:
				// We expect one target so this passes through to continue setting the cluster up.
			default:
				return nil, fmt.Errorf("cannot have more than one target")
			}

			if targetInfo := groupedTarget.Targets[0]; targetInfo.TransportSocket != nil {
				dynamic.OutboundTls = targetInfo.TransportSocket
			}

			// If the endpoint group is part of a failover group, add it to the failover group. Otherwise add it
			// directly to the clusters.
			if failoverGroup != nil {
				eg := &pbproxystate.EndpointGroup{
					Group: &pbproxystate.EndpointGroup_Dynamic{
						Dynamic: dynamic,
					},
					Name: groupedTarget.ClusterName,
				}
				endpointGroups = append(endpointGroups, eg)
			} else {
				cluster := &pbproxystate.Cluster{
					AltStatName: mappedTargets.baseClusterName,
					Protocol:    protocolMap[upstreamConfig.Protocol],
					Group: &pbproxystate.Cluster_EndpointGroup{
						EndpointGroup: &pbproxystate.EndpointGroup{
							Group: &pbproxystate.EndpointGroup_Dynamic{
								Dynamic: dynamic,
							},
						},
					},
					Name: mappedTargets.baseClusterName,
				}

				out[mappedTargets.baseClusterName] = cluster
			}
		}

		// If there's a failover group, we only add the failover group to the top level list of clusters. Its endpoint
		// groups are inlined.
		if failoverGroup != nil {
			failoverGroup.EndpointGroups = endpointGroups
			cluster := &pbproxystate.Cluster{
				AltStatName: mappedTargets.baseClusterName,
				Protocol:    protocolMap[upstreamConfig.Protocol],
				Group: &pbproxystate.Cluster_FailoverGroup{
					FailoverGroup: failoverGroup,
				},
			}
			out[mappedTargets.baseClusterName] = cluster
		}
	}

	//if escapeHatchCluster != nil {
	//	if len(out) != 1 {
	//		return nil, fmt.Errorf("cannot inject escape hatch cluster when discovery chain had no nodes")
	//	}
	//	var defaultCluster *pbproxystate.Cluster
	//	for _, k := range out {
	//		defaultCluster = k
	//		break
	//	}
	//
	//	// Overlay what the user provided.
	//	escapeHatchCluster.GetEndpointGroup().GetDynamic().OutboundTls.ConnectionTls =
	//		defaultCluster.GetEndpointGroup().GetDynamic().OutboundTls.ConnectionTls
	//
	//	out = append(out, escapeHatchCluster)
	//}

	return out, nil
}

// TODO(proxystate): Mesh Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeExportedUpstreamClustersForMeshGateway

// makeClusterFromUserConfig returns the cluster config decoded from an
// arbitrary proto3 json format string or an error if it's invalid.
//
// For now we only support embedding in JSON strings because of the hcl parsing
// pain (see Background section in the comment for decode.HookWeakDecodeFromSlice).
// This may be fixed in decode.HookWeakDecodeFromSlice in the future.
//
// When we do that we can support just nesting the config directly into the
// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch
// immediately. It's also probably not a bad thing to support long-term since
// any config generated by other systems will likely be in canonical protobuf
// form rather than our slight variant in JSON/hcl.

// TODO(proxystate): Mesh and Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeGatewayCluster

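// configureClusterWithHostnames switches the endpoint group to DNS-based discovery
// (logical DNS, or strict DNS when dnsDiscoveryType is "strict_dns") and picks a single
// hostname endpoint for Envoy to resolve, falling back to an unhealthy endpoint if no
// healthy one exists.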
func configureClusterWithHostnames(
	logger hclog.Logger,
	dnsEndpointGroup *pbproxystate.DNSEndpointGroup,
	endpointList *pbproxystate.Endpoints,
	dnsDiscoveryType string,
	// hostnameEndpoints is a list of endpoints with a hostname as their address
	hostnameEndpoints structs.CheckServiceNodes,
	// isRemote determines whether the cluster is in a remote DC or partition and we should prefer a WAN address
	isRemote bool,
	// onlyPassing determines whether endpoints that do not have a passing status should be considered unhealthy
	onlyPassing bool,
) {
	// When a service instance is addressed by a hostname we have Envoy do the DNS resolution
	// by setting a DNS cluster type and passing the hostname endpoints via CDS.
	if dnsEndpointGroup.Config == nil {
		dnsEndpointGroup.Config = &pbproxystate.DNSEndpointGroupConfig{}
	}
	dnsEndpointGroup.Config.DiscoveryType = pbproxystate.DiscoveryType_DISCOVERY_TYPE_LOGICAL
	if dnsDiscoveryType == "strict_dns" {
		dnsEndpointGroup.Config.DiscoveryType = pbproxystate.DiscoveryType_DISCOVERY_TYPE_STRICT
	}

	endpoints := make([]*envoy_endpoint_v3.LbEndpoint, 0, 1)
	uniqueHostnames := make(map[string]bool)

	var (
		hostname string
		idx      int
		fallback *pbproxystate.Endpoint
	)
	for i, e := range hostnameEndpoints {
		_, addr, port := e.BestAddress(isRemote)
		uniqueHostnames[addr] = true

		health, weight := calculateEndpointHealthAndWeight(e, onlyPassing)
		if health == pbproxystate.HealthStatus_HEALTH_STATUS_UNHEALTHY {
			fallback = makeLbEndpoint(addr, port, health, weight)
			continue
		}

		if len(endpoints) == 0 {
			endpointList.Endpoints = append(endpointList.Endpoints, makeLbEndpoint(addr, port, health, weight))

			hostname = addr
			idx = i
			break
		}
	}

	dc := hostnameEndpoints[idx].Node.Datacenter
	service := hostnameEndpoints[idx].Service.CompoundServiceName()

	// Fall back to last unhealthy endpoint if none were healthy
	if len(endpoints) == 0 {
		logger.Warn("upstream service does not contain any healthy instances",
			"dc", dc, "service", service.String())

		//endpoints = append(endpoints, fallback)
		endpointList.Endpoints = append(endpointList.Endpoints, fallback)
	}
	if len(uniqueHostnames) > 1 {
		logger.Warn(fmt.Sprintf("service contains instances with more than one unique hostname; only %q will be resolved by Envoy", hostname),
			"dc", dc, "service", service.String())
	}

}

// TODO(proxystate): Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeExternalIPCluster

// TODO(proxystate): Terminating Gateways will be added in the future.
// Functions to add from agent/xds/clusters.go:
// func makeExternalHostnameCluster

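// makeUpstreamLimitsIfNeeded converts non-nil upstream limit settings into proxystate
// upstream limits, returning nil when no limits are configured so that Envoy defaults
// apply.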
func makeUpstreamLimitsIfNeeded(limits *structs.UpstreamLimits) *pbproxystate.UpstreamLimits {
	if limits == nil {
		return nil
	}

	upstreamLimits := &pbproxystate.UpstreamLimits{}

	// Make sure to not set any threshold values on the zero-value in
	// order to rely on Envoy defaults
	if limits.MaxConnections != nil {
		upstreamLimits.MaxConnections = response.MakeUint32Value(*limits.MaxConnections)
	}
	if limits.MaxPendingRequests != nil {
		upstreamLimits.MaxPendingRequests = response.MakeUint32Value(*limits.MaxPendingRequests)
	}
	if limits.MaxConcurrentRequests != nil {
		upstreamLimits.MaxConcurrentRequests = response.MakeUint32Value(*limits.MaxConcurrentRequests)
	}

	return upstreamLimits
}

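// injectLBToCluster translates a compiled load balancer policy onto the dynamic endpoint
// group config, returning an error for unsupported policies.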
func injectLBToCluster(ec *structs.LoadBalancer, dc *pbproxystate.DynamicEndpointGroupConfig) error {
	if ec == nil {
		return nil
	}

	switch ec.Policy {
	case "":
		return nil
	case structs.LBPolicyLeastRequest:
		lr := &pbproxystate.DynamicEndpointGroupConfig_LeastRequest{
			LeastRequest: &pbproxystate.LBPolicyLeastRequest{},
		}

		dc.LbPolicy = lr

		if ec.LeastRequestConfig != nil {
			lr.LeastRequest.ChoiceCount = &wrapperspb.UInt32Value{Value: ec.LeastRequestConfig.ChoiceCount}
		}
	case structs.LBPolicyRoundRobin:
		dc.LbPolicy = &pbproxystate.DynamicEndpointGroupConfig_RoundRobin{
			RoundRobin: &pbproxystate.LBPolicyRoundRobin{},
		}

	case structs.LBPolicyRandom:
		dc.LbPolicy = &pbproxystate.DynamicEndpointGroupConfig_Random{
			Random: &pbproxystate.LBPolicyRandom{},
		}

	case structs.LBPolicyRingHash:
		rh := &pbproxystate.DynamicEndpointGroupConfig_RingHash{
			RingHash: &pbproxystate.LBPolicyRingHash{},
		}

		dc.LbPolicy = rh

		if ec.RingHashConfig != nil {
			rh.RingHash.MinimumRingSize = &wrapperspb.UInt64Value{Value: ec.RingHashConfig.MinimumRingSize}
			rh.RingHash.MaximumRingSize = &wrapperspb.UInt64Value{Value: ec.RingHashConfig.MaximumRingSize}
		}
	case structs.LBPolicyMaglev:
		dc.LbPolicy = &pbproxystate.DynamicEndpointGroupConfig_Maglev{
			Maglev: &pbproxystate.LBPolicyMaglev{},
		}

	default:
		return fmt.Errorf("unsupported load balancer policy %q", ec.Policy)
	}
	return nil
}

// generatePeeredClusterName returns an SNI-like cluster name which mimics PeeredServiceSNI
// but excludes partition information which could be ambiguous (local vs remote partition).
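// The result is of the form "<name>.<namespace>.<peer>.external.<trust-domain>".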
func generatePeeredClusterName(uid proxycfg.UpstreamID, tb *pbpeering.PeeringTrustBundle) string {
	return strings.Join([]string{
		uid.Name,
		uid.NamespaceOrDefault(),
		uid.Peer,
		"external",
		tb.TrustDomain,
	}, ".")
}

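// getTargetClusterName returns the cluster name for a discovery chain target, using the
// peered naming scheme for peer targets (or "" if that peer's trust bundle is not ready
// yet) and prefixing exported clusters with "exported~" for mesh gateways.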
func (s *Converter) getTargetClusterName(upstreamsSnapshot *proxycfg.ConfigSnapshotUpstreams, chain *structs.CompiledDiscoveryChain, tid string, forMeshGateway bool) string {
	target := chain.Targets[tid]
	clusterName := target.Name
	targetUID := proxycfg.NewUpstreamIDFromTargetID(tid)
	if targetUID.Peer != "" {
		tbs, ok := upstreamsSnapshot.UpstreamPeerTrustBundles.Get(targetUID.Peer)
		// We can't generate cluster on peers without the trust bundle. The
		// trust bundle should be ready soon.
		if !ok {
			s.Logger.Debug("peer trust bundle not ready for discovery chain target",
				"peer", targetUID.Peer,
				"target", tid,
			)
			return ""
		}

		clusterName = generatePeeredClusterName(targetUID, tbs)
	}
	clusterName = naming.CustomizeClusterName(clusterName, chain)
	if forMeshGateway {
		clusterName = meshGatewayExportedClusterNamePrefix + clusterName
	}
	return clusterName
}

// makeOutlierDetection returns a pbproxystate.OutlierDetection populated with the values from structs.PassiveHealthCheck.
// If all values are zero a default empty OutlierDetection will be returned to
// enable outlier detection with default values.
// - If override is not nil, it will overwrite the values from p, e.g., ingress gateway defaults
// - allowZero is added to handle the legacy case where connect-proxy and mesh gateway can set 0
// for EnforcingConsecutive5xx. Due to the definition of proto of PassiveHealthCheck, ingress
// gateway's EnforcingConsecutive5xx must be > 0.
func makeOutlierDetection(p *structs.PassiveHealthCheck, override *structs.PassiveHealthCheck, allowZero bool) *pbproxystate.OutlierDetection {
	od := &pbproxystate.OutlierDetection{}
	if p != nil {

		if p.Interval != 0 {
			od.Interval = durationpb.New(p.Interval)
		}
		if p.MaxFailures != 0 {
			od.Consecutive_5Xx = &wrapperspb.UInt32Value{Value: p.MaxFailures}
		}

		if p.EnforcingConsecutive5xx != nil {
			// NOTE: EnforcingConsecutive5xx must be greater than 0 for ingress-gateway
			if *p.EnforcingConsecutive5xx != 0 {
				od.EnforcingConsecutive_5Xx = &wrapperspb.UInt32Value{Value: *p.EnforcingConsecutive5xx}
			} else if allowZero {
				od.EnforcingConsecutive_5Xx = &wrapperspb.UInt32Value{Value: *p.EnforcingConsecutive5xx}
			}
		}

		if p.MaxEjectionPercent != nil {
			od.MaxEjectionPercent = &wrapperspb.UInt32Value{Value: *p.MaxEjectionPercent}
		}
		if p.BaseEjectionTime != nil {
			od.BaseEjectionTime = durationpb.New(*p.BaseEjectionTime)
		}
	}

	if override == nil {
		return od
	}

	// override the default outlier detection value
	if override.Interval != 0 {
		od.Interval = durationpb.New(override.Interval)
	}
	if override.MaxFailures != 0 {
		od.Consecutive_5Xx = &wrapperspb.UInt32Value{Value: override.MaxFailures}
	}

	if override.EnforcingConsecutive5xx != nil {
		// NOTE: EnforcingConsecutive5xx must be greater than 0 for ingress-gateway
		if *override.EnforcingConsecutive5xx != 0 {
			od.EnforcingConsecutive_5Xx = &wrapperspb.UInt32Value{Value: *override.EnforcingConsecutive5xx}
		}
		// Because only ingress gateways have overrides and they cannot have a value of 0, there is no allowZero
		// override case to handle
	}

	if override.MaxEjectionPercent != nil {
		od.MaxEjectionPercent = &wrapperspb.UInt32Value{Value: *override.MaxEjectionPercent}
	}
	if override.BaseEjectionTime != nil {
		od.BaseEjectionTime = durationpb.New(*override.BaseEjectionTime)
	}

	return od
}

// protocolMap maps config entry protocols to proxystate protocol values.
// As documented on config entry protos, the valid values are "tcp", "http",
// "http2" and "grpc". Anything else is treated as tcp.
var protocolMap = map[string]pbproxystate.Protocol{
	"http":  pbproxystate.Protocol_PROTOCOL_HTTP,
	"http2": pbproxystate.Protocol_PROTOCOL_HTTP2,
	"grpc":  pbproxystate.Protocol_PROTOCOL_GRPC,
	"tcp":   pbproxystate.Protocol_PROTOCOL_TCP,
}