mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
d574473fd1
Ensure that configuring a FailoverPolicy for a service that is reachable via a xRoute or a direct upstream causes an envoy aggregate cluster to be created for the original cluster name, but with separate clusters for each one of the possible destinations.
385 lines
13 KiB
Go
385 lines
13 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package xdsv2
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
|
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
|
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
|
|
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
|
|
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
|
|
"google.golang.org/protobuf/proto"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
"github.com/hashicorp/consul/envoyextensions/xdscommon"
|
|
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
|
|
)
|
|
|
|
func (pr *ProxyResources) doesEnvoyClusterAlreadyExist(name string) bool {
|
|
// TODO(proxystate): consider using a map instead of [] for this kind of lookup
|
|
for _, envoyCluster := range pr.envoyResources[xdscommon.ClusterType] {
|
|
if envoyCluster.(*envoy_cluster_v3.Cluster).Name == name {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (pr *ProxyResources) makeXDSClusters() ([]proto.Message, error) {
|
|
clusters := make([]proto.Message, 0)
|
|
|
|
for clusterName := range pr.proxyState.Clusters {
|
|
protoCluster, err := pr.makeClusters(clusterName)
|
|
// TODO: aggregate errors for clusters and still return any properly formed clusters.
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, protoCluster...)
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeClusters(name string) ([]proto.Message, error) {
|
|
clusters := make([]proto.Message, 0)
|
|
proxyStateCluster, ok := pr.proxyState.Clusters[name]
|
|
if !ok {
|
|
return nil, fmt.Errorf("cluster %q not found", name)
|
|
}
|
|
|
|
if pr.doesEnvoyClusterAlreadyExist(name) {
|
|
// don't error
|
|
return []proto.Message{}, nil
|
|
}
|
|
|
|
switch proxyStateCluster.Group.(type) {
|
|
case *pbproxystate.Cluster_FailoverGroup:
|
|
fg := proxyStateCluster.GetFailoverGroup()
|
|
clusters, err := pr.makeEnvoyAggregateCluster(name, proxyStateCluster.Protocol, fg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
for _, c := range clusters {
|
|
clusters = append(clusters, c)
|
|
}
|
|
|
|
case *pbproxystate.Cluster_EndpointGroup:
|
|
eg := proxyStateCluster.GetEndpointGroup()
|
|
cluster, err := pr.makeEnvoyCluster(name, proxyStateCluster.Protocol, eg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
|
|
default:
|
|
return nil, errors.New("cluster group type should be Endpoint Group or Failover Group")
|
|
}
|
|
return clusters, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyCluster(name string, protocol string, eg *pbproxystate.EndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
if eg != nil {
|
|
switch t := eg.Group.(type) {
|
|
case *pbproxystate.EndpointGroup_Dynamic:
|
|
dynamic := eg.GetDynamic()
|
|
return pr.makeEnvoyDynamicCluster(name, protocol, dynamic)
|
|
case *pbproxystate.EndpointGroup_Static:
|
|
static := eg.GetStatic()
|
|
return pr.makeEnvoyStaticCluster(name, protocol, static)
|
|
case *pbproxystate.EndpointGroup_Dns:
|
|
dns := eg.GetDns()
|
|
return pr.makeEnvoyDnsCluster(name, protocol, dns)
|
|
case *pbproxystate.EndpointGroup_Passthrough:
|
|
passthrough := eg.GetPassthrough()
|
|
return pr.makeEnvoyPassthroughCluster(name, protocol, passthrough)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported endpoint group type: %s", t)
|
|
}
|
|
}
|
|
return nil, fmt.Errorf("no endpoint group")
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyDynamicCluster(name string, protocol string, dynamic *pbproxystate.DynamicEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS},
|
|
EdsClusterConfig: &envoy_cluster_v3.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoy_core_v3.ConfigSource{
|
|
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
|
|
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
|
|
Ads: &envoy_core_v3.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
err := addHttpProtocolOptions(protocol, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if dynamic.Config != nil {
|
|
if dynamic.Config.UseAltStatName {
|
|
cluster.AltStatName = name
|
|
}
|
|
cluster.ConnectTimeout = dynamic.Config.ConnectTimeout
|
|
if !dynamic.Config.DisablePanicThreshold {
|
|
cluster.CommonLbConfig = &envoy_cluster_v3.Cluster_CommonLbConfig{
|
|
HealthyPanicThreshold: &envoy_type_v3.Percent{
|
|
Value: 0, // disable panic threshold
|
|
},
|
|
}
|
|
}
|
|
addEnvoyCircuitBreakers(dynamic.Config.CircuitBreakers, cluster)
|
|
addEnvoyOutlierDetection(dynamic.Config.OutlierDetection, cluster)
|
|
|
|
err := addEnvoyLBToCluster(dynamic.Config, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if dynamic.OutboundTls != nil {
|
|
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(dynamic.OutboundTls)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cluster.TransportSocket = envoyTransportSocket
|
|
}
|
|
|
|
return cluster, nil
|
|
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol string, static *pbproxystate.StaticEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_STATIC},
|
|
}
|
|
|
|
// todo (ishustava/v2): we need to be able to handle the case when empty endpoints are allowed on a cluster.
|
|
endpointList, ok := pr.proxyState.Endpoints[name]
|
|
if ok {
|
|
cluster.LoadAssignment = makeEnvoyClusterLoadAssignment(name, endpointList.Endpoints)
|
|
}
|
|
|
|
var err error
|
|
if name == xdscommon.LocalAppClusterName {
|
|
err = addLocalAppHttpProtocolOptions(protocol, cluster)
|
|
} else {
|
|
err = addHttpProtocolOptions(protocol, cluster)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if static.Config != nil {
|
|
cluster.ConnectTimeout = static.Config.ConnectTimeout
|
|
addEnvoyCircuitBreakers(static.GetConfig().CircuitBreakers, cluster)
|
|
}
|
|
return cluster, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyDnsCluster(name string, protocol string, dns *pbproxystate.DNSEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
return nil, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol string, passthrough *pbproxystate.PassthroughEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
|
|
cluster := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ConnectTimeout: passthrough.Config.ConnectTimeout,
|
|
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_ORIGINAL_DST},
|
|
}
|
|
if passthrough.OutboundTls != nil {
|
|
envoyTransportSocket, err := pr.makeEnvoyTransportSocket(passthrough.OutboundTls)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
cluster.TransportSocket = envoyTransportSocket
|
|
}
|
|
err := addHttpProtocolOptions(protocol, cluster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return cluster, nil
|
|
}
|
|
|
|
func (pr *ProxyResources) makeEnvoyAggregateCluster(name string, protocol string, fg *pbproxystate.FailoverGroup) ([]*envoy_cluster_v3.Cluster, error) {
|
|
var clusters []*envoy_cluster_v3.Cluster
|
|
if fg != nil {
|
|
var egNames []string
|
|
for _, eg := range fg.EndpointGroups {
|
|
cluster, err := pr.makeEnvoyCluster(eg.Name, protocol, eg)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
egNames = append(egNames, cluster.Name)
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
aggregateClusterConfig, err := anypb.New(&envoy_aggregate_cluster_v3.ClusterConfig{
|
|
Clusters: egNames,
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c := &envoy_cluster_v3.Cluster{
|
|
Name: name,
|
|
ConnectTimeout: fg.Config.ConnectTimeout,
|
|
LbPolicy: envoy_cluster_v3.Cluster_CLUSTER_PROVIDED,
|
|
ClusterDiscoveryType: &envoy_cluster_v3.Cluster_ClusterType{
|
|
ClusterType: &envoy_cluster_v3.Cluster_CustomClusterType{
|
|
Name: "envoy.clusters.aggregate",
|
|
TypedConfig: aggregateClusterConfig,
|
|
},
|
|
},
|
|
}
|
|
if fg.Config.UseAltStatName {
|
|
c.AltStatName = name
|
|
}
|
|
err = addHttpProtocolOptions(protocol, c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, c)
|
|
}
|
|
return clusters, nil
|
|
}
|
|
|
|
func addLocalAppHttpProtocolOptions(protocol string, c *envoy_cluster_v3.Cluster) error {
|
|
if !(protocol == "http2" || protocol == "grpc") {
|
|
// do not error. returning nil means it won't get set.
|
|
return nil
|
|
}
|
|
cfg := &envoy_upstreams_v3.HttpProtocolOptions{
|
|
UpstreamProtocolOptions: &envoy_upstreams_v3.HttpProtocolOptions_UseDownstreamProtocolConfig{
|
|
UseDownstreamProtocolConfig: &envoy_upstreams_v3.HttpProtocolOptions_UseDownstreamHttpConfig{
|
|
HttpProtocolOptions: &envoy_core_v3.Http1ProtocolOptions{},
|
|
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
|
},
|
|
},
|
|
}
|
|
any, err := anypb.New(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.TypedExtensionProtocolOptions = map[string]*anypb.Any{
|
|
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": any,
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func addHttpProtocolOptions(protocol string, c *envoy_cluster_v3.Cluster) error {
|
|
if !(protocol == "http2" || protocol == "grpc") {
|
|
// do not error. returning nil means it won't get set.
|
|
return nil
|
|
}
|
|
cfg := &envoy_upstreams_v3.HttpProtocolOptions{
|
|
UpstreamProtocolOptions: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig_{
|
|
ExplicitHttpConfig: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig{
|
|
ProtocolConfig: &envoy_upstreams_v3.HttpProtocolOptions_ExplicitHttpConfig_Http2ProtocolOptions{
|
|
Http2ProtocolOptions: &envoy_core_v3.Http2ProtocolOptions{},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
any, err := anypb.New(cfg)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.TypedExtensionProtocolOptions = map[string]*anypb.Any{
|
|
"envoy.extensions.upstreams.http.v3.HttpProtocolOptions": any,
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// addEnvoyOutlierDetection will add outlier detection config to the cluster, and if nil, add empty OutlierDetection to
|
|
// enable it with default values
|
|
func addEnvoyOutlierDetection(outlierDetection *pbproxystate.OutlierDetection, c *envoy_cluster_v3.Cluster) {
|
|
if outlierDetection == nil {
|
|
return
|
|
}
|
|
od := &envoy_cluster_v3.OutlierDetection{
|
|
BaseEjectionTime: outlierDetection.GetBaseEjectionTime(),
|
|
Consecutive_5Xx: outlierDetection.GetConsecutive_5Xx(),
|
|
EnforcingConsecutive_5Xx: outlierDetection.GetEnforcingConsecutive_5Xx(),
|
|
Interval: outlierDetection.GetInterval(),
|
|
MaxEjectionPercent: outlierDetection.GetMaxEjectionPercent(),
|
|
}
|
|
c.OutlierDetection = od
|
|
|
|
}
|
|
|
|
func addEnvoyCircuitBreakers(circuitBreakers *pbproxystate.CircuitBreakers, c *envoy_cluster_v3.Cluster) {
|
|
if circuitBreakers != nil {
|
|
if circuitBreakers.UpstreamLimits == nil {
|
|
c.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{}
|
|
return
|
|
}
|
|
threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{}
|
|
threshold.MaxConnections = circuitBreakers.UpstreamLimits.MaxConnections
|
|
threshold.MaxPendingRequests = circuitBreakers.UpstreamLimits.MaxPendingRequests
|
|
threshold.MaxRequests = circuitBreakers.UpstreamLimits.MaxConcurrentRequests
|
|
|
|
c.CircuitBreakers = &envoy_cluster_v3.CircuitBreakers{
|
|
Thresholds: []*envoy_cluster_v3.CircuitBreakers_Thresholds{threshold},
|
|
}
|
|
}
|
|
}
|
|
|
|
func addEnvoyLBToCluster(dynamicConfig *pbproxystate.DynamicEndpointGroupConfig, c *envoy_cluster_v3.Cluster) error {
|
|
if dynamicConfig == nil || dynamicConfig.LbPolicy == nil {
|
|
return nil
|
|
}
|
|
|
|
switch d := dynamicConfig.LbPolicy.(type) {
|
|
case *pbproxystate.DynamicEndpointGroupConfig_LeastRequest:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_LEAST_REQUEST
|
|
|
|
lb := dynamicConfig.LbPolicy.(*pbproxystate.DynamicEndpointGroupConfig_LeastRequest)
|
|
if lb.LeastRequest != nil {
|
|
c.LbConfig = &envoy_cluster_v3.Cluster_LeastRequestLbConfig_{
|
|
LeastRequestLbConfig: &envoy_cluster_v3.Cluster_LeastRequestLbConfig{
|
|
ChoiceCount: lb.LeastRequest.ChoiceCount,
|
|
},
|
|
}
|
|
}
|
|
case *pbproxystate.DynamicEndpointGroupConfig_RoundRobin:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_ROUND_ROBIN
|
|
|
|
case *pbproxystate.DynamicEndpointGroupConfig_Random:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_RANDOM
|
|
|
|
case *pbproxystate.DynamicEndpointGroupConfig_RingHash:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_RING_HASH
|
|
|
|
lb := dynamicConfig.LbPolicy.(*pbproxystate.DynamicEndpointGroupConfig_RingHash)
|
|
if lb.RingHash != nil {
|
|
c.LbConfig = &envoy_cluster_v3.Cluster_RingHashLbConfig_{
|
|
RingHashLbConfig: &envoy_cluster_v3.Cluster_RingHashLbConfig{
|
|
MinimumRingSize: lb.RingHash.MinimumRingSize,
|
|
MaximumRingSize: lb.RingHash.MaximumRingSize,
|
|
},
|
|
}
|
|
}
|
|
case *pbproxystate.DynamicEndpointGroupConfig_Maglev:
|
|
c.LbPolicy = envoy_cluster_v3.Cluster_MAGLEV
|
|
|
|
default:
|
|
return fmt.Errorf("unsupported load balancer policy %q for cluster %q", d, c.Name)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// TODO(proxystate): In a future PR this will create clusters and add it to ProxyResources.proxyState
|
|
// Currently, we do not traverse the listener -> endpoint paths and instead just generate each resource by iterating
|
|
// through its top level map. In the future we want to traverse these paths to ensure each listener has a cluster, etc.
|
|
func (pr *ProxyResources) makeEnvoyClusterFromL4Destination(l4 *pbproxystate.L4Destination) error {
|
|
return nil
|
|
}
|