connect: rework how the service resolver subset OnlyPassing flag works (#6173)

The main change is that we no longer filter service instances by health,
preferring instead to render all results down into EDS endpoints in
envoy and merely label the endpoints as HEALTHY or UNHEALTHY.

When OnlyPassing is set to true we will force consul checks in a
'warning' state to render as UNHEALTHY in envoy.

Fixes #6171
This commit is contained in:
R.B. Boyer 2019-07-23 20:20:24 -05:00 committed by GitHub
parent aca2c5de3f
commit e039dfd7f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 268 additions and 76 deletions

View File

@ -502,8 +502,6 @@ func (s *state) handleUpdateConnectProxy(u cache.UpdateEvent, snap *ConfigSnapsh
return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err) return fmt.Errorf("invalid correlation id %q: %v", u.CorrelationID, err)
} }
// TODO(rb): do we have to do onlypassing filters here?
m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc] m, ok := snap.ConnectProxy.WatchedUpstreamEndpoints[svc]
if !ok { if !ok {
m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes) m = make(map[structs.DiscoveryTarget]structs.CheckServiceNodes)
@ -608,16 +606,8 @@ func (s *state) resetWatchesFromChain(
} }
s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target) s.logger.Printf("[TRACE] proxycfg: upstream=%q:chain=%q: initializing watch of target %s", id, chain.ServiceName, target)
// snap.WatchedUpstreams[name]
// delete(snap.WatchedUpstreams[name], target)
// delete(snap.WatchedUpstreamEndpoint[name], target)
// TODO(rb): augment the health rpc so we can get the health information to pass to envoy directly
// TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters // TODO(rb): make sure the cross-dc request properly fills in the alternate datacenters
// TODO(rb): handle subset.onlypassing
var subset structs.ServiceResolverSubset var subset structs.ServiceResolverSubset
if target.ServiceSubset != "" { if target.ServiceSubset != "" {
var ok bool var ok bool
@ -649,24 +639,12 @@ func (s *state) resetWatchesFromChain(
meshGateway = structs.MeshGatewayModeNone meshGateway = structs.MeshGatewayModeNone
} }
filterExp := subset.Filter
if subset.OnlyPassing {
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}
// TODO(rb): update the health endpoint to allow returning even unhealthy endpoints
err = s.watchConnectProxyService( err = s.watchConnectProxyService(
ctx, ctx,
"upstream-target:"+string(encodedTarget)+":"+id, "upstream-target:"+string(encodedTarget)+":"+id,
target.Service, target.Service,
target.Datacenter, target.Datacenter,
filterExp, subset.Filter,
meshGateway, meshGateway,
) )
if err != nil { if err != nil {

View File

@ -32,7 +32,7 @@ type CompiledDiscoveryChain struct {
// GroupResolverNodes respresents all unique service instance groups that // GroupResolverNodes respresents all unique service instance groups that
// need to be represented. For envoy these render as Clusters. // need to be represented. For envoy these render as Clusters.
// //
// Omitted from JSON because DiscoveryTarget is not a encoding.TextMarshaler. // Omitted from JSON because these already show up under the Node field.
GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"` GroupResolverNodes map[DiscoveryTarget]*DiscoveryGraphNode `json:"-"`
// TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS // TODO(rb): not sure if these two fields are actually necessary but I'll know when I get into xDS
@ -54,6 +54,22 @@ func (c *CompiledDiscoveryChain) IsDefault() bool {
c.Node.GroupResolver.Default c.Node.GroupResolver.Default
} }
// SubsetDefinitionForTarget is a convenience function to fetch the subset
// definition for the service subset defined by the provided target. If the
// subset is not defined an empty definition is returned.
func (c *CompiledDiscoveryChain) SubsetDefinitionForTarget(t DiscoveryTarget) ServiceResolverSubset {
	var empty ServiceResolverSubset
	if t.ServiceSubset == "" {
		return empty
	}
	// A missing map key yields the zero-value subset, matching the
	// "not defined" contract above.
	if resolver, ok := c.Resolvers[t.Service]; ok {
		return resolver.Subsets[t.ServiceSubset]
	}
	return empty
}
const ( const (
DiscoveryGraphNodeTypeRouter = "router" DiscoveryGraphNodeTypeRouter = "router"
DiscoveryGraphNodeTypeSplitter = "splitter" DiscoveryGraphNodeTypeSplitter = "splitter"

View File

@ -57,7 +57,9 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment( la := makeLoadAssignment(
sni, sni,
0, 0,
[]structs.CheckServiceNodes{endpoints}, []loadAssignmentEndpointGroup{
{Endpoints: endpoints},
},
cfgSnap.Datacenter, cfgSnap.Datacenter,
) )
resources = append(resources, la) resources = append(resources, la)
@ -68,7 +70,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id] chainEndpointMap, ok := cfgSnap.ConnectProxy.WatchedUpstreamEndpoints[id]
if !ok { if !ok {
continue // TODO(rb): whaaaa? continue // skip the upstream (should not happen)
} }
for target, node := range chain.GroupResolverNodes { for target, node := range chain.GroupResolverNodes {
@ -77,18 +79,23 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
endpoints, ok := chainEndpointMap[target] endpoints, ok := chainEndpointMap[target]
if !ok { if !ok {
continue // TODO(rb): whaaaa? continue // skip the cluster (should not happen)
} }
var ( var (
priorityEndpoints []structs.CheckServiceNodes endpointGroups []loadAssignmentEndpointGroup
overprovisioningFactor int overprovisioningFactor int
) )
if failover != nil && len(failover.Targets) > 0 { primaryGroup := loadAssignmentEndpointGroup{
priorityEndpoints = make([]structs.CheckServiceNodes, 0, len(failover.Targets)+1) Endpoints: endpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(target).OnlyPassing,
}
priorityEndpoints = append(priorityEndpoints, endpoints) if failover != nil && len(failover.Targets) > 0 {
endpointGroups = make([]loadAssignmentEndpointGroup, 0, len(failover.Targets)+1)
endpointGroups = append(endpointGroups, primaryGroup)
if failover.Definition.OverprovisioningFactor > 0 { if failover.Definition.OverprovisioningFactor > 0 {
overprovisioningFactor = failover.Definition.OverprovisioningFactor overprovisioningFactor = failover.Definition.OverprovisioningFactor
@ -101,14 +108,17 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
for _, failTarget := range failover.Targets { for _, failTarget := range failover.Targets {
failEndpoints, ok := chainEndpointMap[failTarget] failEndpoints, ok := chainEndpointMap[failTarget]
if ok { if !ok {
priorityEndpoints = append(priorityEndpoints, failEndpoints) continue // skip the failover target (should not happen)
} }
endpointGroups = append(endpointGroups, loadAssignmentEndpointGroup{
Endpoints: failEndpoints,
OnlyPassing: chain.SubsetDefinitionForTarget(failTarget).OnlyPassing,
})
} }
} else { } else {
priorityEndpoints = []structs.CheckServiceNodes{ endpointGroups = append(endpointGroups, primaryGroup)
endpoints,
}
} }
sni := TargetSNI(target, cfgSnap) sni := TargetSNI(target, cfgSnap)
@ -116,7 +126,7 @@ func (s *Server) endpointsFromSnapshotConnectProxy(cfgSnap *proxycfg.ConfigSnaps
la := makeLoadAssignment( la := makeLoadAssignment(
sni, sni,
overprovisioningFactor, overprovisioningFactor,
priorityEndpoints, endpointGroups,
cfgSnap.Datacenter, cfgSnap.Datacenter,
) )
resources = append(resources, la) resources = append(resources, la)
@ -136,8 +146,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment( la := makeLoadAssignment(
clusterName, clusterName,
0, 0,
[]structs.CheckServiceNodes{ []loadAssignmentEndpointGroup{
endpoints, {Endpoints: endpoints},
}, },
cfgSnap.Datacenter, cfgSnap.Datacenter,
) )
@ -150,8 +160,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment( la := makeLoadAssignment(
clusterName, clusterName,
0, 0,
[]structs.CheckServiceNodes{ []loadAssignmentEndpointGroup{
endpoints, {Endpoints: endpoints},
}, },
cfgSnap.Datacenter, cfgSnap.Datacenter,
) )
@ -166,20 +176,8 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
endpoints := cfgSnap.MeshGateway.ServiceGroups[svc] endpoints := cfgSnap.MeshGateway.ServiceGroups[svc]
// locally execute the subsets filter // locally execute the subsets filter
filterExp := subset.Filter if subset.Filter != "" {
if subset.OnlyPassing { filter, err := bexpr.CreateFilter(subset.Filter, nil, endpoints)
// we could do another filter pass without bexpr but this simplifies things a bit
if filterExp != "" {
// TODO (filtering) - Update to "and all Checks as chk { chk.Status == passing }"
// once the syntax is supported
filterExp = fmt.Sprintf("(%s) and not Checks.Status != passing", filterExp)
} else {
filterExp = "not Checks.Status != passing"
}
}
if filterExp != "" {
filter, err := bexpr.CreateFilter(filterExp, nil, endpoints)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -194,8 +192,11 @@ func (s *Server) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.ConfigSnapsh
la := makeLoadAssignment( la := makeLoadAssignment(
clusterName, clusterName,
0, 0,
[]structs.CheckServiceNodes{ []loadAssignmentEndpointGroup{
endpoints, {
Endpoints: endpoints,
OnlyPassing: subset.OnlyPassing,
},
}, },
cfgSnap.Datacenter, cfgSnap.Datacenter,
) )
@ -216,15 +217,20 @@ func makeEndpoint(clusterName, host string, port int) envoyendpoint.LbEndpoint {
} }
} }
// loadAssignmentEndpointGroup is one priority level's worth of service
// instances destined for a single Envoy ClusterLoadAssignment.
type loadAssignmentEndpointGroup struct {
// Endpoints are the service instances to render as LbEndpoints.
Endpoints structs.CheckServiceNodes
// OnlyPassing, when true, causes instances with any non-passing check
// (including 'warning') to be labeled UNHEALTHY instead of HEALTHY.
OnlyPassing bool
}
func makeLoadAssignment( func makeLoadAssignment(
clusterName string, clusterName string,
overprovisioningFactor int, overprovisioningFactor int,
priorityEndpoints []structs.CheckServiceNodes, endpointGroups []loadAssignmentEndpointGroup,
localDatacenter string, localDatacenter string,
) *envoy.ClusterLoadAssignment { ) *envoy.ClusterLoadAssignment {
cla := &envoy.ClusterLoadAssignment{ cla := &envoy.ClusterLoadAssignment{
ClusterName: clusterName, ClusterName: clusterName,
Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(priorityEndpoints)), Endpoints: make([]envoyendpoint.LocalityLbEndpoints, 0, len(endpointGroups)),
} }
if overprovisioningFactor > 0 { if overprovisioningFactor > 0 {
cla.Policy = &envoy.ClusterLoadAssignment_Policy{ cla.Policy = &envoy.ClusterLoadAssignment_Policy{
@ -232,7 +238,8 @@ func makeLoadAssignment(
} }
} }
for priority, endpoints := range priorityEndpoints { for priority, endpointGroup := range endpointGroups {
endpoints := endpointGroup.Endpoints
es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints)) es := make([]envoyendpoint.LbEndpoint, 0, len(endpoints))
for _, ep := range endpoints { for _, ep := range endpoints {
@ -246,8 +253,9 @@ func makeLoadAssignment(
for _, chk := range ep.Checks { for _, chk := range ep.Checks {
if chk.Status == api.HealthCritical { if chk.Status == api.HealthCritical {
// This can't actually happen now because health always filters critical healthStatus = envoycore.HealthStatus_UNHEALTHY
// but in the future it may not so set this correctly! }
if endpointGroup.OnlyPassing && chk.Status != api.HealthPassing {
healthStatus = envoycore.HealthStatus_UNHEALTHY healthStatus = envoycore.HealthStatus_UNHEALTHY
} }
if chk.Status == api.HealthWarning && ep.Service.Weights != nil { if chk.Status == api.HealthWarning && ep.Service.Weights != nil {

View File

@ -94,18 +94,19 @@ func Test_makeLoadAssignment(t *testing.T) {
testWarningCheckServiceNodes[0].Checks[0].Status = "warning" testWarningCheckServiceNodes[0].Checks[0].Status = "warning"
testWarningCheckServiceNodes[1].Checks[0].Status = "warning" testWarningCheckServiceNodes[1].Checks[0].Status = "warning"
// TODO(rb): test onlypassing
tests := []struct { tests := []struct {
name string name string
clusterName string clusterName string
overprovisioningFactor int overprovisioningFactor int
endpoints []structs.CheckServiceNodes endpoints []loadAssignmentEndpointGroup
want *envoy.ClusterLoadAssignment want *envoy.ClusterLoadAssignment
}{ }{
{ {
name: "no instances", name: "no instances",
clusterName: "service:test", clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{ endpoints: []loadAssignmentEndpointGroup{
{}, {Endpoints: nil},
}, },
want: &envoy.ClusterLoadAssignment{ want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test", ClusterName: "service:test",
@ -117,8 +118,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{ {
name: "instances, no weights", name: "instances, no weights",
clusterName: "service:test", clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{ endpoints: []loadAssignmentEndpointGroup{
testCheckServiceNodes, {Endpoints: testCheckServiceNodes},
}, },
want: &envoy.ClusterLoadAssignment{ want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test", ClusterName: "service:test",
@ -147,8 +148,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{ {
name: "instances, healthy weights", name: "instances, healthy weights",
clusterName: "service:test", clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{ endpoints: []loadAssignmentEndpointGroup{
testWeightedCheckServiceNodes, {Endpoints: testWeightedCheckServiceNodes},
}, },
want: &envoy.ClusterLoadAssignment{ want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test", ClusterName: "service:test",
@ -177,8 +178,8 @@ func Test_makeLoadAssignment(t *testing.T) {
{ {
name: "instances, warning weights", name: "instances, warning weights",
clusterName: "service:test", clusterName: "service:test",
endpoints: []structs.CheckServiceNodes{ endpoints: []loadAssignmentEndpointGroup{
testWarningCheckServiceNodes, {Endpoints: testWarningCheckServiceNodes},
}, },
want: &envoy.ClusterLoadAssignment{ want: &envoy.ClusterLoadAssignment{
ClusterName: "service:test", ClusterName: "service:test",
@ -207,7 +208,12 @@ func Test_makeLoadAssignment(t *testing.T) {
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
got := makeLoadAssignment(tt.clusterName, tt.overprovisioningFactor, tt.endpoints, "dc1") got := makeLoadAssignment(
tt.clusterName,
tt.overprovisioningFactor,
tt.endpoints,
"dc1",
)
require.Equal(t, tt.want, got) require.Equal(t, tt.want, got)
}) })
} }

View File

@ -246,6 +246,18 @@
}, },
"healthStatus": "HEALTHY", "healthStatus": "HEALTHY",
"loadBalancingWeight": 1 "loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "172.16.1.9",
"portValue": 2222
}
}
},
"healthStatus": "UNHEALTHY",
"loadBalancingWeight": 1
} }
] ]
} }

View File

@ -0,0 +1,25 @@
# Test-case agent config: enables centralized config entries and bootstraps
# a proxy-defaults entry (HTTP everywhere) plus a service-resolver for "s2"
# whose default subset treats only fully-passing instances as healthy.
enable_central_service_config = true
config_entries {
bootstrap {
kind = "proxy-defaults"
name = "global"
config {
protocol = "http"
}
}
bootstrap {
kind = "service-resolver"
name = "s2"
# All traffic to "s2" resolves through the "test" subset by default.
default_subset = "test"
subsets = {
"test" = {
# Instances with a check in 'warning' are considered unhealthy.
only_passing = true
}
}
}
}

View File

@ -0,0 +1,22 @@
# Registers a second instance of service "s2" (id "s2-v1") with a TTL check;
# the case setup script later flips this check to 'warning' to exercise the
# resolver's only_passing behavior.
services {
id = "s2-v1"
name = "s2"
port = 8182
meta {
version = "v1"
}
checks = [
{
name = "main"
# TTL check so the test can set its state via the agent API.
ttl = "30m"
},
]
connect {
sidecar_service {
port = 21011
}
}
}

View File

@ -0,0 +1,25 @@
#!/bin/bash
# Case setup: registers s1, s2, and a second s2 instance (s2-v1), forces
# s2-v1's TTL check into a warning state, then generates the envoy
# bootstraps needed to exercise the resolver's only_passing behavior.
set -euo pipefail
# wait for service registration
wait_for_agent_service_register s1
wait_for_agent_service_register s2
wait_for_agent_service_register s2-v1
# force s2-v1 into a warning state
set_ttl_check_state service:s2-v1 warn
# wait for bootstrap to apply config entries
wait_for_config_entry proxy-defaults global
wait_for_config_entry service-resolver s2
gen_envoy_bootstrap s1 19000
gen_envoy_bootstrap s2 19001
gen_envoy_bootstrap s2-v1 19002
export REQUIRED_SERVICES="
s1 s1-sidecar-proxy
s2 s2-sidecar-proxy
s2-v1 s2-v1-sidecar-proxy
"

View File

@ -0,0 +1,66 @@
#!/usr/bin/env bats
# Verifies service-resolver OnlyPassing behavior: with OnlyPassing=true the
# warning s2-v1 instance must be rendered as an UNHEALTHY endpoint in envoy;
# after deleting the resolver both instances must render HEALTHY.
load helpers
@test "s1 proxy admin is up on :19000" {
retry_default curl -f -s localhost:19000/stats -o /dev/null
}
@test "s2 proxy admin is up on :19001" {
retry_default curl -f -s localhost:19001/stats -o /dev/null
}
@test "s2-v1 proxy admin is up on :19002" {
retry_default curl -f -s localhost:19002/stats -o /dev/null
}
@test "s1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21000 s1
}
@test "s2 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21001 s2
}
# s2-v1 is registered under service name "s2", so it presents the s2 cert.
@test "s2-v1 proxy listener should be up and have right cert" {
assert_proxy_presents_cert_uri localhost:21011 s2
}
###########################
## with onlypassing=true
@test "only one s2 proxy is healthy" {
assert_service_has_healthy_instances s2 1
}
@test "s1 upstream should have 1 healthy endpoint for test.s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 HEALTHY 1
}
# The warning instance is still rendered as an endpoint, just UNHEALTHY.
@test "s1 upstream should have 1 unhealthy endpoints for test.s2" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 test.s2 UNHEALTHY 1
}
@test "s1 upstream should be able to connect to s2" {
assert_expected_fortio_name s2
}
###########################
## with onlypassing=false
@test "switch back to OnlyPassing=false by deleting the config" {
delete_config_entry service-resolver s2
}
@test "only one s2 proxy is healthy (OnlyPassing=false)" {
assert_service_has_healthy_instances s2 1
}
@test "s1 upstream should have 2 healthy endpoints for test.s2 (OnlyPassing=false)" {
# NOTE: the subset is erased, so we use the bare name now
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 HEALTHY 2
}
@test "s1 upstream should have 0 unhealthy endpoints for test.s2 (OnlyPassing=false)" {
assert_upstream_has_endpoints_in_status 127.0.0.1:19000 s2 UNHEALTHY 0
}

View File

@ -191,6 +191,10 @@ function docker_wget {
docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@ docker run -ti --rm --network container:envoy_consul_1 alpine:3.9 wget $@
} }
# docker_curl runs curl inside a one-shot container joined to the consul
# container's network namespace (entrypoint overridden to curl), so requests
# hit the agent's loopback interface. All arguments are passed to curl.
function docker_curl {
docker run -ti --rm --network container:envoy_consul_1 --entrypoint curl consul-dev $@
}
function get_envoy_pid { function get_envoy_pid {
local BOOTSTRAP_NAME=$1 local BOOTSTRAP_NAME=$1
run ps aux run ps aux
@ -293,6 +297,36 @@ function wait_for_config_entry {
retry_default read_config_entry $KIND $NAME >/dev/null retry_default read_config_entry $KIND $NAME >/dev/null
} }
# delete_config_entry removes a centralized config entry via the HTTP API.
# Usage: delete_config_entry KIND NAME
function delete_config_entry {
local KIND=$1
local NAME=$2
retry_default curl -sL -XDELETE "http://127.0.0.1:8500/v1/config/${KIND}/${NAME}"
}
# wait_for_agent_service_register blocks (with retries) until the given
# service id is visible in the local agent's service catalog.
# Usage: wait_for_agent_service_register SERVICE_ID
function wait_for_agent_service_register {
local SERVICE_ID=$1
retry_default docker_curl -sLf "http://127.0.0.1:8500/v1/agent/service/${SERVICE_ID}" >/dev/null
}
# set_ttl_check_state sets a TTL check to the requested state via the agent
# HTTP API (/v1/agent/check/{pass,warn,fail}/:check_id).
# Usage: set_ttl_check_state CHECK_ID CHECK_STATE
#   CHECK_ID    - the check to update (e.g. "service:s2-v1")
#   CHECK_STATE - one of: pass, warn, fail
function set_ttl_check_state {
local CHECK_ID=$1
local CHECK_STATE=$2
case "$CHECK_STATE" in
pass | warn | fail)
;;
*)
echo "invalid ttl check state '${CHECK_STATE}'" >&2
return 1
;;
esac
# BUGFIX: previously this always PUT to the /warn/ endpoint regardless of
# the requested state; use CHECK_STATE to select the endpoint.
retry_default docker_curl -sL -XPUT "http://localhost:8500/v1/agent/check/${CHECK_STATE}/${CHECK_ID}"
}
function get_upstream_fortio_name { function get_upstream_fortio_name {
run retry_default curl -v -s -f localhost:5000/debug?env=dump run retry_default curl -v -s -f localhost:5000/debug?env=dump
[ "$status" == 0 ] [ "$status" == 0 ]

View File

@ -94,10 +94,10 @@ name = "web"
returned. returned.
- `OnlyPassing` `(bool: false)` - Specifies the behavior of the resolver's - `OnlyPassing` `(bool: false)` - Specifies the behavior of the resolver's
health check filtering. If this is set to false, the results will include health check interpretation. If this is set to false, instances with checks
instances with checks in the passing as well as the warning states. If this in the passing as well as the warning states will be considered healthy. If
is set to true, only instances with checks in the passing state will be this is set to true, only instances with checks in the passing state will
returned. be considered healthy.
- `Redirect` `(ServiceResolverRedirect: <optional>)` - When configured, all - `Redirect` `(ServiceResolverRedirect: <optional>)` - When configured, all
attempts to resolve the service this resolver defines will be substituted for attempts to resolve the service this resolver defines will be substituted for