mirror of
synced 2025-03-03 14:50:50 +00:00
- Low level plumbing for resources is still retained for now. - Retain "Workload" terminology over "Service". - Revert "Destination" terminology back to "Upstream". - Remove TPROXY support as it only worked for v2.
534 lines
18 KiB
534 lines
18 KiB
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
libassert "github.com/hashicorp/consul/test/integration/consul-container/libs/assert"
// Asserter is a utility to help in reducing boilerplate in invoking test
// assertions against consul-topology Sprawl components.
// The methods should largely take in *topology.Service instances in lieu of
// ip/ports if there is only one port that makes sense for the assertion (such
// as use of the envoy admin port 19000).
// If it's up to the test (like picking an upstream) leave port as an argument
// but still take the service and use that to grab the local ip from the
// topology.Node.
type Asserter struct {
sp SprawlLite
// *sprawl.Sprawl satisfies this. We don't need anything else.
type SprawlLite interface {
HTTPClientForCluster(clusterName string) (*http.Client, error)
APIClientForNode(clusterName string, nid topology.NodeID, token string) (*api.Client, error)
APIClientForCluster(clusterName string, token string) (*api.Client, error)
ResourceServiceClientForCluster(clusterName string) pbresource.ResourceServiceClient
Topology() *topology.Topology
// NewAsserter creates a new assertion helper for the provided sprawl.
func NewAsserter(sp SprawlLite) *Asserter {
return &Asserter{
sp: sp,
func (a *Asserter) mustGetHTTPClient(t testutil.TestingTB, cluster string) *http.Client {
client, err := a.httpClientFor(cluster)
require.NoError(t, err)
return client
func (a *Asserter) mustGetAPIClient(t testutil.TestingTB, cluster string) *api.Client {
clu := a.sp.Topology().Clusters[cluster]
cl, err := a.sp.APIClientForCluster(clu.Name, "")
require.NoError(t, err)
return cl
// httpClientFor returns a pre-configured http.Client that proxies requests
// through the embedded squid instance in each LAN.
// Use this in methods below to magically pick the right proxied http client
// given the home of each node being checked.
func (a *Asserter) httpClientFor(cluster string) (*http.Client, error) {
client, err := a.sp.HTTPClientForCluster(cluster)
if err != nil {
return nil, err
return client, nil
// UpstreamEndpointStatus validates that proxy was configured with provided clusterName in the healthStatus
// Exposes libassert.UpstreamEndpointStatus for use against a Sprawl.
// NOTE: this doesn't take a port b/c you always want to use the envoy admin port.
func (a *Asserter) UpstreamEndpointStatus(
t *testing.T,
workload *topology.Workload,
clusterName string,
healthStatus string,
count int,
) {
node := workload.Node
ip := node.LocalAddress()
port := workload.EnvoyAdminPort
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.AssertUpstreamEndpointStatusWithClient(t, client, addr, clusterName, healthStatus, count)
func (a *Asserter) getEnvoyClient(t *testing.T, workload *topology.Workload) (client *http.Client, addr string) {
node := workload.Node
ip := node.LocalAddress()
port := workload.EnvoyAdminPort
addr = fmt.Sprintf("%s:%d", ip, port)
client = a.mustGetHTTPClient(t, node.Cluster)
return client, addr
// AssertEnvoyRunningWithClient asserts that envoy is running by querying its stats page
func (a *Asserter) AssertEnvoyRunningWithClient(t *testing.T, workload *topology.Workload) {
client, addr := a.getEnvoyClient(t, workload)
libassert.AssertEnvoyRunningWithClient(t, client, addr)
// AssertEnvoyPresentsCertURIWithClient makes GET request to /certs endpoint and validates that
// two certificates URI is available in the response
func (a *Asserter) AssertEnvoyPresentsCertURIWithClient(t *testing.T, workload *topology.Workload) {
client, addr := a.getEnvoyClient(t, workload)
libassert.AssertEnvoyPresentsCertURIWithClient(t, client, addr, workload.ID.Name)
func (a *Asserter) AssertEnvoyHTTPrbacFiltersContainIntentions(t *testing.T, workload *topology.Workload) {
client, addr := a.getEnvoyClient(t, workload)
var (
dump string
err error
failer := func() *retry.Timer {
return &retry.Timer{Timeout: 30 * time.Second, Wait: 1 * time.Second}
retry.RunWith(failer(), t, func(r *retry.R) {
dump, _, err = libassert.GetEnvoyOutputWithClient(client, addr, "config_dump", map[string]string{})
if err != nil {
r.Fatal("could not fetch envoy configuration")
// the steps below validate that the json result from envoy config dump configured active listeners with rbac and http filters
filter := `.configs[2].dynamic_listeners[].active_state.listener | "\(.name) \( .filter_chains[].filters[] | select(.name == "envoy.filters.network.http_connection_manager") | .typed_config.http_filters | map(.name) | join(","))"`
errorStateFilter := `.configs[2].dynamic_listeners[].error_state"`
configErrorStates, _ := utils.JQFilter(dump, errorStateFilter)
require.Nil(t, configErrorStates, "there should not be any error states on listener configuration")
results, err := utils.JQFilter(dump, filter)
require.NoError(t, err, "could not parse envoy configuration")
var filteredResult []string
for _, result := range results {
parts := strings.Split(strings.ReplaceAll(result, `,`, " "), " ")
sanitizedResult := append(parts[:0], parts[1:]...)
filteredResult = append(filteredResult, sanitizedResult...)
require.Contains(t, filteredResult, "envoy.filters.http.rbac")
require.Contains(t, filteredResult, "envoy.filters.http.router")
require.Contains(t, dump, "intentions")
// HTTPServiceEchoes verifies that a post to the given ip/port combination
// returns the data in the response body. Optional path can be provided to
// differentiate requests.
// Exposes libassert.HTTPServiceEchoes for use against a Sprawl.
// NOTE: this takes a port b/c you may want to reach this via your choice of upstream.
func (a *Asserter) HTTPServiceEchoes(
t *testing.T,
workload *topology.Workload,
port int,
path string,
) {
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.HTTPServiceEchoesWithClient(t, client, addr, path)
// HTTPServiceEchoesResHeader verifies that a post to the given ip/port combination
// returns the data in the response body with expected response headers.
// Optional path can be provided to differentiate requests.
// Exposes libassert.HTTPServiceEchoes for use against a Sprawl.
// NOTE: this takes a port b/c you may want to reach this via your choice of upstream.
func (a *Asserter) HTTPServiceEchoesResHeader(
t *testing.T,
workload *topology.Workload,
port int,
path string,
expectedResHeader map[string]string,
) {
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
libassert.HTTPServiceEchoesResHeaderWithClient(t, client, addr, path, expectedResHeader)
func (a *Asserter) HTTPStatus(
t *testing.T,
workload *topology.Workload,
port int,
status int,
) {
require.True(t, port > 0)
node := workload.Node
ip := node.LocalAddress()
addr := fmt.Sprintf("%s:%d", ip, port)
client := a.mustGetHTTPClient(t, node.Cluster)
url := "http://" + addr
retry.RunWith(&retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}, t, func(r *retry.R) {
resp, err := client.Get(url)
if err != nil {
r.Fatalf("could not make request to %q: %v", url, err)
defer resp.Body.Close()
if resp.StatusCode != status {
r.Fatalf("expected status %d, got %d", status, resp.StatusCode)
// asserts that the service sid in cluster and exported by peer localPeerName is passing health checks,
func (a *Asserter) HealthyWithPeer(t *testing.T, cluster string, sid topology.ID, peerName string) {
cl := a.mustGetAPIClient(t, cluster)
retry.RunWith(&retry.Timer{Timeout: time.Minute * 1, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
svcs, _, err := cl.Health().Service(
Partition: sid.Partition,
Namespace: sid.Namespace,
Peer: peerName,
require.NoError(r, err)
assert.GreaterOrEqual(r, len(svcs), 1)
type testingT interface {
// does a fortio /fetch2 to the given fortio service, targetting the given upstream. Returns
// the body, and response with response.Body already Closed.
// We treat 400, 503, and 504s as retryable errors
func (a *Asserter) fortioFetch2Upstream(
t testutil.TestingTB,
client *http.Client,
addr string,
us *topology.Upstream,
path string,
) (body []byte, res *http.Response) {
err, res := getFortioFetch2UpstreamResponse(t, client, addr, us, path, nil)
require.NoError(t, err)
defer res.Body.Close()
// not sure when these happen, suspect it's when the mesh gateway in the peer is not yet ready
require.NotEqual(t, http.StatusServiceUnavailable, res.StatusCode)
require.NotEqual(t, http.StatusGatewayTimeout, res.StatusCode)
// not sure when this happens, suspect it's when envoy hasn't configured the local upstream yet
require.NotEqual(t, http.StatusBadRequest, res.StatusCode)
body, err = io.ReadAll(res.Body)
require.NoError(t, err)
return body, res
func getFortioFetch2UpstreamResponse(t testutil.TestingTB, client *http.Client, addr string, us *topology.Upstream, path string, headers map[string]string) (error, *http.Response) {
actualURL := fmt.Sprintf("http://localhost:%d/%s", us.LocalPort, path)
url := fmt.Sprintf("http://%s/fortio/fetch2?url=%s", addr,
req, err := http.NewRequest(http.MethodPost, url, nil)
require.NoError(t, err)
for h, v := range headers {
req.Header.Set(h, v)
res, err := client.Do(req)
require.NoError(t, err)
return err, res
// uses the /fortio/fetch2 endpoint to do a header echo check against an
// upstream fortio
func (a *Asserter) FortioFetch2HeaderEcho(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream) {
const kPassphrase = "x-passphrase"
const passphrase = "hello"
path := (fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase))
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
_, res := a.fortioFetch2Upstream(r, client, addr, us, path)
require.Equal(r, http.StatusOK, res.StatusCode)
v := res.Header.Get(kPassphrase)
require.Equal(r, passphrase, v)
// similar to libassert.AssertFortioName,
// uses the /fortio/fetch2 endpoint to hit the debug endpoint on the upstream,
// and assert that the FORTIO_NAME == name
func (a *Asserter) FortioFetch2FortioName(
t *testing.T,
fortioWrk *topology.Workload,
us *topology.Upstream,
clusterName string,
sid topology.ID,
) {
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
var fortioNameRE = regexp.MustCompile(("\nFORTIO_NAME=(.+)\n"))
path := "/debug?env=dump"
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
body, res := a.fortioFetch2Upstream(r, client, addr, us, path)
require.Equal(r, http.StatusOK, res.StatusCode)
// TODO: not sure we should retry these?
m := fortioNameRE.FindStringSubmatch(string(body))
require.GreaterOrEqual(r, len(m), 2)
// TODO: dedupe from NewFortioService
require.Equal(r, fmt.Sprintf("%s::%s", clusterName, sid.String()), m[1])
func (a *Asserter) FortioFetch2ServiceUnavailable(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream) {
const kPassphrase = "x-passphrase"
const passphrase = "hello"
path := (fmt.Sprintf("/?header=%s:%s", kPassphrase, passphrase))
a.FortioFetch2ServiceStatusCodes(t, fortioWrk, us, path, nil, []int{http.StatusServiceUnavailable})
// FortioFetch2ServiceStatusCodes uses the /fortio/fetch2 endpoint to do a header echo check against a upstream
// fortio and asserts that the returned status code matches the desired one(s)
func (a *Asserter) FortioFetch2ServiceStatusCodes(t *testing.T, fortioWrk *topology.Workload, us *topology.Upstream, path string, headers map[string]string, statuses []int) {
var (
node = fortioWrk.Node
addr = fmt.Sprintf("%s:%d", node.LocalAddress(), fortioWrk.Port)
client = a.mustGetHTTPClient(t, node.Cluster)
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
_, res := getFortioFetch2UpstreamResponse(r, client, addr, us, path, headers)
defer res.Body.Close()
require.Contains(r, statuses, res.StatusCode)
// CatalogServiceExists is the same as libassert.CatalogServiceExists, except that it uses
// a proxied API client
func (a *Asserter) CatalogServiceExists(t *testing.T, cluster string, svc string, opts *api.QueryOptions) {
cl := a.mustGetAPIClient(t, cluster)
libassert.CatalogServiceExists(t, cl, svc, opts)
// HealthServiceEntries asserts the service has the expected number of instances and checks
func (a *Asserter) HealthServiceEntries(t *testing.T, cluster string, node *topology.Node, svc string, passingOnly bool, opts *api.QueryOptions, expectedInstance int, expectedChecks int) []*api.ServiceEntry {
cl, err := a.sp.APIClientForNode(cluster, node.ID(), "")
require.NoError(t, err)
health := cl.Health()
var serviceEntries []*api.ServiceEntry
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
serviceEntries, _, err = health.Service(svc, "", passingOnly, opts)
require.NoError(r, err)
require.Equalf(r, expectedInstance, len(serviceEntries), "dc: %s, service: %s", cluster, serviceEntries[0].Service.Service)
require.Equalf(r, expectedChecks, len(serviceEntries[0].Checks), "dc: %s, service: %s", cluster, serviceEntries[0].Service.Service)
return serviceEntries
// TokenExist asserts the token exists in the cluster and identical to the expected token
func (a *Asserter) TokenExist(t *testing.T, cluster string, node *topology.Node, expectedToken *api.ACLToken) {
cl, err := a.sp.APIClientForNode(cluster, node.ID(), "")
require.NoError(t, err)
acl := cl.ACL()
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 500}, t, func(r *retry.R) {
retrievedToken, _, err := acl.TokenRead(expectedToken.AccessorID, &api.QueryOptions{})
require.NoError(r, err)
require.True(r, cmp.Equal(expectedToken, retrievedToken), "token %s", expectedToken.Description)
// AutopilotHealth asserts the autopilot health endpoint return expected state
func (a *Asserter) AutopilotHealth(t *testing.T, cluster *topology.Cluster, leaderName string, expectedHealthy bool) {
retry.RunWith(&retry.Timer{Timeout: 60 * time.Second, Wait: time.Millisecond * 10}, t, func(r *retry.R) {
var out *api.OperatorHealthReply
for _, node := range cluster.Nodes {
if node.Name == leaderName {
client, err := a.sp.APIClientForNode(cluster.Name, node.ID(), "")
if err != nil {
r.Log("err at node", node.Name, err)
operator := client.Operator()
out, err = operator.AutopilotServerHealth(&api.QueryOptions{})
if err != nil {
r.Log("err at node", node.Name, err)
// Got the Autopilot server health response, break the loop
r.Log("Got response at node", node.Name)
r.Log("out", out, "health", out.Healthy)
require.Equal(r, expectedHealthy, out.Healthy)
type AuditEntry struct {
CreatedAt time.Time `json:"created_at"`
EventType string `json:"event_type"`
Payload Payload `json:"payload"`
type Payload struct {
ID string `json:"id"`
Version string `json:"version"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Auth Auth `json:"auth"`
Request Request `json:"request"`
Response Response `json:"response,omitempty"` // Response is not present in OperationStart log
Stage string `json:"stage"`
type Auth struct {
AccessorID string `json:"accessor_id"`
Description string `json:"description"`
CreateTime time.Time `json:"create_time"`
type Request struct {
Operation string `json:"operation"`
Endpoint string `json:"endpoint"`
RemoteAddr string `json:"remote_addr"`
UserAgent string `json:"user_agent"`
Host string `json:"host"`
QueryParams map[string]string `json:"query_params,omitempty"` // QueryParams might not be present in all cases
type Response struct {
Status string `json:"status"`
Error string `json:"error,omitempty"` // Error field is optional,shows up only with the status is non 2xx
func (a *Asserter) ValidateHealthEndpointAuditLog(t *testing.T, filePath string) {
boolIsErrorJSON := false
jsonData, err := os.ReadFile(filePath)
require.NoError(t, err)
// Print details of each AuditEntry
var entry AuditEntry
events := strings.Split(strings.TrimSpace(string(jsonData)), "\n")
// function to validate the error object when the health endpoint is returning 429
isErrorJSONObject := func(errorString string) error {
// Attempt to unmarshal the error string into a map[string]interface{}
var m map[string]interface{}
err := json.Unmarshal([]byte(errorString), &m)
return err
for _, event := range events {
if strings.Contains(event, "/v1/operator/autopilot/health") && strings.Contains(event, "OperationComplete") {
err = json.Unmarshal([]byte(event), &entry)
require.NoError(t, err, "Error unmarshalling JSON: %v\", err")
if entry.Payload.Response.Status == "429" {
boolIsErrorJSON = true
errResponse := entry.Payload.Response.Error
err = isErrorJSONObject(errResponse)
require.NoError(t, err, "Autopilot Health endpoint error response in the audit log is in unexpected format: %v\", err")
require.Equal(t, boolIsErrorJSON, true, "Unable to verify audit log health endpoint error message")