From e039dfd7f8a41851c8dd5bd8336f71ae8950c163 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" Date: Tue, 23 Jul 2019 20:20:24 -0500 Subject: [PATCH] connect: rework how the service resolver subset OnlyPassing flag works (#6173) The main change is that we no longer filter service instances by health, preferring instead to render all results down into EDS endpoints in envoy and merely label the endpoints as HEALTHY or UNHEALTHY. When OnlyPassing is set to true we will force consul checks in a 'warning' state to render as UNHEALTHY in envoy. Fixes #6171 --- agent/proxycfg/state.go | 24 +----- agent/structs/discovery_chain.go | 18 +++- agent/xds/endpoints.go | 84 ++++++++++--------- agent/xds/endpoints_test.go | 26 +++--- .../mesh-gateway-service-subsets.golden | 12 +++ .../config_entries.hcl | 25 ++++++ .../s2-v1.hcl | 22 +++++ .../setup.sh | 25 ++++++ .../verify.bats | 66 +++++++++++++++ test/integration/connect/envoy/helpers.bash | 34 ++++++++ .../config-entries/service-resolver.html.md | 8 +- 11 files changed, 268 insertions(+), 76 deletions(-) create mode 100644 test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl create mode 100644 test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl create mode 100644 test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh create mode 100644 test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 7d42fb44dc..233d9e2eb2 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -502,8 +502,6 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err) } - // TODO(rb): do we have to do onlypassing filters here? - m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc] if !ok { m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes) @@ -608,16 +606,8 @@ func (s *state) resetWatchesFromChain( } s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target) - // snap.WatchedUpstreams[name] - - // delete(snap.WatchedUpstreams[name], target) - // delete(snap.WatchedUpstreamEndpoint[name], target) - - // TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly - // TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters - // TODO(rb): handle subset.onlypassing var subset structs.ServiceResolverSubset if target.ServiceSubset != "" { var ok bool @@ -649,24 +639,12 @@ func (s *state) resetWatchesFromChain( meshGateway = structs.MeshGatewayModeNone } - filterExp := subset.Filter - if subset.OnlyPassing { - if filterExp != "" { - // TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }" - // once the syntax is supported - filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp) - } else { - filterExp = "not Checks.Status != passing" - } - } - - // TODO(rb): update the health endpoint to allow returning even unhealthy endpoints err = s.watchConnectProxyService( ctx, "upstream-target:"+string(encodedTarget)+":"+id, target.Service, target.Datacenter, - filterExp, + subset.Filter, meshGateway, ) if err != nil { diff --git a/agent/structs/discovery_chain.go b/agent/structs/discovery_chain.go index cf95314a36..6565aabf7a 100644 --- a/agent/structs/discovery_chain.go +++ b/agent/structs/discovery_chain.go @@ -32,7 +32,7 @@ type CompiledDiscoveryChain struct { // GroupResolverNodes respresents all unique service instance groups that // need to be represented. For envoy these render as Clusters. // - // Omitted from JSON because DiscoveryTarget is not a encoding.TextMarshaler. + // Omitted from JSON because these already show up under the Node field. GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"` // TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS @@ -54,6 +54,22 @@ func (c *CompiledDiscoveryChain) IsDefault() bool { c.Node.GroupResolver.Default } +// SubsetDefinitionForTarget is a convenience function to fetch the subset +// definition for the service subset defined by the provided target. If the +// subset is not defined an empty definition is returned. +func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset { + if t.ServiceSubset == "" { + return ServiceResolverSubset{} + } + + resolver, ok := c.Resolvers[t.Service] + if !ok { + return ServiceResolverSubset{} + } + + return resolver.Subsets[t.ServiceSubset] +} + const ( DiscoveryGraphNodeTypeRouter = "router" DiscoveryGraphNodeTypeSplitter = "splitter" diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 8f99abf457..db946bef19 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -57,7 +57,9 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps la := makeLoadAssignment( sni, 0, - []structs.CheckServiceNodes{endpoints}, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, + }, cfgSnap.Datacenter, ) resources = append(resources, la) @@ -68,7 +70,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] if !ok { - continue // TODO(rb): whaaaa? + continue // skip the upstream (should not happen) } for target, node := range chain.GroupResolverNodes { @@ -77,18 +79,23 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps endpoints, ok := chainEndpointMap[target] if !ok { - continue // TODO(rb): whaaaa? + continue // skip the cluster (should not happen) } var ( - priorityEndpoints []structs.CheckServiceNodes + endpointGroups []loadAssignmentEndpointGroup overprovisioningFactor int ) - if failover != nil && len(failover.Targets) > 0 { - priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1) + primaryGroup := loadAssignmentEndpointGroup{ + Endpoints: endpoints, + OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing, + } - priorityEndpoints = append(priorityEndpoints, endpoints) + if failover != nil && len(failover.Targets) > 0 { + endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1) + + endpointGroups = append(endpointGroups, primaryGroup) if failover.Definition.OverprovisioningFactor > 0 { overprovisioningFactor = failover.Definition.OverprovisioningFactor @@ -101,14 +108,17 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps for _, failTarget := range failover.Targets { failEndpoints, ok := chainEndpointMap[failTarget] - if ok { - priorityEndpoints = append(priorityEndpoints, failEndpoints) + if !ok { + continue // skip the failover target (should not happen) } + + endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{ + Endpoints: failEndpoints, + OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing, + }) } } else { - priorityEndpoints = []structs.CheckServiceNodes{ - endpoints, - } + endpointGroups = append(endpointGroups, primaryGroup) } sni := TargetSNI(target, cfgSnap) @@ -116,7 +126,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps la := makeLoadAssignment( sni, overprovisioningFactor, - priorityEndpoints, + endpointGroups, cfgSnap.Datacenter, ) resources = append(resources, la) @@ -136,8 +146,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh la := makeLoadAssignment( clusterName, 0, - []structs.CheckServiceNodes{ - endpoints, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, }, cfgSnap.Datacenter, ) @@ -150,8 +160,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh la := makeLoadAssignment( clusterName, 0, - []structs.CheckServiceNodes{ - endpoints, + []loadAssignmentEndpointGroup{ + {Endpoints: endpoints}, }, cfgSnap.Datacenter, ) @@ -166,20 +176,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh endpoints := cfgSnap.MeshGateway.ServiceGroups[svc] // locally execute the subsets filter - filterExp := subset.Filter - if subset.OnlyPassing { - // we could do another filter pass without bexpr but this simplifies things a bit - if filterExp != "" { - // TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }" - // once the syntax is supported - filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp) - } else { - filterExp = "not Checks.Status != passing" - } - } - - if filterExp != "" { - filter, err := bexpr.CreateFilter(filterExp, nil, endpoints) + if subset.Filter != "" { + filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints) if err != nil { return nil, err } @@ -194,8 +192,11 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh la := makeLoadAssignment( clusterName, 0, - []structs.CheckServiceNodes{ - endpoints, + []loadAssignmentEndpointGroup{ + { + Endpoints: endpoints, + OnlyPassing: subset.OnlyPassing, + }, }, cfgSnap.Datacenter, ) @@ -216,15 +217,20 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint { } } +type loadAssignmentEndpointGroup struct { + Endpoints structs.CheckServiceNodes + OnlyPassing bool +} + func makeLoadAssignment( clusterName string, overprovisioningFactor int, - priorityEndpoints []structs.CheckServiceNodes, + endpointGroups []loadAssignmentEndpointGroup, localDatacenter string, ) *envoy.ClusterLoadAssignment { cla := &envoy.ClusterLoadAssignment{ ClusterName: clusterName, - Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)), + Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)), } if overprovisioningFactor > 0 { cla.Policy = &envoy.ClusterLoadAssignment_Policy{ @@ -232,7 +238,8 @@ func makeLoadAssignment( } } - for priority, endpoints := range priorityEndpoints { + for priority, endpointGroup := range endpointGroups { + endpoints := endpointGroup.Endpoints es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints)) for _, ep := range endpoints { @@ -246,8 +253,9 @@ func makeLoadAssignment( for _, chk := range ep.Checks { if chk.Status == api.HealthCritical { - // This can't actually happen now because health always filters critical - // but in the future it may not so set this correctly! + healthStatus = envoycore.HealthStatus_UNHEALTHY + } + if endpointGroup.OnlyPassing && chk.Status != api.HealthPassing { healthStatus = envoycore.HealthStatus_UNHEALTHY } if chk.Status == api.HealthWarning && ep.Service.Weights != nil { diff --git a/agent/xds/endpoints_test.go b/agent/xds/endpoints_test.go index 1077a739ab..72e0fed27f 100644 --- a/agent/xds/endpoints_test.go +++ b/agent/xds/endpoints_test.go @@ -94,18 +94,19 @@ func Test_makeLoadAssignment(t *testing.T) { testWarningCheckServiceNodes[0].Checks[0].Status = "warning" testWarningCheckServiceNodes[1].Checks[0].Status = "warning" + // TODO(rb): test onlypassing tests := []struct { name string clusterName string overprovisioningFactor int - endpoints []structs.CheckServiceNodes + endpoints []loadAssignmentEndpointGroup want *envoy.ClusterLoadAssignment }{ { name: "no instances", clusterName: "service:test", - endpoints: []structs.CheckServiceNodes{ - {}, + endpoints: []loadAssignmentEndpointGroup{ + {Endpoints: nil}, }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", @@ -117,8 +118,8 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, no weights", clusterName: "service:test", - endpoints: []structs.CheckServiceNodes{ - testCheckServiceNodes, + endpoints: []loadAssignmentEndpointGroup{ + {Endpoints: testCheckServiceNodes}, }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", @@ -147,8 +148,8 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, healthy weights", clusterName: "service:test", - endpoints: []structs.CheckServiceNodes{ - testWeightedCheckServiceNodes, + endpoints: []loadAssignmentEndpointGroup{ + {Endpoints: testWeightedCheckServiceNodes}, }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", @@ -177,8 +178,8 @@ func Test_makeLoadAssignment(t *testing.T) { { name: "instances, warning weights", clusterName: "service:test", - endpoints: []structs.CheckServiceNodes{ - testWarningCheckServiceNodes, + endpoints: []loadAssignmentEndpointGroup{ + {Endpoints: testWarningCheckServiceNodes}, }, want: &envoy.ClusterLoadAssignment{ ClusterName: "service:test", @@ -207,7 +208,12 @@ func Test_makeLoadAssignment(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1") + got := makeLoadAssignment( + tt.clusterName, + tt.overprovisioningFactor, + tt.endpoints, + "dc1", + ) require.Equal(t, tt.want, got) }) } diff --git a/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden b/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden index 91e4079143..74c59f1a5d 100644 --- a/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden +++ b/agent/xds/testdata/endpoints/mesh-gateway-service-subsets.golden @@ -246,6 +246,18 @@ }, "healthStatus": "HEALTHY", "loadBalancingWeight": 1 + }, + { + "endpoint": { + "address": { + "socketAddress": { + "address": "172.16.1.9", + "portValue": 2222 + } + } + }, + "healthStatus": "UNHEALTHY", + "loadBalancingWeight": 1 } ] } diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl new file mode 100644 index 0000000000..7e4a3cd918 --- /dev/null +++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/config_entries.hcl @@ -0,0 +1,25 @@ +enable_central_service_config = true + +config_entries { + bootstrap { + kind = "proxy-defaults" + name = "global" + + config { + protocol = "http" + } + } + + bootstrap { + kind = "service-resolver" + name = "s2" + + default_subset = "test" + + subsets = { + "test" = { + only_passing = true + } + } + } +} diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl new file mode 100644 index 0000000000..a2f6423e00 --- /dev/null +++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/s2-v1.hcl @@ -0,0 +1,22 @@ +services { + id = "s2-v1" + name = "s2" + port = 8182 + + meta { + version = "v1" + } + + checks = [ + { + name = "main" + ttl = "30m" + }, + ] + + connect { + sidecar_service { + port = 21011 + } + } +} diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh new file mode 100644 index 0000000000..5cdf0540d1 --- /dev/null +++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/setup.sh @@ -0,0 +1,25 @@ +#!/bin/bash + +set -euo pipefail + +# wait for service registration +wait_for_agent_service_register s1 +wait_for_agent_service_register s2 +wait_for_agent_service_register s2-v1 + +# force s2-v1 into a warning state +set_ttl_check_state service:s2-v1 warn + +# wait for bootstrap to apply config entries +wait_for_config_entry proxy-defaults global +wait_for_config_entry service-resolver s2 + +gen_envoy_bootstrap s1 19000 +gen_envoy_bootstrap s2 19001 +gen_envoy_bootstrap s2-v1 19002 + +export REQUIRED_SERVICES=" +s1 s1-sidecar-proxy +s2 s2-sidecar-proxy +s2-v1 s2-v1-sidecar-proxy +" diff --git a/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats new file mode 100644 index 0000000000..10c6136aa9 --- /dev/null +++ b/test/integration/connect/envoy/case-cfg-resolver-subset-onlypassing/verify.bats @@ -0,0 +1,66 @@ +#!/usr/bin/env bats + +load helpers + +@test "s1 proxy admin is up on :19000" { + retry_default curl -f -s localhost:19000/stats -o /dev/null +} + +@test "s2 proxy admin is up on :19001" { + retry_default curl -f -s localhost:19001/stats -o /dev/null +} + +@test "s2-v1 proxy admin is up on :19002" { + retry_default curl -f -s localhost:19002/stats -o /dev/null +} + +@test "s1 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21000 s1 +} + +@test "s2 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21001 s2 +} + +@test "s2-v1 proxy listener should be up and have right cert" { + assert_proxy_presents_cert_uri localhost:21011 s2 +} + +########################### +## with onlypassing=true + +@test "only one s2 proxy is healthy" { + assert_service_has_healthy_instances s2 1 +} + +@test "s1 upstream should have 1 healthy endpoint for test.s2" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 HEALTHY 1 +} + +@test "s1 upstream should have 1 unhealthy endpoints for test.s2" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 UNHEALTHY 1 +} + +@test "s1 upstream should be able to connect to s2" { + assert_expected_fortio_name s2 +} + +########################### +## with onlypassing=false + +@test "switch back to OnlyPassing=false by deleting the config" { + delete_config_entry service-resolver s2 +} + +@test "only one s2 proxy is healthy (OnlyPassing=false)" { + assert_service_has_healthy_instances s2 1 +} + +@test "s1 upstream should have 2 healthy endpoints for test.s2 (OnlyPassing=false)" { + # NOTE: the subset is erased, so we use the bare name now + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 HEALTHY 2 +} + +@test "s1 upstream should have 0 unhealthy endpoints for test.s2 (OnlyPassing=false)" { + assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 UNHEALTHY 0 +} diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash index c1c6f368cc..7ea1792f2c 100755 --- a/test/integration/connect/envoy/helpers.bash +++ b/test/integration/connect/envoy/helpers.bash @@ -191,6 +191,10 @@ function docker_wget { docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@ } +function docker_curl { + docker run -ti --rm --network container:envoy_consul_1 --entrypoint curl consul-dev $@ +} + function get_envoy_pid { local BOOTSTRAP_NAME=$1 run ps aux @@ -293,6 +297,36 @@ function wait_for_config_entry { retry_default read_config_entry $KIND $NAME >/dev/null } +function delete_config_entry { + local KIND=$1 + local NAME=$2 + retry_default curl -sL -XDELETE "http://127.0.0.1:8500/v1/config/${KIND}/${NAME}" +} + +function wait_for_agent_service_register { + local SERVICE_ID=$1 + retry_default docker_curl -sLf "http://127.0.0.1:8500/v1/agent/service/${SERVICE_ID}" >/dev/null +} + +function set_ttl_check_state { + local CHECK_ID=$1 + local CHECK_STATE=$2 + + case "$CHECK_STATE" in + pass) + ;; + warn) + ;; + fail) + ;; + *) + echo "invalid ttl check state '${CHECK_STATE}'" >&2 + return 1 + esac + + retry_default docker_curl -sL -XPUT "http://localhost:8500/v1/agent/check/warn/${CHECK_ID}" +} + function get_upstream_fortio_name { run retry_default curl -v -s -f localhost:5000/debug?env=dump [ "$status" == 0 ] diff --git a/website/source/docs/agent/config-entries/service-resolver.html.md b/website/source/docs/agent/config-entries/service-resolver.html.md index d0a6a782d9..b37f781c1d 100644 --- a/website/source/docs/agent/config-entries/service-resolver.html.md +++ b/website/source/docs/agent/config-entries/service-resolver.html.md @@ -94,10 +94,10 @@ name = "web" returned. - `OnlyPassing` `(bool: false)` - Specifies the behavior of the resolver's - health check filtering. If this is set to false, the results will include - instances with checks in the passing as well as the warning states. If this - is set to true, only instances with checks in the passing state will be - returned. + health check interpretation. If this is set to false, instances with checks + in the passing as well as the warning states will be considered healthy. If + this is set to true, only instances with checks in the passing state will + be considered healthy. - `Redirect` `(ServiceResolverRedirect: )` - When configured, all attempts to resolve the service this resolver defines will be substituted for