NET-6813: adding resolver default subset test in agentless upgrade test (#20046)

This commit is contained in:
Manoj Srinivasamurthy 2024-01-10 21:25:50 +05:30 committed by GitHub
parent 7724bb88d5
commit fa6eb61f70
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 464 additions and 14 deletions

View File

@ -101,6 +101,30 @@ func (a *Asserter) DestinationEndpointStatus(
libassert.AssertUpstreamEndpointStatusWithClient(t, client, addr, clusterName, healthStatus, count)
}
// getEnvoyClient returns the pieces needed to talk to a workload's Envoy admin
// endpoint: the "ip:port" address built from the workload node's local address
// and the workload's EnvoyAdminPort, and the HTTP client obtained from
// mustGetHTTPClient for the node's cluster.
func (a *Asserter) getEnvoyClient(t *testing.T, workload *topology.Workload) (client *http.Client, addr string) {
	owner := workload.Node
	addr = fmt.Sprintf("%s:%d", owner.LocalAddress(), workload.EnvoyAdminPort)
	client = a.mustGetHTTPClient(t, owner.Cluster)
	return client, addr
}
// AssertEnvoyRunningWithClient resolves the Envoy admin address and HTTP
// client for the workload and hands them to
// libassert.AssertEnvoyRunningWithClient, which performs the actual check.
func (a *Asserter) AssertEnvoyRunningWithClient(t *testing.T, workload *topology.Workload) {
	t.Helper()
	httpClient, adminAddr := a.getEnvoyClient(t, workload)
	libassert.AssertEnvoyRunningWithClient(t, httpClient, adminAddr)
}
// AssertEnvoyPresentsCertURIWithClient resolves the Envoy admin address and
// HTTP client for the workload and delegates to
// libassert.AssertEnvoyPresentsCertURIWithClient, passing workload.ID.Name as
// the service name whose certificate URI is validated.
func (a *Asserter) AssertEnvoyPresentsCertURIWithClient(t *testing.T, workload *topology.Workload) {
	t.Helper()
	httpClient, adminAddr := a.getEnvoyClient(t, workload)
	serviceName := workload.ID.Name
	libassert.AssertEnvoyPresentsCertURIWithClient(t, httpClient, adminAddr, serviceName)
}
// HTTPServiceEchoes verifies that a post to the given ip/port combination
// returns the data in the response body. Optional path can be provided to
// differentiate requests.
@ -217,6 +241,22 @@ func (a *Asserter) fortioFetch2Destination(
) (body []byte, res *http.Response) {
t.Helper()
err, res := getFortioFetch2DestinationResponse(t, client, addr, dest, path)
require.NoError(t, err)
defer res.Body.Close()
// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
// not sure when this happens, suspect it's when envoy hasn't configured the local destination yet
require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
body, err = io.ReadAll(res.Body)
require.NoError(t, err)
return body, res
}
func getFortioFetch2DestinationResponse(t testutil.TestingTB, client *http.Client, addr string, dest *topology.Destination, path string) (error, *http.Response) {
var actualURL string
if dest.Implied {
actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
@ -237,19 +277,9 @@ func (a *Asserter) fortioFetch2Destination(
req, err := http.NewRequest(http.MethodPost, url, nil)
require.NoError(t, err)
res, err = client.Do(req)
res, err := client.Do(req)
require.NoError(t, err)
defer res.Body.Close()
// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
// not sure when this happens, suspect it's when envoy hasn't configured the local destination yet
require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
body, err = io.ReadAll(res.Body)
require.NoError(t, err)
return body, res
return err, res
}
// uses the /fortio/fetch2 endpoint to do a header echo check against an
@ -307,6 +337,26 @@ func (a *Asserter) FortioFetch2FortioName(
})
}
// FortioFetch2ServiceUnavailable uses the /fortio/fetch2 endpoint on fortioWrk
// to send a request to the given destination and asserts that the response
// status is 503 (Service Unavailable).
//
// The request is retried every 500ms for up to 60s until a 503 is observed;
// any other outcome within that window fails the test.
func (a *Asserter) FortioFetch2ServiceUnavailable(t *testing.T, fortioWrk *topology.Workload, dest *topology.Destination) {
	t.Helper()

	const kPassphrase = "x-passphrase"
	const passphrase = "hello"
	path := fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase)

	var (
		node   = fortioWrk.Node
		addr   = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.PortOrDefault(dest.PortName))
		client = a.mustGetHTTPClient(t, node.Cluster)
	)

	retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
		err, res := getFortioFetch2DestinationResponse(r, client, addr, dest, path)
		// Check the error and the response before deferring Close: a failed
		// request can come back with a nil response, and dereferencing it
		// would panic instead of letting the retry loop try again.
		require.NoError(r, err)
		require.NotNil(r, res)
		defer res.Body.Close()

		require.Equal(r, http.StatusServiceUnavailable, res.StatusCode)
	})
}
// CatalogServiceExists is the same as libassert.CatalogServiceExists, except that it uses
// a proxied API client
func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string, opts *api.QueryOptions) {
@ -314,3 +364,18 @@ func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string
cl := a.mustGetAPIClient(t, cluster)
libassert.CatalogServiceExists(t, cl, svc, opts)
}
// AssertServiceHealth queries cl.Health().Service for serverSVC (with an empty
// tag and the given onlypassing flag) and asserts that exactly count entries
// come back. The query is retried every 500ms for up to 20s.
func (a *Asserter) AssertServiceHealth(t *testing.T, cl *api.Client, serverSVC string, onlypassing bool, count int) {
	t.Helper()

	timer := &retry.Timer{Timeout: time.Second * 20, Wait: time.Millisecond * 500}
	check := func(r *retry.R) {
		entries, _, err := cl.Health().Service(serverSVC, "", onlypassing, nil)
		require.NoError(r, err)
		require.Equal(r, count, len(entries))
	}
	retry.RunWith(timer, t, check)
}

View File

@ -0,0 +1,231 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package l7_traffic_management
import (
"fmt"
"github.com/stretchr/testify/require"
"testing"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl"
"github.com/hashicorp/consul/testing/deployer/sprawl/sprawltest"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/hashicorp/consul/test-integ/topoutil"
)
// commonTopo bundles the topology configuration, the launched sprawl and the
// handles the tests in this package use to make assertions.
type commonTopo struct {
	// Cfg is the topology configuration; populated by newCommonTopo.
	Cfg *topology.Config
	// Sprawl is nil until Launch runs and is used by Launch to detect a
	// second call.
	Sprawl *sprawl.Sprawl
	// Assert is (re)created from Sprawl on every ValidateWorkloads call.
	Assert *topoutil.Asserter
	// StaticServerSID and StaticClientSID are the service IDs of the two
	// workloads; populated by newCommonTopo.
	StaticServerSID topology.ID
	StaticClientSID topology.ID
	// StaticServerWorkload and StaticClientWorkload are looked up from the
	// running topology and stored by ValidateWorkloads.
	StaticServerWorkload *topology.Workload
	StaticClientWorkload *topology.Workload
}
const (
	// defaultNamespace and defaultPartition are the namespace and partition
	// used when building the service and node IDs in this package.
	defaultNamespace = "default"
	defaultPartition = "default"
	// dc1 is used both as the cluster name and as the network name.
	dc1 = "dc1"
)

var (
	// staticServerSID and staticClientSID identify the two workloads in the
	// default namespace and partition.
	staticServerSID = topology.NewID("static-server", defaultNamespace, defaultPartition)
	staticClientSID = topology.NewID("static-client", defaultNamespace, defaultPartition)
)
// NewCommonTopo is the exported entry point for building the shared topology;
// it forwards directly to newCommonTopo.
func NewCommonTopo(t *testing.T) *commonTopo {
	t.Helper()
	topo := newCommonTopo(t)
	return topo
}
// newCommonTopo builds (but does not launch) the topology shared by the tests
// in this package.
//
// consul server
//   - one server node, dc1-server1
//
// dataplane
//   - dc1-client1 runs the fortio workload static-server (meta version=v2)
//   - dc1-client2 runs the fortio workload static-client with a destination
//     to static-server on local port 5000
//
// config entries
//   - proxy-defaults "global" setting protocol=http
//   - service-defaults for static-server
//   - service-intentions on static-server allowing static-client
func newCommonTopo(t *testing.T) *commonTopo {
	t.Helper()

	const fortioImage = "docker.mirror.hashicorp.services/fortio/fortio"

	// fortioServerCmd hands out a fresh slice on every call so the two
	// workloads never share a backing array.
	fortioServerCmd := func() []string {
		return []string{
			"server",
			"-http-port", "8080",
			"-redirect-port", "-disabled",
		}
	}

	// consul server on dc1-server1
	serverNode := &topology.Node{
		Kind:   topology.NodeKindServer,
		Images: utils.LatestImages(),
		Name:   "dc1-server1",
		Addresses: []*topology.Address{
			{Network: dc1},
		},
		Meta: map[string]string{
			"build": "0.0.1",
		},
	}

	// static-server (meta version v2) on dc1-client1
	staticServerNode := &topology.Node{
		Kind: topology.NodeKindDataplane,
		Name: "dc1-client1",
		Workloads: []*topology.Workload{
			{
				ID:             staticServerSID,
				Image:          fortioImage,
				Port:           8080,
				EnvoyAdminPort: 19000,
				CheckTCP:       "127.0.0.1:8080",
				Meta:           map[string]string{"version": "v2"},
				Env: []string{
					"FORTIO_NAME=" + dc1 + "::" + staticServerSID.String(),
				},
				Command: fortioServerCmd(),
			},
		},
	}

	// static-client on dc1-client2 with a destination to static-server
	staticClientNode := &topology.Node{
		Kind: topology.NodeKindDataplane,
		Name: "dc1-client2",
		Workloads: []*topology.Workload{
			{
				ID:             staticClientSID,
				Image:          fortioImage,
				Port:           8080,
				EnvoyAdminPort: 19000,
				CheckTCP:       "127.0.0.1:8080",
				Command:        fortioServerCmd(),
				Destinations: []*topology.Destination{
					{
						ID:        staticServerSID, // static-server
						LocalPort: 5000,
					},
				},
			},
		},
	}

	enterprise := utils.IsEnterprise()
	partition := topoutil.ConfigEntryPartition("default")

	initialEntries := []api.ConfigEntry{
		&api.ProxyConfigEntry{
			Kind:      api.ProxyDefaults,
			Name:      "global",
			Partition: partition,
			Config: map[string]any{
				"protocol": "http",
			},
		},
		&api.ServiceConfigEntry{
			Kind:      api.ServiceDefaults,
			Name:      staticServerSID.Name,
			Partition: partition,
		},
		&api.ServiceIntentionsConfigEntry{
			Kind:      api.ServiceIntentions,
			Name:      staticServerSID.Name,
			Partition: partition,
			Sources: []*api.SourceIntention{
				{
					Name:   staticClientSID.Name,
					Action: api.IntentionActionAllow,
				},
			},
		},
	}

	cfg := &topology.Config{
		Images: topology.Images{
			// ConsulEnterprise: "hashicorp/consul-enterprise:local",
		},
		Networks: []*topology.Network{
			{Name: dc1},
		},
		Clusters: []*topology.Cluster{
			{
				Name:                 dc1,
				Nodes:                []*topology.Node{serverNode, staticServerNode, staticClientNode},
				Enterprise:           enterprise,
				InitialConfigEntries: initialEntries,
			},
		},
	}

	return &commonTopo{
		Cfg:             cfg,
		StaticClientSID: staticClientSID,
		StaticServerSID: staticServerSID,
	}
}
// Launch starts the topology described by ct.Cfg through sprawltest.Launch,
// stores the resulting sprawl on ct and then runs ValidateWorkloads. It fails
// the test if ct.Sprawl is already set, i.e. Launch was called before.
func (ct *commonTopo) Launch(t *testing.T) {
	t.Helper()

	if launched := ct.Sprawl != nil; launched {
		t.Fatalf("Launch must only be called once")
	}

	ct.Sprawl = sprawltest.Launch(t, ct.Cfg)
	ct.ValidateWorkloads(t)
}
// ValidateWorkloads (re)creates ct.Assert from ct.Sprawl and then checks that
//   - the static-server workload passes Assert.HTTPStatus with status 200 on
//     its port
//   - the health API reports exactly one passing instance each of
//     static-server and static-client
//   - static-client's first destination (static-server) and that
//     destination's "-sidecar-proxy" service exist in the catalog
//   - envoy is running for the static-server and static-client workloads
//   - the envoy cert URI is present for the static-server and static-client
//     workloads
//
// The resolved workloads are stored in ct.StaticServerWorkload and
// ct.StaticClientWorkload before the envoy checks run.
func (ct *commonTopo) ValidateWorkloads(t *testing.T) {
	t.Helper()
	ct.Assert = topoutil.NewAsserter(ct.Sprawl)

	cluster := ct.Sprawl.Topology().Clusters[dc1]
	// A missing map entry would otherwise surface as a nil dereference below.
	require.NotNil(t, cluster, "cluster %q not found in topology", dc1)

	cl, err := ct.Sprawl.APIClientForCluster(cluster.Name, "")
	require.NoError(t, err)

	staticServerWorkload := cluster.WorkloadByID(
		topology.NewNodeID("dc1-client1", defaultPartition),
		ct.StaticServerSID,
	)
	ct.Assert.HTTPStatus(t, staticServerWorkload, staticServerWorkload.Port, 200)
	ct.Assert.AssertServiceHealth(t, cl, ct.StaticServerSID.Name, true, 1)

	staticClientWorkload := cluster.WorkloadByID(
		topology.NewNodeID("dc1-client2", defaultPartition),
		ct.StaticClientSID,
	)
	ct.Assert.AssertServiceHealth(t, cl, ct.StaticClientSID.Name, true, 1)

	// check the upstream service and its sidecar exist in catalog
	svcs := cluster.WorkloadsByID(ct.StaticClientSID)
	// Fail the test with a clear message rather than panicking on an
	// out-of-range index if the topology is not what we expect.
	require.NotEmpty(t, svcs, "no workloads found for %s", ct.StaticClientSID.Name)
	client := svcs[0]
	require.NotEmpty(t, client.Destinations, "%s has no destinations", ct.StaticClientSID.Name)
	upstream := client.Destinations[0]

	ct.Assert.CatalogServiceExists(t, cluster.Name, upstream.ID.Name, utils.CompatQueryOpts(&api.QueryOptions{
		Partition: upstream.ID.Partition,
		Namespace: upstream.ID.Namespace,
	}))
	ct.Assert.CatalogServiceExists(t, cluster.Name, fmt.Sprintf("%s-sidecar-proxy", upstream.ID.Name), utils.CompatQueryOpts(&api.QueryOptions{
		Partition: upstream.ID.Partition,
		Namespace: upstream.ID.Namespace,
	}))

	ct.StaticServerWorkload = staticServerWorkload
	ct.StaticClientWorkload = staticClientWorkload

	ct.Assert.AssertEnvoyRunningWithClient(t, ct.StaticServerWorkload)
	ct.Assert.AssertEnvoyRunningWithClient(t, ct.StaticClientWorkload)

	ct.Assert.AssertEnvoyPresentsCertURIWithClient(t, ct.StaticServerWorkload)
	ct.Assert.AssertEnvoyPresentsCertURIWithClient(t, ct.StaticClientWorkload)
}

View File

@ -0,0 +1,116 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package l7_traffic_management
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/test/integration/consul-container/libs/utils"
"github.com/hashicorp/consul/testing/deployer/sprawl"
"github.com/hashicorp/consul/testing/deployer/topology"
)
// TestTrafficManagement_ResolverDefaultSubset_Agentless checks that a service
// resolver's default subset steers traffic as expected on agentless nodes,
// before and after a standard upgrade.
//
// Steps:
//   - Launch the common topology (static-server with meta version v2,
//     static-client with static-server as destination) plus a service resolver
//     for static-server whose default subset is v2.
//   - Verify traffic reaches the v2 subset.
//   - Load KV data, run a standard upgrade, verify the KV data survived and
//     that traffic still reaches v2.
//   - Replace the resolver with one defaulting to v1: client requests must now
//     fail with 503, since only a v2 instance exists.
//   - Replace the resolver with one defaulting to v2 again: client requests
//     succeed once more.
func TestTrafficManagement_ResolverDefaultSubset_Agentless(t *testing.T) {
	t.Parallel()

	ct := NewCommonTopo(t)
	dc := ct.Cfg.Clusters[0]
	dc.InitialConfigEntries = append(dc.InitialConfigEntries, newServiceResolver(staticServerSID.Name, "v2"))
	ct.Launch(t)

	// clientWorkload looks up static-client on dc1-client2 in the current topology.
	clientWorkload := func() *topology.Workload {
		return ct.Sprawl.Topology().Clusters[dc1].WorkloadByID(
			topology.NewNodeID("dc1-client2", defaultPartition),
			ct.StaticClientSID,
		)
	}
	// serverDest builds a fresh destination pointing at static-server.
	serverDest := func() *topology.Destination {
		return &topology.Destination{
			ID:        ct.StaticServerSID,
			LocalPort: 5000,
		}
	}

	assertRoutedToV2 := func() {
		wrk := clientWorkload()
		ct.Assert.FortioFetch2HeaderEcho(t, wrk, serverDest())
		ct.Assert.FortioFetch2FortioName(t, wrk, serverDest(), dc1, staticServerSID)
		ct.Assert.DestinationEndpointStatus(t, wrk, "v2.static-server.default", "HEALTHY", 1)
	}
	assertUnavailable := func() {
		ct.Assert.FortioFetch2ServiceUnavailable(t, clientWorkload(), serverDest())
	}

	assertRoutedToV2()

	t.Log("Start standard upgrade ...")
	sp := ct.Sprawl
	upgradeCfg := sp.Config()
	require.NoError(t, sp.LoadKVDataToCluster("dc1", 1, &api.WriteOptions{}))
	require.NoError(t, sp.Upgrade(upgradeCfg, "dc1", sprawl.UpgradeTypeStandard, utils.TargetImages(), nil))
	t.Log("Finished standard upgrade ...")

	// verify data is not lost
	kv, kvErr := ct.Sprawl.GetKV("dc1", "key-0", &api.QueryOptions{})
	require.NoError(t, kvErr)
	require.NotNil(t, kv)

	ct.ValidateWorkloads(t)
	assertRoutedToV2()

	apiClient, clientErr := ct.Sprawl.APIClientForCluster(ct.Sprawl.Topology().Clusters[dc1].Name, "")
	require.NoError(t, clientErr)

	// replaceResolver deletes the current static-server resolver and writes a
	// new one whose default subset is the given value.
	replaceResolver := func(defaultSubset string) {
		entries := apiClient.ConfigEntries()
		_, delErr := entries.Delete(api.ServiceResolver, staticServerSID.Name, nil)
		require.NoError(t, delErr)
		_, _, setErr := entries.Set(newServiceResolver(staticServerSID.Name, defaultSubset), nil)
		require.NoError(t, setErr)
	}

	// Default subset v1: no v1 instance exists, so client requests must fail.
	replaceResolver("v1")
	ct.ValidateWorkloads(t)
	assertUnavailable()

	// Default subset back to v2: client requests succeed again.
	replaceResolver("v2")
	ct.ValidateWorkloads(t)
	assertRoutedToV2()
}
// newServiceResolver returns a service-resolver config entry named
// serviceResolverName with the given default subset and two subsets, "v1" and
// "v2", each filtering on Service.Meta.version matching the subset name.
func newServiceResolver(serviceResolverName string, defaultSubset string) api.ConfigEntry {
	versions := []string{"v1", "v2"}

	subsets := make(map[string]api.ServiceResolverSubset, len(versions))
	for _, version := range versions {
		subsets[version] = api.ServiceResolverSubset{
			Filter: "Service.Meta.version == " + version,
		}
	}

	resolver := &api.ServiceResolverConfigEntry{
		Kind:          api.ServiceResolver,
		Name:          serviceResolverName,
		DefaultSubset: defaultSubset,
		Subsets:       subsets,
	}
	return resolver
}

View File

@ -250,7 +250,29 @@ func AssertEnvoyPresentsCertURI(t *testing.T, port int, serviceName string) {
}
require.NotNil(r, dump)
})
validateEnvoyCertificateURI(t, dump, serviceName)
}
// AssertEnvoyPresentsCertURIWithClient fetches the "certs" output from the
// Envoy admin endpoint at addr using the supplied HTTP client and then
// validates the certificate URI for serviceName against that output.
//
// The fetch is retried every second for up to 30s until it succeeds and
// returns a non-empty body.
func AssertEnvoyPresentsCertURIWithClient(t *testing.T, client *http.Client, addr string, serviceName string) {
	t.Helper()

	var dump string
	timer := &retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}
	retry.RunWith(timer, t, func(r *retry.R) {
		var err error
		dump, _, err = GetEnvoyOutputWithClient(client, addr, "certs", nil)
		// Surface the underlying error so a failing run shows why the fetch failed.
		require.NoError(r, err, "could not fetch envoy configuration")
		// dump is a string and can never be nil, so require a non-empty body.
		require.NotEmpty(r, dump)
	})

	validateEnvoyCertificateURI(t, dump, serviceName)
}
func validateEnvoyCertificateURI(t *testing.T, dump string, serviceName string) {
// Validate certificate uri
filter := `.certificates[] | .cert_chain[].subject_alt_names[].uri`
results, err := utils.JQFilter(dump, filter)
@ -283,6 +305,22 @@ func AssertEnvoyRunning(t *testing.T, port int) {
})
}
// AssertEnvoyRunningWithClient asserts that Envoy is reachable by fetching the
// "stats" output from the admin endpoint at addr with the supplied HTTP
// client. The fetch is retried every 500ms for up to 10s; only the returned
// error is inspected, not the status code or the body.
func AssertEnvoyRunningWithClient(t *testing.T, client *http.Client, addr string) {
	t.Helper()

	timer := &retry.Timer{Timeout: 10 * time.Second, Wait: 500 * time.Millisecond}
	retry.RunWith(timer, t, func(r *retry.R) {
		_, _, err := GetEnvoyOutputWithClient(client, addr, "stats", nil)
		// Surface the underlying error so a failing run shows why the fetch failed.
		require.NoError(r, err, "could not fetch envoy stats")
	})
}
func GetEnvoyOutput(port int, path string, query map[string]string) (string, int, error) {
client := cleanhttp.DefaultClient()
return GetEnvoyOutputWithClient(client, fmt.Sprintf("localhost:%d", port), path, query)

View File

@ -73,8 +73,8 @@ func TestTrafficManagement_ResolverDefaultSubset(t *testing.T) {
assertionFn := func() {
_, serverAdminPortV1 := serverConnectProxyV1.GetAdminAddr()
_, serverAdminPortV2 := serverConnectProxyV2.GetAdminAddr()
_, adminPort := staticClientProxy.GetAdminAddr() // httpPort
_, port := staticClientProxy.GetAddr() // EnvoyAdminPort
_, adminPort := staticClientProxy.GetAdminAddr() // EnvoyAdminPort
_, port := staticClientProxy.GetAddr() // httpPort
libassert.AssertEnvoyRunning(t, serverAdminPortV1)
libassert.AssertEnvoyRunning(t, serverAdminPortV2)