From cc0765b87dc8e6628dedcfc2d2742b550c4e1321 Mon Sep 17 00:00:00 2001
From: Andrew Stucki
Date: Fri, 3 Mar 2023 14:17:11 -0500
Subject: [PATCH] Fix resolution of service resolvers with subsets for
 external upstreams (#16499)

* Fix resolution of service resolvers with subsets for external upstreams

* Add tests

* Add changelog entry

* Update view filter logic
---
 .changelog/16499.txt                          |  3 ++
 agent/rpcclient/health/view.go                | 24 ++++++++++----
 agent/rpcclient/health/view_test.go           | 36 +++++++++++++++++++
 .../capture.sh                                |  1 +
 .../service_s3.hcl                            | 17 +++++++++
 .../case-terminating-gateway-subsets/setup.sh |  1 +
 .../case-terminating-gateway-subsets/vars.sh  |  1 +
 .../verify.bats                               |  4 +++
 test/integration/connect/envoy/helpers.bash   | 33 +++++++++++++++++
 9 files changed, 116 insertions(+), 4 deletions(-)
 create mode 100644 .changelog/16499.txt
 create mode 100644 test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl

diff --git a/.changelog/16499.txt b/.changelog/16499.txt
new file mode 100644
index 0000000000..4bd50db47e
--- /dev/null
+++ b/.changelog/16499.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+mesh: Fix resolution of service resolvers with subsets for external upstreams
+```
diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go
index 1684382de7..4dabc5e695 100644
--- a/agent/rpcclient/health/view.go
+++ b/agent/rpcclient/health/view.go
@@ -50,8 +50,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
 		return nil, err
 	}
 	return &HealthView{
-		state:  make(map[string]structs.CheckServiceNode),
-		filter: fe,
+		state:   make(map[string]structs.CheckServiceNode),
+		filter:  fe,
+		connect: req.Connect,
+		kind:    req.ServiceKind,
 	}, nil
 }
 
@@ -61,8 +63,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
 // (IndexedCheckServiceNodes) and update it in place for each event - that
 // involves re-sorting each time etc. though.
 type HealthView struct {
-	state  map[string]structs.CheckServiceNode
-	filter filterEvaluator
+	connect bool
+	kind    structs.ServiceKind
+	state   map[string]structs.CheckServiceNode
+	filter  filterEvaluator
 }
 
 // Update implements View
@@ -84,6 +88,13 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
 		if csn == nil {
 			return errors.New("check service node was unexpectedly nil")
 		}
+
+		// check if we intentionally need to skip the filter
+		if s.skipFilter(csn) {
+			s.state[id] = *csn
+			continue
+		}
+
 		passed, err := s.filter.Evaluate(*csn)
 		if err != nil {
 			return err
@@ -100,6 +111,11 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
 	return nil
 }
 
+func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool {
+	// we only do this for connect-enabled services that need to be routed through a terminating gateway
+	return s.kind == "" && s.connect && csn.Service.Kind == structs.ServiceKindTerminatingGateway
+}
+
 type filterEvaluator interface {
 	Evaluate(datum interface{}) (bool, error)
 }
diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go
index b419bc1cea..0d199243b2 100644
--- a/agent/rpcclient/health/view_test.go
+++ b/agent/rpcclient/health/view_test.go
@@ -941,3 +941,39 @@ func TestNewFilterEvaluator(t *testing.T) {
 		})
 	}
 }
+
+func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) {
+	view, err := NewHealthView(structs.ServiceSpecificRequest{
+		ServiceName: "name",
+		Connect:     true,
+		QueryOptions: structs.QueryOptions{
+			Filter: "Service.Meta.version == \"v1\"",
+		},
+	})
+	require.NoError(t, err)
+
+	err = view.Update([]*pbsubscribe.Event{{
+		Index: 1,
+		Payload: &pbsubscribe.Event_ServiceHealth{
+			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
+				Op: pbsubscribe.CatalogOp_Register,
+				CheckServiceNode: &pbservice.CheckServiceNode{
+					Service: &pbservice.NodeService{
+						Kind:    structs.TerminatingGateway,
+						Service: "name",
+						Address: "127.0.0.1",
+						Port:    8443,
+					},
+				},
+			},
+		},
+	}})
+	require.NoError(t, err)
+
+	node, ok := (view.Result(1)).(*structs.IndexedCheckServiceNodes)
+	require.True(t, ok)
+
+	require.Len(t, node.Nodes, 1)
+	require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address)
+	require.Equal(t, 8443, node.Nodes[0].Service.Port)
+}
diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh
index 2ef0c41a21..261bf4e29a 100644
--- a/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh
+++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh
@@ -2,3 +2,4 @@
 
 snapshot_envoy_admin localhost:20000 terminating-gateway primary || true
 snapshot_envoy_admin localhost:19000 s1 primary || true
+snapshot_envoy_admin localhost:19001 s3 primary || true
diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl b/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl
new file mode 100644
index 0000000000..eb84c578ee
--- /dev/null
+++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl
@@ -0,0 +1,17 @@
+services {
+  id   = "s3"
+  name = "s3"
+  port = 8184
+  connect {
+    sidecar_service {
+      proxy {
+        upstreams = [
+          {
+            destination_name = "s2"
+            local_bind_port  = 8185
+          }
+        ]
+      }
+    }
+  }
+}
diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh
index fdd49572ba..57b85c74a6 100644
--- a/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh
+++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh
@@ -38,4 +38,5 @@ register_services primary
 
 # terminating gateway will act as s2's proxy
 gen_envoy_bootstrap s1 19000
+gen_envoy_bootstrap s3 19001
 gen_envoy_bootstrap terminating-gateway 20000 primary true
diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh
index 9e52629b8b..d4a4d75bdd 100644
--- a/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh
+++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh
@@ -4,5 +4,6 @@ export REQUIRED_SERVICES="
 s1 s1-sidecar-proxy
 s2-v1
+s3 s3-sidecar-proxy
 terminating-gateway-primary
 "
diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats b/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats
index 64a2499e35..028ddea85a 100644
--- a/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats
+++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats
@@ -38,3 +38,7 @@ load helpers
   assert_envoy_metric_at_least 127.0.0.1:20000 "v1.s2.default.primary.*cx_total" 1
 }
 
+@test "terminating-gateway is used for the upstream connection of the proxy" {
+  # make sure we resolve the terminating gateway as endpoint for the upstream
+  assert_upstream_has_endpoint_port 127.0.0.1:19001 "v1.s2" 8443
+}
diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash
index 65bbe3b007..a650f4ee29 100755
--- a/test/integration/connect/envoy/helpers.bash
+++ b/test/integration/connect/envoy/helpers.bash
@@ -361,6 +361,39 @@ function get_upstream_endpoint {
 | select(.name|startswith(\"${CLUSTER_NAME}\"))"
 }
 
+function get_upstream_endpoint_port {
+  local HOSTPORT=$1
+  local CLUSTER_NAME=$2
+  local PORT_VALUE=$3
+  run curl -s -f "http://${HOSTPORT}/clusters?format=json"
+  [ "$status" -eq 0 ]
+  echo "$output" | jq --raw-output "
+.cluster_statuses[]
+| select(.name|startswith(\"${CLUSTER_NAME}\"))
+| [.host_statuses[].address.socket_address.port_value]
+| [select(.[] == ${PORT_VALUE})]
+| length"
+}
+
+function assert_upstream_has_endpoint_port_once {
+  local HOSTPORT=$1
+  local CLUSTER_NAME=$2
+  local PORT_VALUE=$3
+
+  GOT_COUNT=$(get_upstream_endpoint_port $HOSTPORT $CLUSTER_NAME $PORT_VALUE)
+
+  [ "$GOT_COUNT" -eq 1 ]
+}
+
+function assert_upstream_has_endpoint_port {
+  local HOSTPORT=$1
+  local CLUSTER_NAME=$2
+  local PORT_VALUE=$3
+
+  run retry_long assert_upstream_has_endpoint_port_once $HOSTPORT $CLUSTER_NAME $PORT_VALUE
+  [ "$status" -eq 0 ]
+}
+
 function get_upstream_endpoint_in_status_count {
   local HOSTPORT=$1
   local CLUSTER_NAME=$2