diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index 5908632475..9490ea960b 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -4,10 +4,12 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "time" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" + envoycluster "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/gogo/protobuf/jsonpb" "github.com/gogo/protobuf/proto" @@ -86,6 +88,25 @@ func makeAppCluster(cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) { return c, err } +func parseTimeMillis(ms interface{}) (time.Duration, error) { + switch v := ms.(type) { + case string: + ms, err := strconv.Atoi(v) + if err != nil { + return 0, err + } + return time.Duration(ms) * time.Millisecond, nil + + case float64: // This is what parsing from JSON results in + return time.Duration(v) * time.Millisecond, nil + // Not sure if this can ever really happen but just in case it does in + // some test code... + case int: + return time.Duration(v) * time.Millisecond, nil + } + return 0, errors.New("invalid type for millisecond duration") +} + func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnapshot) (*envoy.Cluster, error) { var c *envoy.Cluster var err error @@ -101,9 +122,15 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap } if c == nil { + conTimeout := 5 * time.Second + if toRaw, ok := upstream.Config["connect_timeout_ms"]; ok { + if ms, err := parseTimeMillis(toRaw); err == nil { + conTimeout = ms + } + } c = &envoy.Cluster{ Name: upstream.Identifier(), - ConnectTimeout: 5 * time.Second, + ConnectTimeout: conTimeout, Type: envoy.Cluster_EDS, EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{ EdsConfig: &envoycore.ConfigSource{ @@ -112,6 +139,8 @@ func makeUpstreamCluster(upstream structs.Upstream, cfgSnap *proxycfg.ConfigSnap }, }, }, + // Having an empty config enables outlier detection with default config. + OutlierDetection: &envoycluster.OutlierDetection{}, } } diff --git a/agent/xds/clusters_test.go b/agent/xds/clusters_test.go new file mode 100644 index 0000000000..0687fa2c18 --- /dev/null +++ b/agent/xds/clusters_test.go @@ -0,0 +1,76 @@ +package xds + +import ( + "testing" + "time" + + envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoyauth "github.com/envoyproxy/go-control-plane/envoy/api/v2/auth" + "github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster" + envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" +) + +func Test_makeUpstreamCluster(t *testing.T) { + tests := []struct { + name string + snap proxycfg.ConfigSnapshot + upstream structs.Upstream + want *envoy.Cluster + }{ + { + name: "timeout override", + snap: proxycfg.ConfigSnapshot{}, + upstream: structs.TestUpstreams(t)[0], + want: &envoy.Cluster{ + Name: "service:db", + Type: envoy.Cluster_EDS, + EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{ + EdsConfig: &envoycore.ConfigSource{ + ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{ + Ads: &envoycore.AggregatedConfigSource{}, + }, + }, + }, + ConnectTimeout: 1 * time.Second, // TestUpstreams overrides to 1000ms + OutlierDetection: &cluster.OutlierDetection{}, + TlsContext: &envoyauth.UpstreamTlsContext{ + CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}), + }, + }, + }, + { + name: "timeout default", + snap: proxycfg.ConfigSnapshot{}, + upstream: structs.TestUpstreams(t)[1], + want: &envoy.Cluster{ + Name: "prepared_query:geo-cache", + Type: envoy.Cluster_EDS, + EdsClusterConfig: &envoy.Cluster_EdsClusterConfig{ + EdsConfig: &envoycore.ConfigSource{ + ConfigSourceSpecifier: &envoycore.ConfigSource_Ads{ + Ads: &envoycore.AggregatedConfigSource{}, + }, + }, + }, + ConnectTimeout: 5 * time.Second, // Default + OutlierDetection: &cluster.OutlierDetection{}, + TlsContext: &envoyauth.UpstreamTlsContext{ + CommonTlsContext: makeCommonTLSContext(&proxycfg.ConfigSnapshot{}), + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + got, err := makeUpstreamCluster(tt.upstream, &tt.snap) + require.NoError(err) + + require.Equal(tt.want, got) + }) + } +} diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 59d9e23639..5ac4bd24e3 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -4,11 +4,13 @@ import ( "errors" envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" "github.com/gogo/protobuf/proto" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" ) // endpointsFromSnapshot returns the xDS API representation of the "endpoints" @@ -19,9 +21,6 @@ func endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapshot, token string) ([]pr } resources := make([]proto.Message, 0, len(cfgSnap.UpstreamEndpoints)) for id, endpoints := range cfgSnap.UpstreamEndpoints { - if len(endpoints) < 1 { - continue - } la := makeLoadAssignment(id, endpoints) resources = append(resources, la) } @@ -43,10 +42,38 @@ func makeLoadAssignment(clusterName string, endpoints structs.CheckServiceNodes) if addr == "" { addr = ep.Node.Address } + healthStatus := envoycore.HealthStatus_HEALTHY + weight := 1 + if ep.Service.Weights != nil { + weight = ep.Service.Weights.Passing + } + + for _, chk := range ep.Checks { + if chk.Status == api.HealthCritical { + // This can't actually happen now because health always filters critical + // but in the future it may not so set this correctly! + healthStatus = envoycore.HealthStatus_UNHEALTHY + } + if chk.Status == api.HealthWarning && ep.Service.Weights != nil { + weight = ep.Service.Weights.Warning + } + } + // Make weights fit Envoy's limits. A zero weight means that either Warning + // (likely) or Passing (weirdly) weight has been set to 0 effectively making + // this instance unhealthy and should not be sent traffic. + if weight < 1 { + healthStatus = envoycore.HealthStatus_UNHEALTHY + weight = 1 + } + if weight > 128 { + weight = 128 + } es = append(es, envoyendpoint.LbEndpoint{ Endpoint: &envoyendpoint.Endpoint{ Address: makeAddressPtr(addr, ep.Service.Port), }, + HealthStatus: healthStatus, + LoadBalancingWeight: makeUint32Value(weight), }) } return &envoy.ClusterLoadAssignment{ diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go new file mode 100644 index 0000000000..638e2ebc07 --- /dev/null +++ b/agent/xds/endpoints_test.go @@ -0,0 +1,193 @@ +package xds + +import ( + "testing" + + "github.com/mitchellh/copystructure" + + "github.com/stretchr/testify/require" + + envoy "github.com/envoyproxy/go-control-plane/envoy/api/v2" + "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + envoyendpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + "github.com/hashicorp/consul/agent/structs" +) + +func Test_makeLoadAssignment(t *testing.T) { + + testCheckServiceNodes := structs.CheckServiceNodes{ + structs.CheckServiceNode{ + Node: &structs.Node{ + ID: "node1-id", + Node: "node1", + Address: "10.10.10.10", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + Service: "web", + Port: 1234, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node1", + CheckID: "serfHealth", + Status: "passing", + }, + &structs.HealthCheck{ + Node: "node1", + ServiceID: "web", + CheckID: "web:check", + Status: "passing", + }, + }, + }, + structs.CheckServiceNode{ + Node: &structs.Node{ + ID: "node2-id", + Node: "node2", + Address: "10.10.10.20", + Datacenter: "dc1", + }, + Service: &structs.NodeService{ + Service: "web", + Port: 1234, + }, + Checks: structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node2", + CheckID: "serfHealth", + Status: "passing", + }, + &structs.HealthCheck{ + Node: "node2", + ServiceID: "web", + CheckID: "web:check", + Status: "passing", + }, + }, + }, + } + + testWeightedCheckServiceNodesRaw, err := copystructure.Copy(testCheckServiceNodes) + require.NoError(t, err) + testWeightedCheckServiceNodes := testWeightedCheckServiceNodesRaw.(structs.CheckServiceNodes) + + testWeightedCheckServiceNodes[0].Service.Weights = &structs.Weights{ + Passing: 10, + Warning: 1, + } + testWeightedCheckServiceNodes[1].Service.Weights = &structs.Weights{ + Passing: 5, + Warning: 0, + } + + testWarningCheckServiceNodesRaw, err := copystructure.Copy(testWeightedCheckServiceNodes) + require.NoError(t, err) + testWarningCheckServiceNodes := testWarningCheckServiceNodesRaw.(structs.CheckServiceNodes) + + testWarningCheckServiceNodes[0].Checks[0].Status = "warning" + testWarningCheckServiceNodes[1].Checks[0].Status = "warning" + + tests := []struct { + name string + clusterName string + endpoints structs.CheckServiceNodes + want *envoy.ClusterLoadAssignment + }{ + { + name: "no instances", + clusterName: "service:test", + endpoints: structs.CheckServiceNodes{}, + want: &envoy.ClusterLoadAssignment{ + ClusterName: "service:test", + Endpoints: []envoyendpoint.LocalityLbEndpoints{{ + LbEndpoints: []envoyendpoint.LbEndpoint{}, + }}, + }, + }, + { + name: "instances, no weights", + clusterName: "service:test", + endpoints: testCheckServiceNodes, + want: &envoy.ClusterLoadAssignment{ + ClusterName: "service:test", + Endpoints: []envoyendpoint.LocalityLbEndpoints{{ + LbEndpoints: []envoyendpoint.LbEndpoint{ + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.10", 1234), + }, + HealthStatus: core.HealthStatus_HEALTHY, + LoadBalancingWeight: makeUint32Value(1), + }, + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.20", 1234), + }, + HealthStatus: core.HealthStatus_HEALTHY, + LoadBalancingWeight: makeUint32Value(1), + }, + }, + }}, + }, + }, + { + name: "instances, healthy weights", + clusterName: "service:test", + endpoints: testWeightedCheckServiceNodes, + want: &envoy.ClusterLoadAssignment{ + ClusterName: "service:test", + Endpoints: []envoyendpoint.LocalityLbEndpoints{{ + LbEndpoints: []envoyendpoint.LbEndpoint{ + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.10", 1234), + }, + HealthStatus: core.HealthStatus_HEALTHY, + LoadBalancingWeight: makeUint32Value(10), + }, + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.20", 1234), + }, + HealthStatus: core.HealthStatus_HEALTHY, + LoadBalancingWeight: makeUint32Value(5), + }, + }, + }}, + }, + }, + { + name: "instances, warning weights", + clusterName: "service:test", + endpoints: testWarningCheckServiceNodes, + want: &envoy.ClusterLoadAssignment{ + ClusterName: "service:test", + Endpoints: []envoyendpoint.LocalityLbEndpoints{{ + LbEndpoints: []envoyendpoint.LbEndpoint{ + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.10", 1234), + }, + HealthStatus: core.HealthStatus_HEALTHY, + LoadBalancingWeight: makeUint32Value(1), + }, + envoyendpoint.LbEndpoint{ + Endpoint: &envoyendpoint.Endpoint{ + Address: makeAddressPtr("10.10.10.20", 1234), + }, + HealthStatus: core.HealthStatus_UNHEALTHY, + LoadBalancingWeight: makeUint32Value(1), + }, + }, + }}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := makeLoadAssignment(tt.clusterName, tt.endpoints) + require.Equal(t, tt.want, got) + }) + } +} diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index aba74c8974..bbd9099470 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -252,6 +252,9 @@ func makeCommonTLSContext(cfgSnap *proxycfg.ConfigSnapshot) *envoyauth.CommonTls // Concatenate all the root PEMs into one. // TODO(banks): verify this actually works with Envoy (docs are not clear). rootPEMS := "" + if cfgSnap.Roots == nil { + return nil + } for _, root := range cfgSnap.Roots.Roots { rootPEMS += root.RootCert } diff --git a/agent/xds/response.go b/agent/xds/response.go index a09c31a3d7..b4131ece41 100644 --- a/agent/xds/response.go +++ b/agent/xds/response.go @@ -5,6 +5,7 @@ import ( envoycore "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" "github.com/gogo/protobuf/proto" "github.com/gogo/protobuf/types" + prototypes "github.com/gogo/protobuf/types" ) func createResponse(typeURL string, version, nonce string, resources []proto.Message) (*envoy.DiscoveryResponse, error) { @@ -52,3 +53,7 @@ func makeAddressPtr(ip string, port int) *envoycore.Address { a := makeAddress(ip, port) return &a } + +func makeUint32Value(n int) *prototypes.UInt32Value { + return &prototypes.UInt32Value{Value: uint32(n)} +} diff --git a/agent/xds/server.go b/agent/xds/server.go index 5b28f79925..4172e19a62 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -346,7 +346,13 @@ func (t *xDSType) SendIfNew(cfgSnap *proxycfg.ConfigSnapshot, version uint64, no if err != nil { return err } - if resources == nil || len(resources) == 0 { + // Zero length resource responses should be ignored and are the result of no + // data yet. Notice that this caused a bug originally where we had zero + // healthy endpoints for an upstream that would cause Envoy to hang waiting + // for the EDS response. This is fixed though by ensuring we send an explicit + // empty LoadAssignment resource for the cluster rather than allowing junky + // empty resources. + if len(resources) == 0 { // Nothing to send yet return nil } diff --git a/agent/xds/server_test.go b/agent/xds/server_test.go index 89a1854e0e..6a36e001dd 100644 --- a/agent/xds/server_test.go +++ b/agent/xds/server_test.go @@ -389,7 +389,10 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to } } }, - "connectTimeout": "5s", + "outlierDetection": { + + }, + "connectTimeout": "1s", "tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + ` }`, "prepared_query:geo-cache": ` @@ -403,6 +406,9 @@ func expectClustersJSONResources(t *testing.T, snap *proxycfg.ConfigSnapshot, to } } + }, + "outlierDetection": { + }, "connectTimeout": "5s", "tlsContext": ` + expectedUpstreamTLSContextJSON(t, snap) + ` @@ -461,7 +467,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri "portValue": 0 } } - } + }, + "healthStatus": "HEALTHY", + "loadBalancingWeight": 1 }, { "endpoint": { @@ -471,7 +479,9 @@ func expectEndpointsJSON(t *testing.T, snap *proxycfg.ConfigSnapshot, token stri "portValue": 0 } } - } + }, + "healthStatus": "HEALTHY", + "loadBalancingWeight": 1 } ] }