From fa6eb61f70f2b8dc548ae4fd647418ada0be30a8 Mon Sep 17 00:00:00 2001
From: Manoj Srinivasamurthy
Date: Wed, 10 Jan 2024 21:25:50 +0530
Subject: [PATCH] NET-6813: adding resolver default subset test in agentless upgrade test (#20046)
---
 test-integ/topoutil/asserter.go               |  89 ++++++-
 .../upgrade/l7_traffic_management/common.go   | 231 ++++++++++++++++++
 .../l7_traffic_management/resolver_test.go    | 116 +++++++++
 .../consul-container/libs/assert/envoy.go     |  38 +++
 .../resolver_default_subset_test.go           |   4 +-
 5 files changed, 464 insertions(+), 14 deletions(-)
 create mode 100644 test-integ/upgrade/l7_traffic_management/common.go
 create mode 100644 test-integ/upgrade/l7_traffic_management/resolver_test.go

diff --git a/test-integ/topoutil/asserter.go b/test-integ/topoutil/asserter.go
index 091ba23797..7765e20c6c 100644
--- a/test-integ/topoutil/asserter.go
+++ b/test-integ/topoutil/asserter.go
@@ -101,6 +101,30 @@ func (a *Asserter) DestinationEndpointStatus(
 	libassert.AssertUpstreamEndpointStatusWithClient(t, client, addr, clusterName, healthStatus, count)
 }
 
+func (a *Asserter) getEnvoyClient(t *testing.T, workload *topology.Workload) (client *http.Client, addr string) {
+	node := workload.Node
+	ip := node.LocalAddress()
+	port := workload.EnvoyAdminPort
+	addr = fmt.Sprintf("%s:%d", ip, port)
+	client = a.mustGetHTTPClient(t, node.Cluster)
+	return client, addr
+}
+
+// AssertEnvoyRunningWithClient asserts that Envoy is running by querying its stats page.
+func (a *Asserter) AssertEnvoyRunningWithClient(t *testing.T, workload *topology.Workload) {
+	t.Helper()
+	client, addr := a.getEnvoyClient(t, workload)
+	libassert.AssertEnvoyRunningWithClient(t, client, addr)
+}
+
+// AssertEnvoyPresentsCertURIWithClient makes a GET request to the /certs endpoint and validates
+// that the workload's certificate URI is present in the response.
+func (a *Asserter) AssertEnvoyPresentsCertURIWithClient(t *testing.T, workload *topology.Workload) {
+	t.Helper()
+	client, addr := a.getEnvoyClient(t, workload)
+	libassert.AssertEnvoyPresentsCertURIWithClient(t, client, addr, workload.ID.Name)
+}
+
 // HTTPServiceEchoes verifies that a post to the given ip/port combination
 // returns the data in the response body. Optional path can be provided to
 // differentiate requests.
@@ -217,6 +241,22 @@ func (a *Asserter) fortioFetch2Destination(
 ) (body []byte, res *http.Response) {
 	t.Helper()
 
+	res, err := getFortioFetch2DestinationResponse(t, client, addr, dest, path)
+	require.NoError(t, err)
+	defer res.Body.Close()
+
+	// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
+	require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
+	require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
+	// not sure when this happens, suspect it's when envoy hasn't configured the local destination yet
+	require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
+	body, err = io.ReadAll(res.Body)
+	require.NoError(t, err)
+
+	return body, res
+}
+
+func getFortioFetch2DestinationResponse(t testutil.TestingTB, client *http.Client, addr string, dest *topology.Destination, path string) (*http.Response, error) {
 	var actualURL string
 	if dest.Implied {
 		actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
@@ -237,19 +277,9 @@ func (a *Asserter) fortioFetch2Destination(
 	req, err := http.NewRequest(http.MethodPost, url, nil)
 	require.NoError(t, err)
 
-	res, err = client.Do(req)
+	res, err := client.Do(req)
 	require.NoError(t, err)
-	defer res.Body.Close()
-
-	// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
-	require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
-	require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
-	// not sure when this happens, suspect it's when envoy hasn't configured the local destination yet
-	require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
-	body, err = io.ReadAll(res.Body)
-	require.NoError(t, err)
-
-	return body, res
+	return res, err
 }
 
 // uses the /fortio/fetch2 endpoint to do a header echo check against an
@@ -307,6 +337,26 @@ func (a *Asserter) FortioFetch2FortioName(
 	})
 }
 
+// FortioFetch2ServiceUnavailable uses the /fortio/fetch2 endpoint to do a header echo check against a
+// destination fortio and asserts that the service is unavailable (503).
+func (a *Asserter) FortioFetch2ServiceUnavailable(t *testing.T, fortioWrk *topology.Workload, dest *topology.Destination) {
+	const kPassphrase = "x-passphrase"
+	const passphrase = "hello"
+	path := fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase)
+
+	var (
+		node   = fortioWrk.Node
+		addr   = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.PortOrDefault(dest.PortName))
+		client = a.mustGetHTTPClient(t, node.Cluster)
+	)
+
+	retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
+		res, _ := getFortioFetch2DestinationResponse(r, client, addr, dest, path)
+		defer res.Body.Close()
+		require.Equal(r, http.StatusServiceUnavailable, res.StatusCode)
+	})
+}
+
 // CatalogServiceExists is the same as libassert.CatalogServiceExists, except that it uses
 // a proxied API client
 func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string, opts *api.QueryOptions) {
@@ -314,3 +364,18 @@ func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string
 	cl := a.mustGetAPIClient(t, cluster)
 	libassert.CatalogServiceExists(t, cl, svc, opts)
 }
+
+// AssertServiceHealth asserts that the given service has the expected number of instances (optionally counting only passing instances)
+func (a *Asserter) AssertServiceHealth(t *testing.T, cl *api.Client, serverSVC string, onlypassing bool, count int) {
+	t.Helper()
+	retry.RunWith(&retry.Timer{Timeout: time.Second * 20, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
+		svcs, _, err := cl.Health().Service(
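+			// args: service name, tag filter, only-passing flag, query options (api.Health.Service)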
+			serverSVC,
+			"",
+			onlypassing,
+			nil,
+		)
+		require.NoError(r, err)
+		require.Equal(r, count, len(svcs))
+	})
+}
diff --git a/test-integ/upgrade/l7_traffic_management/common.go b/test-integ/upgrade/l7_traffic_management/common.go
new file mode 100644
index 0000000000..046890335b
--- /dev/null
+++ b/test-integ/upgrade/l7_traffic_management/common.go
@@ -0,0 +1,231 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package l7_traffic_management
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/require"
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
+	"github.com/hashicorp/consul/testing/deployer/sprawl"
+	"github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest"
+	"github.com/hashicorp/consul/testing/deployer/topology"
+
+	"github.com/hashicorp/consul/test-integ/topoutil"
+)
+
+type commonTopo struct {
+	Cfg *topology.Config
+
+	Sprawl *sprawl.Sprawl
+	Assert *topoutil.Asserter
+
+	StaticServerSID topology.ID
+	StaticClientSID topology.ID
+
+	StaticServerWorkload *topology.Workload
+	StaticClientWorkload *topology.Workload
+}
+
+const (
+	defaultNamespace = "default"
+	defaultPartition = "default"
+	dc1              = "dc1"
+)
+
+var (
+	staticServerSID = topology.NewID("static-server", defaultNamespace, defaultPartition)
+	staticClientSID = topology.NewID("static-client", defaultNamespace, defaultPartition)
+)
+
+func NewCommonTopo(t *testing.T) *commonTopo {
+	t.Helper()
+	return newCommonTopo(t)
+}
+
+// newCommonTopo creates the below topology:
+// consul server
+// - consul server on node dc1-server1
+//
+// dataplane
+// - workload (fortio) static-server on node dc1-client1
+// - workload (fortio) static-client on node dc1-client2 with a destination to static-server
+// - static-client and static-server are registered on the two agentless nodes
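+// - the client nodes are topology.NodeKindDataplane (agentless): each sidecar proxy is
+//   managed by consul-dataplane rather than by a local client agent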
+//
+// Intentions
+// - static-client is allowed to send traffic to static-server
+func newCommonTopo(t *testing.T) *commonTopo {
+	t.Helper()
+
+	ct := &commonTopo{}
+
+	cfg := &topology.Config{
+		Images: topology.Images{
+			// ConsulEnterprise: "hashicorp/consul-enterprise:local",
+		},
+		Networks: []*topology.Network{
+			{Name: dc1},
+		},
+		Clusters: []*topology.Cluster{
+			{
+				Name: dc1,
+				Nodes: []*topology.Node{
+					// consul server on dc1-server1
+					{
+						Kind:   topology.NodeKindServer,
+						Images: utils.LatestImages(),
+						Name:   "dc1-server1",
+						Addresses: []*topology.Address{
+							{Network: dc1},
+						},
+						Meta: map[string]string{
+							"build": "0.0.1",
+						},
+					},
+					// static-server (Meta version v2) on dc1-client1
+					{
+						Kind: topology.NodeKindDataplane,
+						Name: "dc1-client1",
+						Workloads: []*topology.Workload{
+							{
+								ID:             staticServerSID,
+								Image:          "docker.mirror.hashicorp.services/fortio/fortio",
+								Port:           8080,
+								EnvoyAdminPort: 19000,
+								CheckTCP:       "127.0.0.1:8080",
+								Meta:           map[string]string{"version": "v2"},
+								Env: []string{
+									"FORTIO_NAME=" + dc1 + "::" + staticServerSID.String(),
+								},
+								Command: []string{
+									"server",
+									"-http-port", "8080",
+									"-redirect-port", "-disabled",
+								},
+							},
+						},
+					},
+					// static-client on dc1-client2 with a destination to static-server
+					{
+						Kind: topology.NodeKindDataplane,
+						Name: "dc1-client2",
+						Workloads: []*topology.Workload{
+							{
+								ID:             staticClientSID,
+								Image:          "docker.mirror.hashicorp.services/fortio/fortio",
+								Port:           8080,
+								EnvoyAdminPort: 19000,
+								CheckTCP:       "127.0.0.1:8080",
+								Command: []string{
+									"server",
+									"-http-port", "8080",
+									"-redirect-port", "-disabled",
+								},
+								Destinations: []*topology.Destination{
+									{
+										ID:        staticServerSID, // static-server
+										LocalPort: 5000,
+									},
+								},
+							},
+						},
+					},
+				},
+				Enterprise: utils.IsEnterprise(),
+				InitialConfigEntries: []api.ConfigEntry{
+					&api.ProxyConfigEntry{
+						Kind:      api.ProxyDefaults,
+						Name:      "global",
+						Partition: topoutil.ConfigEntryPartition("default"),
+						Config: map[string]any{
+							"protocol": "http",
+						},
+					},
+					&api.ServiceConfigEntry{
+						Kind:      api.ServiceDefaults,
+						Name:      staticServerSID.Name,
+						Partition: topoutil.ConfigEntryPartition("default"),
+					},
+					&api.ServiceIntentionsConfigEntry{
+						Kind:      api.ServiceIntentions,
+						Name:      staticServerSID.Name,
+						Partition: topoutil.ConfigEntryPartition("default"),
+						Sources: []*api.SourceIntention{
+							{
+								Name:   staticClientSID.Name,
+								Action: api.IntentionActionAllow},
+						},
+					},
+				},
+			},
+		},
+	}
+
+	ct.Cfg = cfg
+	ct.StaticClientSID = staticClientSID
+	ct.StaticServerSID = staticServerSID
+
+	return ct
+}
+
+func (ct *commonTopo) Launch(t *testing.T) {
+	t.Helper()
+	if ct.Sprawl != nil {
+		t.Fatalf("Launch must only be called once")
+	}
+	ct.Sprawl = sprawltest.Launch(t, ct.Cfg)
+	ct.ValidateWorkloads(t)
+}
+
+// ValidateWorkloads validates the following:
+// - the static-server and static-client workloads are reachable and their services are healthy
+// - the static-client upstream service and its sidecar proxy exist in the catalog
+// - Envoy is running for the static-server and static-client workloads
+// - the Envoy cert URI is present for the static-server and static-client workloads
+func (ct *commonTopo) ValidateWorkloads(t *testing.T) {
+	t.Helper()
+	ct.Assert = topoutil.NewAsserter(ct.Sprawl)
+	cluster := ct.Sprawl.Topology().Clusters[dc1]
+
+	cl, err := ct.Sprawl.APIClientForCluster(cluster.Name, "")
+	require.NoError(t, err)
+
+	staticServerWorkload := cluster.WorkloadByID(
+		topology.NewNodeID("dc1-client1", defaultPartition),
+		ct.StaticServerSID,
+	)
+	ct.Assert.HTTPStatus(t, staticServerWorkload, staticServerWorkload.Port, 200)
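+	// the catalog should report exactly one passing static-server instance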
+	ct.Assert.AssertServiceHealth(t, cl, ct.StaticServerSID.Name, true, 1)
+
+	staticClientWorkload := cluster.WorkloadByID(
+		topology.NewNodeID("dc1-client2", defaultPartition),
+		ct.StaticClientSID,
+	)
+	ct.Assert.AssertServiceHealth(t, cl, ct.StaticClientSID.Name, true, 1)
+
+	// check the upstream service and its sidecar proxy exist in the catalog
+	svcs := cluster.WorkloadsByID(ct.StaticClientSID)
+	client := svcs[0]
+	upstream := client.Destinations[0]
+	ct.Assert.CatalogServiceExists(t, cluster.Name, upstream.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{
+		Partition: upstream.ID.Partition,
+		Namespace: upstream.ID.Namespace,
+	}))
+	ct.Assert.CatalogServiceExists(t, cluster.Name, fmt.Sprintf("%s-sidecar-proxy", upstream.ID.Name), utils.CompatQueryOpts(&api.QueryOptions{
+		Partition: upstream.ID.Partition,
+		Namespace: upstream.ID.Namespace,
+	}))
+
+	ct.StaticServerWorkload = staticServerWorkload
+	ct.StaticClientWorkload = staticClientWorkload
+
+	ct.Assert.AssertEnvoyRunningWithClient(t, ct.StaticServerWorkload)
+	ct.Assert.AssertEnvoyRunningWithClient(t, ct.StaticClientWorkload)
+
+	ct.Assert.AssertEnvoyPresentsCertURIWithClient(t, ct.StaticServerWorkload)
+	ct.Assert.AssertEnvoyPresentsCertURIWithClient(t, ct.StaticClientWorkload)
+}
diff --git a/test-integ/upgrade/l7_traffic_management/resolver_test.go b/test-integ/upgrade/l7_traffic_management/resolver_test.go
new file mode 100644
index 0000000000..0714ee39fa
--- /dev/null
+++ b/test-integ/upgrade/l7_traffic_management/resolver_test.go
@@ -0,0 +1,116 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package l7_traffic_management
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/hashicorp/consul/api"
+	"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
+	"github.com/hashicorp/consul/testing/deployer/sprawl"
+	"github.com/hashicorp/consul/testing/deployer/topology"
+)
+
+// TestTrafficManagement_ResolverDefaultSubset_Agentless tests that a resolver directs traffic to the default subset (agentless):
+// - Create a topology with static-server (Meta version v2) and static-client (with a static-server upstream)
+// - Create a service resolver for static-server with v2 as the default subset
+// - The resolver directs traffic to the default subset, which is v2
+// - Do a standard upgrade and validate that traffic is still directed to v2
+// - Change the default subset in the service resolver to v1: the client-to-server request fails
+//   since only a v2 instance exists
+// - Change the default subset back to v2: the client-to-server request succeeds
+//   (a v2 instance is available and traffic is directed to it)
+func TestTrafficManagement_ResolverDefaultSubset_Agentless(t *testing.T) {
+	t.Parallel()
+
+	ct := NewCommonTopo(t)
+	ct.Cfg.Clusters[0].InitialConfigEntries = append(ct.Cfg.Clusters[0].InitialConfigEntries,
+		newServiceResolver(staticServerSID.Name, "v2"))
+
+	ct.Launch(t)
+
+	resolverV2AssertFn := func() {
+		cluster := ct.Sprawl.Topology().Clusters[dc1]
+		staticClientWorkload := cluster.WorkloadByID(
+			topology.NewNodeID("dc1-client2", defaultPartition),
+			ct.StaticClientSID,
+		)
+		ct.Assert.FortioFetch2HeaderEcho(t, staticClientWorkload, &topology.Destination{
+			ID:        ct.StaticServerSID,
+			LocalPort: 5000,
+		})
+		ct.Assert.FortioFetch2FortioName(t, staticClientWorkload, &topology.Destination{
+			ID:        ct.StaticServerSID,
+			LocalPort: 5000,
+		}, dc1, staticServerSID)
+		ct.Assert.DestinationEndpointStatus(t, staticClientWorkload, "v2.static-server.default", "HEALTHY", 1)
+	}
+	resolverV2AssertFn()
+
+	t.Log("Start standard upgrade ...")
+	sp := ct.Sprawl
+	cfg := sp.Config()
+	require.NoError(t, ct.Sprawl.LoadKVDataToCluster("dc1", 1, &api.WriteOptions{}))
+	require.NoError(t, sp.Upgrade(cfg, "dc1", sprawl.UpgradeTypeStandard, utils.TargetImages(), nil))
+	t.Log("Finished standard upgrade ...")
+
+	// verify the KV data is not lost during the upgrade
+	data, err := ct.Sprawl.GetKV("dc1", "key-0", &api.QueryOptions{})
+	require.NoError(t, err)
+	require.NotNil(t, data)
+
+	ct.ValidateWorkloads(t)
+	resolverV2AssertFn()
+
+	// Change the default subset in the service resolver to v1: the client-to-server request should now fail
+	cluster := ct.Sprawl.Topology().Clusters[dc1]
+	cl, err := ct.Sprawl.APIClientForCluster(cluster.Name, "")
+	require.NoError(t, err)
+	configEntry := cl.ConfigEntries()
+	_, err = configEntry.Delete(api.ServiceResolver, staticServerSID.Name, nil)
+	require.NoError(t, err)
+	_, _, err = configEntry.Set(newServiceResolver(staticServerSID.Name, "v1"), nil)
+	require.NoError(t, err)
+	ct.ValidateWorkloads(t)
+
+	resolverV1AssertFn := func() {
+		cluster := ct.Sprawl.Topology().Clusters[dc1]
+		staticClientWorkload := cluster.WorkloadByID(
+			topology.NewNodeID("dc1-client2", defaultPartition),
+			ct.StaticClientSID,
+		)
+		ct.Assert.FortioFetch2ServiceUnavailable(t, staticClientWorkload, &topology.Destination{
+			ID:        ct.StaticServerSID,
+			LocalPort: 5000,
+		})
+	}
+	resolverV1AssertFn()
+
+	// Change the default subset in the service resolver back to v2: the client-to-server request should succeed again
+	configEntry = cl.ConfigEntries()
+	_, err = configEntry.Delete(api.ServiceResolver, staticServerSID.Name, nil)
+	require.NoError(t, err)
+	_, _, err = configEntry.Set(newServiceResolver(staticServerSID.Name, "v2"), nil)
+	require.NoError(t, err)
+	ct.ValidateWorkloads(t)
+	resolverV2AssertFn()
+}
+
+// newServiceResolver returns a service-resolver config entry with v1/v2 subsets keyed off
+// Service.Meta.version and the given default subset.
+func newServiceResolver(serviceResolverName string, defaultSubset string) api.ConfigEntry {
+	return &api.ServiceResolverConfigEntry{
+		Kind:          api.ServiceResolver,
+		Name:          serviceResolverName,
+		DefaultSubset: defaultSubset,
+		Subsets: map[string]api.ServiceResolverSubset{
+			"v1": {
+				Filter: "Service.Meta.version == v1",
+			},
+			"v2": {
+				Filter: "Service.Meta.version == v2",
+			},
+		},
+	}
+}
diff --git a/test/integration/consul-container/libs/assert/envoy.go b/test/integration/consul-container/libs/assert/envoy.go
index 076f2e1af6..0d2929ab90 100644
--- a/test/integration/consul-container/libs/assert/envoy.go
+++ b/test/integration/consul-container/libs/assert/envoy.go
@@ -250,7 +250,29 @@ func AssertEnvoyPresentsCertURI(t *testing.T, port int, serviceName string) {
 		}
 		require.NotNil(r, dump)
 	})
+	validateEnvoyCertificateURI(t, dump, serviceName)
+}
 
+// AssertEnvoyPresentsCertURIWithClient is AssertEnvoyPresentsCertURI, but it queries the Envoy
+// admin API via the provided HTTP client and address.
+func AssertEnvoyPresentsCertURIWithClient(t *testing.T, client *http.Client, addr string, serviceName string) {
+	var (
+		dump string
+		err  error
+	)
+	failer := func() *retry.Timer {
+		return &retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}
+	}
+
+	retry.RunWith(failer(), t, func(r *retry.R) {
+		dump, _, err = GetEnvoyOutputWithClient(client, addr, "certs", nil)
+		if err != nil {
+			r.Fatal("could not fetch envoy configuration")
+		}
+		require.NotNil(r, dump)
+	})
+	validateEnvoyCertificateURI(t, dump, serviceName)
+}
+
+func validateEnvoyCertificateURI(t *testing.T, dump string, serviceName string) {
 	// Validate certificate uri
 	filter := `.certificates[] | .cert_chain[].subject_alt_names[].uri`
 	results, err := utils.JQFilter(dump, filter)
@@ -283,6 +305,22 @@ func AssertEnvoyRunning(t *testing.T, port int) {
 	})
 }
 
+// AssertEnvoyRunningWithClient asserts that Envoy is running by fetching its stats page via the
+// provided HTTP client and address.
+func AssertEnvoyRunningWithClient(t *testing.T, client *http.Client, addr string) {
+	var err error
+	failer := func() *retry.Timer {
+		return &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}
+	}
+
+	retry.RunWith(failer(), t, func(r *retry.R) {
+		_, _, err = GetEnvoyOutputWithClient(client, addr, "stats", nil)
+		if err != nil {
+			r.Fatal("could not fetch envoy stats")
+		}
+	})
+}
+
 func GetEnvoyOutput(port int, path string, query map[string]string) (string, int, error) {
 	client := cleanhttp.DefaultClient()
 	return GetEnvoyOutputWithClient(client, fmt.Sprintf("localhost:%d", port), path, query)
diff --git a/test/integration/consul-container/test/upgrade/l7_traffic_management/resolver_default_subset_test.go b/test/integration/consul-container/test/upgrade/l7_traffic_management/resolver_default_subset_test.go
index 9feb9d8209..5c101bfa2a 100644
--- a/test/integration/consul-container/test/upgrade/l7_traffic_management/resolver_default_subset_test.go
+++ b/test/integration/consul-container/test/upgrade/l7_traffic_management/resolver_default_subset_test.go
@@ -73,8 +73,8 @@ func TestTrafficManagement_ResolverDefaultSubset(t *testing.T) {
 	assertionFn := func() {
 		_, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
 		_, serverAdminPortV2 := serverConnectProxyV2.GetAdminAddr()
-		_, adminPort := staticClientProxy.GetAdminAddr() // httpPort
-		_, port := staticClientProxy.GetAddr()           // EnvoyAdminPort
+		_, adminPort := staticClientProxy.GetAdminAddr() // EnvoyAdminPort
+		_, port := staticClientProxy.GetAddr()           // httpPort
 
 		libassert.AssertEnvoyRunning(t, serverAdminPortV1)
 		libassert.AssertEnvoyRunning(t, serverAdminPortV2)