mirror of
https://github.com/status-im/consul.git
synced 2025-01-24 20:51:10 +00:00
8e22d80e35
Failover is pushed entirely down to the data plane by creating envoy clusters and putting each successive destination in a different load assignment priority band. For example, this shows that requests normally go to 1.2.3.4:8080, but when that fails they go to 6.7.8.9:8080:

    - name: foo
      load_assignment:
        cluster_name: foo
        policy:
          overprovisioning_factor: 100000
        endpoints:
        - priority: 0
          lb_endpoints:
          - endpoint:
              address:
                socket_address:
                  address: 1.2.3.4
                  port_value: 8080
        - priority: 1
          lb_endpoints:
          - endpoint:
              address:
                socket_address:
                  address: 6.7.8.9
                  port_value: 8080

Mesh gateways route requests based solely on the SNI header tacked onto the TLS layer. Envoy currently only lets you configure the outbound SNI header at the cluster layer. If you try to fail over through a mesh gateway you would ideally configure the SNI value per endpoint, but that's not possible in envoy today.

This PR introduces a simpler way around the problem for now:

1. We identify any target of failover that will use mesh gateway mode local or remote and then further isolate any resolver node in the compiled discovery chain that has a failover destination set to one of those targets.
2. For each of these resolvers we will perform a small measurement of comparative healths of the endpoints that come back from the health API for the set of primary target and serial failover targets. We walk the list of targets in order and if any endpoint is healthy we return that target, otherwise we move on to the next target.
3. The CDS and EDS endpoints both perform the measurements in (2) for the affected resolver nodes.
4. For CDS this measurement selects which TLS SNI field to use for the cluster (note the cluster is always going to be named for the primary target).
5. For EDS this measurement selects which set of endpoints will populate the cluster. Priority tiered failover is ignored.
One of the big downsides to this approach to failover is that failover detection and correction are going to be controlled by consul rather than deferred entirely to the data plane as with the prior version. This also means that we are bound to only fail over using official health signals and cannot make use of data plane signals like outlier detection to affect failover.

In this specific scenario the lack of data plane signals is ok because their effectiveness is already muted by the fact that the ultimate destination endpoints will have their data plane signals scrambled when they pass through the mesh gateway wrapper anyway, so we're not losing much.

Another related fix is that we now use the endpoint health from the underlying service, not the health of the gateway (regardless of failover mode).
405 lines
13 KiB
Go
405 lines
13 KiB
Go
package xds
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
|
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
|
|
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
|
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
|
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
|
envoytype "github.com/envoyproxy/go-control-plane/envoy/type"
|
|
"github.com/gogo/protobuf/jsonpb"
|
|
"github.com/gogo/protobuf/proto"
|
|
"github.com/gogo/protobuf/types"
|
|
|
|
"github.com/hashicorp/consul/agent/proxycfg"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
)
|
|
|
|
// clustersFromSnapshot returns the xDS API representation of the "clusters" in the snapshot.
|
|
func (s *Server) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
if cfgSnap == nil {
|
|
return nil, errors.New("nil config given")
|
|
}
|
|
|
|
switch cfgSnap.Kind {
|
|
case structs.ServiceKindConnectProxy:
|
|
return s.clustersFromSnapshotConnectProxy(cfgSnap, token)
|
|
case structs.ServiceKindMeshGateway:
|
|
return s.clustersFromSnapshotMeshGateway(cfgSnap, token)
|
|
default:
|
|
return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind)
|
|
}
|
|
}
|
|
|
|
// clustersFromSnapshot returns the xDS API representation of the "clusters"
|
|
// (upstreams) in the snapshot.
|
|
func (s *Server) clustersFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
// TODO(rb): this sizing is a low bound.
|
|
clusters := make([]proto.Message, 0, len(cfgSnap.Proxy.Upstreams)+1)
|
|
|
|
// Include the "app" cluster for the public listener
|
|
appCluster, err := s.makeAppCluster(cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
clusters = append(clusters, appCluster)
|
|
|
|
for _, u := range cfgSnap.Proxy.Upstreams {
|
|
id := u.Identifier()
|
|
var chain *structs.CompiledDiscoveryChain
|
|
if u.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
|
chain = cfgSnap.ConnectProxy.DiscoveryChain[id]
|
|
}
|
|
|
|
if chain == nil {
|
|
upstreamCluster, err := s.makeUpstreamCluster(u, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, upstreamCluster)
|
|
|
|
} else {
|
|
upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(u, chain, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, cluster := range upstreamClusters {
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
}
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
// clustersFromSnapshotMeshGateway returns the xDS API representation of the "clusters"
|
|
// for a mesh gateway. This will include 1 cluster per remote datacenter as well as
|
|
// 1 cluster for each service subset.
|
|
func (s *Server) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]proto.Message, error) {
|
|
// 1 cluster per remote dc + 1 cluster per local service (this is a lower bound - all subset specific clusters will be appended)
|
|
clusters := make([]proto.Message, 0, len(cfgSnap.MeshGateway.GatewayGroups)+len(cfgSnap.MeshGateway.ServiceGroups))
|
|
|
|
// generate the remote dc clusters
|
|
for dc, _ := range cfgSnap.MeshGateway.GatewayGroups {
|
|
clusterName := DatacenterSNI(dc, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
|
|
// generate the per-service clusters
|
|
for svc, _ := range cfgSnap.MeshGateway.ServiceGroups {
|
|
clusterName := ServiceSNI(svc, "", "default", cfgSnap.Datacenter, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
|
|
// generate the service subset clusters
|
|
for svc, resolver := range cfgSnap.MeshGateway.ServiceResolvers {
|
|
for subsetName, _ := range resolver.Subsets {
|
|
clusterName := ServiceSNI(svc, subsetName, "default", cfgSnap.Datacenter, cfgSnap)
|
|
|
|
cluster, err := s.makeMeshGatewayCluster(clusterName, cfgSnap)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
clusters = append(clusters, cluster)
|
|
}
|
|
}
|
|
|
|
return clusters, nil
|
|
}
|
|
|
|
func (s *Server) makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
var c *envoy.Cluster
|
|
var err error
|
|
|
|
cfg, err := ParseProxyConfig(cfgSnap.Proxy.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Connect.Proxy.Config: %s", err)
|
|
}
|
|
|
|
// If we have overridden local cluster config try to parse it into an Envoy cluster
|
|
if cfg.LocalClusterJSON != "" {
|
|
return makeClusterFromUserConfig(cfg.LocalClusterJSON)
|
|
}
|
|
|
|
addr := cfgSnap.Proxy.LocalServiceAddress
|
|
if addr == "" {
|
|
addr = "127.0.0.1"
|
|
}
|
|
c = &envoy.Cluster{
|
|
Name: LocalAppClusterName,
|
|
ConnectTimeout: time.Duration(cfg.LocalConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_STATIC},
|
|
LoadAssignment: &envoy.ClusterLoadAssignment{
|
|
ClusterName: LocalAppClusterName,
|
|
Endpoints: []envoyendpoint.LocalityLbEndpoints{
|
|
{
|
|
LbEndpoints: []envoyendpoint.LbEndpoint{
|
|
makeEndpoint(LocalAppClusterName,
|
|
addr,
|
|
cfgSnap.Proxy.LocalServicePort),
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
|
|
return c, err
|
|
}
|
|
|
|
func (s *Server) makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
var c *envoy.Cluster
|
|
var err error
|
|
|
|
ns := "default"
|
|
if upstream.DestinationNamespace != "" {
|
|
ns = upstream.DestinationNamespace
|
|
}
|
|
dc := cfgSnap.Datacenter
|
|
if upstream.Datacenter != "" {
|
|
dc = upstream.Datacenter
|
|
}
|
|
|
|
sni := ServiceSNI(upstream.DestinationName, "", ns, dc, cfgSnap)
|
|
if upstream.DestinationType == "prepared_query" {
|
|
sni = QuerySNI(upstream.DestinationName, dc, cfgSnap)
|
|
}
|
|
cfg, err := ParseUpstreamConfig(upstream.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Upstream[%s].Config: %s",
|
|
upstream.Identifier(), err)
|
|
}
|
|
if cfg.ClusterJSON != "" {
|
|
c, err = makeClusterFromUserConfig(cfg.ClusterJSON)
|
|
if err != nil {
|
|
return c, err
|
|
}
|
|
// In the happy path don't return yet as we need to inject TLS config still.
|
|
}
|
|
|
|
if c == nil {
|
|
c = &envoy.Cluster{
|
|
Name: sni,
|
|
ConnectTimeout: time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}
|
|
if cfg.Protocol == "http2" || cfg.Protocol == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
}
|
|
|
|
// Enable TLS upstream with the configured client certificate.
|
|
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
|
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
|
Sni: sni,
|
|
}
|
|
|
|
return c, nil
|
|
}
|
|
|
|
func (s *Server) makeUpstreamClustersForDiscoveryChain(
|
|
upstream structs.Upstream,
|
|
chain *structs.CompiledDiscoveryChain,
|
|
cfgSnap *proxycfg.ConfigSnapshot,
|
|
) ([]*envoy.Cluster, error) {
|
|
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse Upstream[%s].Config: %s",
|
|
upstream.Identifier(), err)
|
|
}
|
|
|
|
if chain == nil {
|
|
panic("chain must be provided")
|
|
}
|
|
|
|
id := upstream.Identifier()
|
|
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
|
|
if !ok {
|
|
// this should not happen
|
|
return nil, fmt.Errorf("no endpoint map for upstream %q", id)
|
|
}
|
|
|
|
// TODO(rb): make escape hatches work with chains
|
|
|
|
var out []*envoy.Cluster
|
|
|
|
for _, node := range chain.Nodes {
|
|
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
|
|
continue
|
|
}
|
|
failover := node.Resolver.Failover
|
|
targetID := node.Resolver.Target
|
|
|
|
target := chain.Targets[targetID]
|
|
|
|
// Determine if we have to generate the entire cluster differently.
|
|
failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)
|
|
|
|
sni := TargetSNI(target, cfgSnap)
|
|
clusterName := CustomizeClusterName(sni, chain)
|
|
|
|
if failoverThroughMeshGateway {
|
|
actualTargetID := firstHealthyTarget(
|
|
chain.Targets,
|
|
chainEndpointMap,
|
|
targetID,
|
|
failover.Targets,
|
|
)
|
|
|
|
if actualTargetID != targetID {
|
|
actualTarget := chain.Targets[actualTargetID]
|
|
sni = TargetSNI(actualTarget, cfgSnap)
|
|
}
|
|
}
|
|
|
|
s.Logger.Printf("[DEBUG] xds.clusters - generating cluster for %s", clusterName)
|
|
c := &envoy.Cluster{
|
|
Name: clusterName,
|
|
AltStatName: clusterName,
|
|
ConnectTimeout: node.Resolver.ConnectTimeout,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
CommonLbConfig: &envoy.Cluster_CommonLbConfig{
|
|
HealthyPanicThreshold: &envoytype.Percent{
|
|
Value: 0, // disable panic threshold
|
|
},
|
|
},
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}
|
|
|
|
proto := cfg.Protocol
|
|
if proto == "" {
|
|
proto = chain.Protocol
|
|
}
|
|
|
|
if proto == "" {
|
|
proto = "tcp"
|
|
}
|
|
|
|
if proto == "http2" || proto == "grpc" {
|
|
c.Http2ProtocolOptions = &envoycore.Http2ProtocolOptions{}
|
|
}
|
|
|
|
// Enable TLS upstream with the configured client certificate.
|
|
c.TlsContext = &envoyauth.UpstreamTlsContext{
|
|
CommonTlsContext: makeCommonTLSContext(cfgSnap),
|
|
Sni: sni,
|
|
}
|
|
|
|
out = append(out, c)
|
|
}
|
|
|
|
return out, nil
|
|
}
|
|
|
|
// makeClusterFromUserConfig returns the listener config decoded from an
|
|
// arbitrary proto3 json format string or an error if it's invalid.
|
|
//
|
|
// For now we only support embedding in JSON strings because of the hcl parsing
|
|
// pain (see config.go comment above call to PatchSliceOfMaps). Until we
|
|
// refactor config parser a _lot_ user's opaque config that contains arrays will
|
|
// be mangled. We could actually fix that up in mapstructure which knows the
|
|
// type of the target so could resolve the slices to singletons unambiguously
|
|
// and it would work for us here... but we still have the problem that the
|
|
// config would render incorrectly in general in our HTTP API responses so we
|
|
// really need to fix it "properly".
|
|
//
|
|
// When we do that we can support just nesting the config directly into the
|
|
// JSON/hcl naturally but this is a stop-gap that gets us an escape hatch
|
|
// immediately. It's also probably not a bad thing to support long-term since
|
|
// any config generated by other systems will likely be in canonical protobuf
|
|
// from rather than our slight variant in JSON/hcl.
|
|
func makeClusterFromUserConfig(configJSON string) (*envoy.Cluster, error) {
|
|
var jsonFields map[string]*json.RawMessage
|
|
if err := json.Unmarshal([]byte(configJSON), &jsonFields); err != nil {
|
|
fmt.Println("Custom error", err, configJSON)
|
|
return nil, err
|
|
}
|
|
|
|
var c envoy.Cluster
|
|
|
|
if _, ok := jsonFields["@type"]; ok {
|
|
// Type field is present so decode it as a types.Any
|
|
var any types.Any
|
|
err := jsonpb.UnmarshalString(configJSON, &any)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
// And then unmarshal the listener again...
|
|
err = proto.Unmarshal(any.Value, &c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &c, err
|
|
}
|
|
|
|
// No @type so try decoding as a straight listener.
|
|
err := jsonpb.UnmarshalString(configJSON, &c)
|
|
return &c, err
|
|
}
|
|
|
|
func (s *Server) makeMeshGatewayCluster(clusterName string, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|
cfg, err := ParseMeshGatewayConfig(cfgSnap.Proxy.Config)
|
|
if err != nil {
|
|
// Don't hard fail on a config typo, just warn. The parse func returns
|
|
// default config if there is an error so it's safe to continue.
|
|
s.Logger.Printf("[WARN] envoy: failed to parse mesh gateway config: %s", err)
|
|
}
|
|
|
|
return &envoy.Cluster{
|
|
Name: clusterName,
|
|
ConnectTimeout: time.Duration(cfg.ConnectTimeoutMs) * time.Millisecond,
|
|
ClusterDiscoveryType: &envoy.Cluster_Type{Type: envoy.Cluster_EDS},
|
|
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
|
EdsConfig: &envoycore.ConfigSource{
|
|
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
|
Ads: &envoycore.AggregatedConfigSource{},
|
|
},
|
|
},
|
|
},
|
|
// Having an empty config enables outlier detection with default config.
|
|
OutlierDetection: &envoycluster.OutlierDetection{},
|
|
}, nil
|
|
}
|