mirror of https://github.com/status-im/consul.git
Connect: Fix Envoy getting stuck during load (#5499)
* Connect: Fix Envoy getting stuck during load Also in this PR: - Enabled outlier detection on upstreams which will mark instances unhealthy after 5 failures (using Envoy's defaults) - Enable weighted load balancing where DNS weights are configured * Fix empty load assignments in the right place * Fix import names from review * Move millisecond parse to a helper function
This commit is contained in:
parent
6a78e2ae55
commit
89fa5ec3ba
|
@ -4,10 +4,12 @@ import (
|
|||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
|
||||
envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
"github.com/gogo/protobuf/jsonpb"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
@ -86,6 +88,25 @@ func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
|||
return c, err
|
||||
}
|
||||
|
||||
func parseTimeMillis(ms interface{}) (time.Duration, error) {
|
||||
switch v := ms.(type) {
|
||||
case string:
|
||||
ms, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return time.Duration(ms) * time.Millisecond, nil
|
||||
|
||||
case float64: // This is what parsing from JSON results in
|
||||
return time.Duration(v) * time.Millisecond, nil
|
||||
// Not sure if this can ever really happen but just in case it does in
|
||||
// some test code...
|
||||
case int:
|
||||
return time.Duration(v) * time.Millisecond, nil
|
||||
}
|
||||
return 0, errors.New("invalid type for millisecond duration")
|
||||
}
|
||||
|
||||
func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) {
|
||||
var c *envoy.Cluster
|
||||
var err error
|
||||
|
@ -101,9 +122,15 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
|
|||
}
|
||||
|
||||
if c == nil {
|
||||
conTimeout := 5 * time.Second
|
||||
if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok {
|
||||
if ms, err := parseTimeMillis(toRaw); err == nil {
|
||||
conTimeout = ms
|
||||
}
|
||||
}
|
||||
c = &envoy.Cluster{
|
||||
Name: upstream.Identifier(),
|
||||
ConnectTimeout: 5 * time.Second,
|
||||
ConnectTimeout: conTimeout,
|
||||
Type: envoy.Cluster_EDS,
|
||||
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoycore.ConfigSource{
|
||||
|
@ -112,6 +139,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap
|
|||
},
|
||||
},
|
||||
},
|
||||
// Having an empty config enables outlier detection with default config.
|
||||
OutlierDetection: &envoycluster.OutlierDetection{},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
package xds
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
|
||||
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
|
||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func Test_makeUpstreamCluster(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
snap proxycfg.ConfigSnapshot
|
||||
upstream structs.Upstream
|
||||
want *envoy.Cluster
|
||||
}{
|
||||
{
|
||||
name: "timeout override",
|
||||
snap: proxycfg.ConfigSnapshot{},
|
||||
upstream: structs.TestUpstreams(t)[0],
|
||||
want: &envoy.Cluster{
|
||||
Name: "service:db",
|
||||
Type: envoy.Cluster_EDS,
|
||||
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoycore.ConfigSource{
|
||||
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||
Ads: &envoycore.AggregatedConfigSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms
|
||||
OutlierDetection: &cluster.OutlierDetection{},
|
||||
TlsContext: &envoyauth.UpstreamTlsContext{
|
||||
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "timeout default",
|
||||
snap: proxycfg.ConfigSnapshot{},
|
||||
upstream: structs.TestUpstreams(t)[1],
|
||||
want: &envoy.Cluster{
|
||||
Name: "prepared_query:geo-cache",
|
||||
Type: envoy.Cluster_EDS,
|
||||
EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{
|
||||
EdsConfig: &envoycore.ConfigSource{
|
||||
ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{
|
||||
Ads: &envoycore.AggregatedConfigSource{},
|
||||
},
|
||||
},
|
||||
},
|
||||
ConnectTimeout: 5 * time.Second, // Default
|
||||
OutlierDetection: &cluster.OutlierDetection{},
|
||||
TlsContext: &envoyauth.UpstreamTlsContext{
|
||||
CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
require := require.New(t)
|
||||
got, err := makeUpstreamCluster(tt.upstream, &tt.snap)
|
||||
require.NoError(err)
|
||||
|
||||
require.Equal(tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -4,11 +4,13 @@ import (
|
|||
"errors"
|
||||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
|
||||
"github.com/hashicorp/consul/agent/proxycfg"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
)
|
||||
|
||||
// endpointsFromSnapshot returns the xDS API representation of the "endpoints"
|
||||
|
@ -19,9 +21,6 @@ func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pr
|
|||
}
|
||||
resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints))
|
||||
for id, endpoints := range cfgSnap.UpstreamEndpoints {
|
||||
if len(endpoints) < 1 {
|
||||
continue
|
||||
}
|
||||
la := makeLoadAssignment(id, endpoints)
|
||||
resources = append(resources, la)
|
||||
}
|
||||
|
@ -43,10 +42,38 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes)
|
|||
if addr == "" {
|
||||
addr = ep.Node.Address
|
||||
}
|
||||
healthStatus := envoycore.HealthStatus_HEALTHY
|
||||
weight := 1
|
||||
if ep.Service.Weights != nil {
|
||||
weight = ep.Service.Weights.Passing
|
||||
}
|
||||
|
||||
for _, chk := range ep.Checks {
|
||||
if chk.Status == api.HealthCritical {
|
||||
// This can't actually happen now because health always filters critical
|
||||
// but in the future it may not so set this correctly!
|
||||
healthStatus = envoycore.HealthStatus_UNHEALTHY
|
||||
}
|
||||
if chk.Status == api.HealthWarning && ep.Service.Weights != nil {
|
||||
weight = ep.Service.Weights.Warning
|
||||
}
|
||||
}
|
||||
// Make weights fit Envoy's limits. A zero weight means that either Warning
|
||||
// (likely) or Passing (weirdly) weight has been set to 0 effectively making
|
||||
// this instance unhealthy and should not be sent traffic.
|
||||
if weight < 1 {
|
||||
healthStatus = envoycore.HealthStatus_UNHEALTHY
|
||||
weight = 1
|
||||
}
|
||||
if weight > 128 {
|
||||
weight = 128
|
||||
}
|
||||
es = append(es, envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr(addr, ep.Service.Port),
|
||||
},
|
||||
HealthStatus: healthStatus,
|
||||
LoadBalancingWeight: makeUint32Value(weight),
|
||||
})
|
||||
}
|
||||
return &envoy.ClusterLoadAssignment{
|
||||
|
|
|
@ -0,0 +1,193 @@
|
|||
package xds
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/mitchellh/copystructure"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2"
|
||||
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
func Test_makeLoadAssignment(t *testing.T) {
|
||||
|
||||
testCheckServiceNodes := structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "node1-id",
|
||||
Node: "node1",
|
||||
Address: "10.10.10.10",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "web",
|
||||
Port: 1234,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node1",
|
||||
CheckID: "serfHealth",
|
||||
Status: "passing",
|
||||
},
|
||||
&structs.HealthCheck{
|
||||
Node: "node1",
|
||||
ServiceID: "web",
|
||||
CheckID: "web:check",
|
||||
Status: "passing",
|
||||
},
|
||||
},
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{
|
||||
ID: "node2-id",
|
||||
Node: "node2",
|
||||
Address: "10.10.10.20",
|
||||
Datacenter: "dc1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "web",
|
||||
Port: 1234,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
&structs.HealthCheck{
|
||||
Node: "node2",
|
||||
CheckID: "serfHealth",
|
||||
Status: "passing",
|
||||
},
|
||||
&structs.HealthCheck{
|
||||
Node: "node2",
|
||||
ServiceID: "web",
|
||||
CheckID: "web:check",
|
||||
Status: "passing",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
testWeightedCheckServiceNodesRaw, err := copystructure.Copy(testCheckServiceNodes)
|
||||
require.NoError(t, err)
|
||||
testWeightedCheckServiceNodes := testWeightedCheckServiceNodesRaw.(structs.CheckServiceNodes)
|
||||
|
||||
testWeightedCheckServiceNodes[0].Service.Weights = &structs.Weights{
|
||||
Passing: 10,
|
||||
Warning: 1,
|
||||
}
|
||||
testWeightedCheckServiceNodes[1].Service.Weights = &structs.Weights{
|
||||
Passing: 5,
|
||||
Warning: 0,
|
||||
}
|
||||
|
||||
testWarningCheckServiceNodesRaw, err := copystructure.Copy(testWeightedCheckServiceNodes)
|
||||
require.NoError(t, err)
|
||||
testWarningCheckServiceNodes := testWarningCheckServiceNodesRaw.(structs.CheckServiceNodes)
|
||||
|
||||
testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
|
||||
testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
clusterName string
|
||||
endpoints structs.CheckServiceNodes
|
||||
want *envoy.ClusterLoadAssignment
|
||||
}{
|
||||
{
|
||||
name: "no instances",
|
||||
clusterName: "service:test",
|
||||
endpoints: structs.CheckServiceNodes{},
|
||||
want: &envoy.ClusterLoadAssignment{
|
||||
ClusterName: "service:test",
|
||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||
LbEndpoints: []envoyendpoint.LbEndpoint{},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "instances, no weights",
|
||||
clusterName: "service:test",
|
||||
endpoints: testCheckServiceNodes,
|
||||
want: &envoy.ClusterLoadAssignment{
|
||||
ClusterName: "service:test",
|
||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||
LbEndpoints: []envoyendpoint.LbEndpoint{
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.10", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_HEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(1),
|
||||
},
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.20", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_HEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(1),
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "instances, healthy weights",
|
||||
clusterName: "service:test",
|
||||
endpoints: testWeightedCheckServiceNodes,
|
||||
want: &envoy.ClusterLoadAssignment{
|
||||
ClusterName: "service:test",
|
||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||
LbEndpoints: []envoyendpoint.LbEndpoint{
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.10", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_HEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(10),
|
||||
},
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.20", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_HEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(5),
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "instances, warning weights",
|
||||
clusterName: "service:test",
|
||||
endpoints: testWarningCheckServiceNodes,
|
||||
want: &envoy.ClusterLoadAssignment{
|
||||
ClusterName: "service:test",
|
||||
Endpoints: []envoyendpoint.LocalityLbEndpoints{{
|
||||
LbEndpoints: []envoyendpoint.LbEndpoint{
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.10", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_HEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(1),
|
||||
},
|
||||
envoyendpoint.LbEndpoint{
|
||||
Endpoint: &envoyendpoint.Endpoint{
|
||||
Address: makeAddressPtr("10.10.10.20", 1234),
|
||||
},
|
||||
HealthStatus: core.HealthStatus_UNHEALTHY,
|
||||
LoadBalancingWeight: makeUint32Value(1),
|
||||
},
|
||||
},
|
||||
}},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got := makeLoadAssignment(tt.clusterName, tt.endpoints)
|
||||
require.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -252,6 +252,9 @@ func makeCommonTLSContext(cfgSnap *proxycfg.ConfigSnapshot) *envoyauth.CommonTls
|
|||
// Concatenate all the root PEMs into one.
|
||||
// TODO(banks): verify this actually works with Envoy (docs are not clear).
|
||||
rootPEMS := ""
|
||||
if cfgSnap.Roots == nil {
|
||||
return nil
|
||||
}
|
||||
for _, root := range cfgSnap.Roots.Roots {
|
||||
rootPEMS += root.RootCert
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
||||
"github.com/gogo/protobuf/proto"
|
||||
"github.com/gogo/protobuf/types"
|
||||
prototypes "github.com/gogo/protobuf/types"
|
||||
)
|
||||
|
||||
func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) {
|
||||
|
@ -52,3 +53,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address {
|
|||
a := makeAddress(ip, port)
|
||||
return &a
|
||||
}
|
||||
|
||||
func makeUint32Value(n int) *prototypes.UInt32Value {
|
||||
return &prototypes.UInt32Value{Value: uint32(n)}
|
||||
}
|
||||
|
|
|
@ -346,7 +346,13 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if resources == nil || len(resources) == 0 {
|
||||
// Zero length resource responses should be ignored and are the result of no
|
||||
// data yet. Notice that this caused a bug originally where we had zero
|
||||
// healthy endpoints for an upstream that would cause Envoy to hang waiting
|
||||
// for the EDS response. This is fixed though by ensuring we send an explicit
|
||||
// empty LoadAssignment resource for the cluster rather than allowing junky
|
||||
// empty resources.
|
||||
if len(resources) == 0 {
|
||||
// Nothing to send yet
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -389,7 +389,10 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to
|
|||
}
|
||||
}
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"outlierDetection": {
|
||||
|
||||
},
|
||||
"connectTimeout": "1s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
}`,
|
||||
"prepared_query:geo-cache": `
|
||||
|
@ -403,6 +406,9 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to
|
|||
|
||||
}
|
||||
}
|
||||
},
|
||||
"outlierDetection": {
|
||||
|
||||
},
|
||||
"connectTimeout": "5s",
|
||||
"tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + `
|
||||
|
@ -461,7 +467,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri
|
|||
"portValue": 0
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
},
|
||||
{
|
||||
"endpoint": {
|
||||
|
@ -471,7 +479,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri
|
|||
"portValue": 0
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"healthStatus": "HEALTHY",
|
||||
"loadBalancingWeight": 1
|
||||
}
|
||||
]
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue