connect: fix endpoints clusterName when using cluster escape hatch (#7319)

```changelog
* fix(connect): fix endpoints clusterName when using cluster escape hatch
```
This commit is contained in:
Raphaël Rondeau 2020-05-26 10:57:22 +02:00 committed by R.B. Boyer
parent 0d86e802be
commit b29c954480
3 changed files with 83 additions and 0 deletions

View File

@ -79,6 +79,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
} else {
// Newfangled discovery chain plumbing.
es := s.endpointsFromDiscoveryChain(
u,
chain,
cfgSnap.Datacenter,
cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id],
@ -268,6 +269,7 @@ func (s *Server) endpointsFromSnapshotIngressGateway(cfgSnap *proxycfg.ConfigSna
}
es := s.endpointsFromDiscoveryChain(
u,
cfgSnap.IngressGateway.DiscoveryChain[id],
cfgSnap.Datacenter,
cfgSnap.IngressGateway.WatchedUpstreamEndpoints[id],
@ -291,6 +293,7 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
}
func (s *Server) endpointsFromDiscoveryChain(
upstream structs.Upstream,
chain *structs.CompiledDiscoveryChain,
datacenter string,
upstreamEndpoints, gatewayEndpoints map[string]structs.CheckServiceNodes,
@ -301,6 +304,30 @@ func (s *Server) endpointsFromDiscoveryChain(
return resources
}
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
if err != nil {
// Don't hard fail on a config typo, just warn. The parse func returns
// default config if there is an error so it's safe to continue.
s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(),
"error", err)
}
var escapeHatchCluster *envoy.Cluster
if cfg.ClusterJSON != "" {
if chain.IsDefault() {
// If you haven't done anything to setup the discovery chain, then
// you can use the envoy_cluster_json escape hatch.
escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
if err != nil {
return resources
}
} else {
s.Logger.Warn("ignoring escape hatch setting, because a discovery chain is configued for",
"discovery chain", chain.ServiceName, "upstream", upstream.Identifier(),
"envoy_cluster_json", chain.ServiceName)
}
}
// Find all resolver nodes.
for _, node := range chain.Nodes {
if node.Type != structs.DiscoveryGraphNodeTypeResolver {
@ -312,6 +339,10 @@ func (s *Server) endpointsFromDiscoveryChain(
target := chain.Targets[targetID]
clusterName := CustomizeClusterName(target.Name, chain)
if escapeHatchCluster != nil {
clusterName = escapeHatchCluster.Name
}
s.Logger.Debug("generating endpoints for", "cluster", clusterName)
// Determine if we have to generate the entire cluster differently.
failoverThroughMeshGateway := chain.WillFailoverThroughMeshGateway(node)

View File

@ -306,6 +306,17 @@ func Test_endpointsFromSnapshot(t *testing.T) {
create: proxycfg.TestConfigSnapshotDiscoveryChainWithDoubleFailoverThroughLocalGatewayTriggered,
setup: nil,
},
{
name: "connect-proxy-with-default-chain-and-custom-cluster",
create: proxycfg.TestConfigSnapshotDiscoveryChainDefault,
setup: func(snap *proxycfg.ConfigSnapshot) {
snap.Proxy.Upstreams[0].Config["envoy_cluster_json"] =
customAppClusterJSON(t, customClusterJSONOptions{
Name: "myservice",
IncludeType: false,
})
},
},
{
name: "splitter-with-resolver-redirect",
create: proxycfg.TestConfigSnapshotDiscoveryChain_SplitterWithResolverRedirectMultiDC,

View File

@ -0,0 +1,41 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"clusterName": "myservice",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.1",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.10.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment",
"nonce": "00000001"
}