consul/test-integ/topoutil/asserter_blankspace.go

304 lines
8.9 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package topoutil
import (
"context"
"fmt"
"testing"
"time"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testing/deployer/topology"
"github.com/stretchr/testify/require"
)
// CheckBlankspaceNameViaHTTP calls a copy of blankspace over HTTP1 or HTTP2
// (selected by useHTTP2) and asserts the request arrived on the correct
// instance.
//
// One request is made per retry attempt, and the name it reports back must be
// exactly clusterName and sid.String() joined by "::".
func (a *Asserter) CheckBlankspaceNameViaHTTP(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	useHTTP2 bool,
	path string,
	clusterName string,
	sid topology.ServiceID,
) {
	t.Helper()

	// Nothing to reset before, or to summarize after, a single request.
	noop := func(_ *retry.R) {}

	verifyName := func(r *retry.R, remoteName string) {
		want := fmt.Sprintf("%s::%s", clusterName, sid.String())
		require.Equal(r, want, remoteName)
	}

	a.checkBlankspaceNameViaHTTPWithCallback(
		t, service, upstream, useHTTP2, path,
		1, noop, verifyName, noop,
	)
}
// CheckBlankspaceNameTrafficSplitViaHTTP is like CheckBlankspaceNameViaHTTP
// but it verifies a relative traffic split instead of a single destination.
//
// Each retry attempt makes 100 requests, tallies how many landed on each
// reported name, and compares that tally against expect using
// assertTrafficSplitFor100Requests.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaHTTP(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	useHTTP2 bool,
	path string,
	expect map[string]int,
) {
	t.Helper()

	// Per-attempt tally of responses, keyed by the name that answered.
	seen := make(map[string]int)

	var (
		// Start every retry attempt from an empty tally.
		startOver = func(_ *retry.R) { seen = make(map[string]int) }
		record    = func(_ *retry.R, name string) { seen[name]++ }
		verify    = func(r *retry.R) { assertTrafficSplitFor100Requests(r, seen, expect) }
	)

	a.checkBlankspaceNameViaHTTPWithCallback(
		t, service, upstream, useHTTP2, path,
		100, startOver, record, verify,
	)
}
// checkBlankspaceNameViaHTTPWithCallback is the shared driver behind the HTTP
// blankspace checks.
//
// It works out the address and URL for the request and then delegates to
// multiassert: per retry attempt resetFn runs once, count requests are made
// (each returned name is handed to attemptFn), and checkFn runs last.
//
// The address is the node's local address and the service port for HTTP1, or
// the node's exposed port on 127.0.0.1 for HTTP2. The URL targets the
// upstream's virtual.consul hostname and virtual port when upstream.Implied is
// set, and localhost with the upstream's local port otherwise.
//
// NOTE(review): both values are passed straight to GetBlankspaceNameViaHTTP;
// presumably the address is where the request is sent and the URL is what the
// service is asked to fetch. Confirm against that helper.
func (a *Asserter) checkBlankspaceNameViaHTTPWithCallback(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	useHTTP2 bool,
	path string,
	count int,
	resetFn func(r *retry.R),
	attemptFn func(r *retry.R, remoteName string),
	checkFn func(r *retry.R),
) {
	t.Helper()

	node := service.Node
	internalPort := service.PortOrDefault(upstream.PortName)
	addr := fmt.Sprintf("%s:%d", node.LocalAddress(), internalPort)
	client := a.mustGetHTTPClient(t, node.Cluster)

	if useHTTP2 {
		// We can't use the forward proxy for http2, so use the exposed port
		// on localhost instead.
		exposedPort := node.ExposedPort(internalPort)
		require.True(t, exposedPort > 0)

		const loopback = "127.0.0.1"
		addr = fmt.Sprintf("%s:%d", loopback, exposedPort)

		// This will clear the proxy field on the transport.
		client = EnableHTTP2(client)
	}

	var actualURL string
	switch {
	case upstream.Implied:
		id := upstream.ID
		actualURL = fmt.Sprintf("http://%s--%s--%s.virtual.consul:%d/%s",
			id.Name,
			id.Namespace,
			id.Partition,
			upstream.VirtualPort,
			path,
		)
	default:
		actualURL = fmt.Sprintf("http://localhost:%d/%s", upstream.LocalPort, path)
	}

	// A single request: any error fails the current retry attempt, otherwise
	// the reported name is passed along to the caller's callback.
	request := func(r *retry.R) {
		name, err := GetBlankspaceNameViaHTTP(context.Background(), client, addr, actualURL)
		require.NoError(r, err)
		attemptFn(r, name)
	}

	multiassert(t, count, resetFn, request, checkFn)
}
// CheckBlankspaceNameViaTCP calls a copy of blankspace over plain tcp sockets
// and asserts the request arrived on the correct instance.
//
// One request is made per retry attempt, and the name it reports back must be
// exactly clusterName and sid.String() joined by "::".
func (a *Asserter) CheckBlankspaceNameViaTCP(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	clusterName string,
	sid topology.ServiceID,
) {
	t.Helper()

	// Nothing to reset before, or to summarize after, a single request.
	noop := func(_ *retry.R) {}

	verifyName := func(r *retry.R, remoteName string) {
		want := fmt.Sprintf("%s::%s", clusterName, sid.String())
		require.Equal(r, want, remoteName)
	}

	a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 1, noop, verifyName, noop)
}
// CheckBlankspaceNameTrafficSplitViaTCP is like CheckBlankspaceNameViaTCP
// but it verifies a relative traffic split instead of a single destination.
//
// Each retry attempt makes 100 requests, tallies how many landed on each
// reported name, and compares that tally against expect using
// assertTrafficSplitFor100Requests.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaTCP(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	expect map[string]int,
) {
	t.Helper()

	// Per-attempt tally of responses, keyed by the name that answered.
	seen := make(map[string]int)

	var (
		// Start every retry attempt from an empty tally.
		startOver = func(_ *retry.R) { seen = make(map[string]int) }
		record    = func(_ *retry.R, name string) { seen[name]++ }
		verify    = func(r *retry.R) { assertTrafficSplitFor100Requests(r, seen, expect) }
	)

	a.checkBlankspaceNameViaTCPWithCallback(t, service, upstream, 100, startOver, record, verify)
}
// checkBlankspaceNameViaTCPWithCallback is the shared driver behind the TCP
// blankspace checks.
//
// It requires an explicit (non-implied) upstream with a positive local port,
// resolves the node's exposed port for it on 127.0.0.1, and then delegates to
// multiassert: per retry attempt resetFn runs once, count requests are made
// (each returned name is handed to attemptFn), and checkFn runs last.
func (a *Asserter) checkBlankspaceNameViaTCPWithCallback(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	count int,
	resetFn func(r *retry.R),
	attemptFn func(r *retry.R, remoteName string),
	checkFn func(r *retry.R),
) {
	t.Helper()

	require.False(t, upstream.Implied, "helper does not support tproxy yet")

	localPort := upstream.LocalPort
	require.True(t, localPort > 0)

	// We can't use the forward proxy for TCP yet, so use the exposed port on
	// localhost instead.
	exposedPort := service.Node.ExposedPort(localPort)
	require.True(t, exposedPort > 0)

	const loopback = "127.0.0.1"
	addr := fmt.Sprintf("%s:%d", loopback, exposedPort)

	// A single request: any error fails the current retry attempt, otherwise
	// the reported name is passed along to the caller's callback.
	request := func(r *retry.R) {
		name, err := GetBlankspaceNameViaTCP(context.Background(), addr)
		require.NoError(r, err)
		attemptFn(r, name)
	}

	multiassert(t, count, resetFn, request, checkFn)
}
// CheckBlankspaceNameViaGRPC calls a copy of blankspace over gRPC and asserts
// the request arrived on the correct instance.
//
// One request is made per retry attempt, and the name it reports back must be
// exactly clusterName and sid.String() joined by "::".
func (a *Asserter) CheckBlankspaceNameViaGRPC(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	clusterName string,
	sid topology.ServiceID,
) {
	t.Helper()

	// Nothing to reset before, or to summarize after, a single request.
	noop := func(_ *retry.R) {}

	verifyName := func(r *retry.R, remoteName string) {
		want := fmt.Sprintf("%s::%s", clusterName, sid.String())
		require.Equal(r, want, remoteName)
	}

	a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 1, noop, verifyName, noop)
}
// CheckBlankspaceNameTrafficSplitViaGRPC is like CheckBlankspaceNameViaGRPC
// but it verifies a relative traffic split instead of a single destination.
//
// Each retry attempt makes 100 requests, tallies how many landed on each
// reported name, and compares that tally against expect using
// assertTrafficSplitFor100Requests.
func (a *Asserter) CheckBlankspaceNameTrafficSplitViaGRPC(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	expect map[string]int,
) {
	t.Helper()

	// Per-attempt tally of responses, keyed by the name that answered.
	seen := make(map[string]int)

	var (
		// Start every retry attempt from an empty tally.
		startOver = func(_ *retry.R) { seen = make(map[string]int) }
		record    = func(_ *retry.R, name string) { seen[name]++ }
		verify    = func(r *retry.R) { assertTrafficSplitFor100Requests(r, seen, expect) }
	)

	a.checkBlankspaceNameViaGRPCWithCallback(t, service, upstream, 100, startOver, record, verify)
}
// checkBlankspaceNameViaGRPCWithCallback is the shared driver behind the gRPC
// blankspace checks.
//
// It requires an explicit (non-implied) upstream with a positive local port,
// resolves the node's exposed port for it on 127.0.0.1, and then delegates to
// multiassert: per retry attempt resetFn runs once, count requests are made
// (each returned name is handed to attemptFn), and checkFn runs last.
func (a *Asserter) checkBlankspaceNameViaGRPCWithCallback(
	t *testing.T,
	service *topology.Service,
	upstream *topology.Upstream,
	count int,
	resetFn func(r *retry.R),
	attemptFn func(r *retry.R, remoteName string),
	checkFn func(r *retry.R),
) {
	t.Helper()

	require.False(t, upstream.Implied, "helper does not support tproxy yet")

	localPort := upstream.LocalPort
	require.True(t, localPort > 0)

	// We can't use the forward proxy for gRPC yet, so use the exposed port on
	// localhost instead.
	exposedPort := service.Node.ExposedPort(localPort)
	require.True(t, exposedPort > 0)

	const loopback = "127.0.0.1"
	addr := fmt.Sprintf("%s:%d", loopback, exposedPort)

	// A single request: any error fails the current retry attempt, otherwise
	// the reported name is passed along to the caller's callback.
	request := func(r *retry.R) {
		name, err := GetBlankspaceNameViaGRPC(context.Background(), addr)
		require.NoError(r, err)
		attemptFn(r, name)
	}

	multiassert(t, count, resetFn, request, checkFn)
}
// assertTrafficSplitFor100Requests checks a traffic split measured over
// exactly 100 requests.
//
// Both the observed counts (nameCounts) and the expected counts (expect) must
// sum to 100. The two are then compared per destination by assertTrafficSplit
// with a fixed allowed difference of 2.
func assertTrafficSplitFor100Requests(t require.TestingT, nameCounts map[string]int, expect map[string]int) {
	const (
		wantTotal = 100
		tolerance = 2
	)

	measuredTotal := sumMapValues(nameCounts)
	require.Equal(t, wantTotal, measuredTotal, "measured traffic was not %d requests", wantTotal)

	expectedTotal := sumMapValues(expect)
	require.Equal(t, wantTotal, expectedTotal, "expected traffic was not %d requests", wantTotal)

	assertTrafficSplit(t, nameCounts, expect, tolerance)
}
// sumMapValues returns the total of all values stored in m. A nil or empty
// map totals zero.
func sumMapValues(m map[string]int) int {
	var total int
	for _, n := range m {
		total += n
	}
	return total
}
// assertTrafficSplit requires that the observed per-destination request
// counts (nameCounts) agree with the expected counts (expect):
//
//   - both maps must have the same number of entries,
//   - every destination named in expect must be present in nameCounts, and
//   - each observed count must be within allowedDelta of its expected count.
//
// When expect names exactly one destination the observed count must match
// exactly; allowedDelta is not applied in that case.
//
// When doing random traffic splits it'll never be perfect so we need the
// wiggle room to avoid having a flaky test.
//
// Every assertion carries a message naming the destination involved so that a
// failure identifies which side of the split was wrong.
func assertTrafficSplit(t require.TestingT, nameCounts map[string]int, expect map[string]int, allowedDelta int) {
	require.Len(t, nameCounts, len(expect),
		"measured destinations %v do not line up with expected destinations %v",
		nameCounts, expect,
	)

	// The exact-vs-delta decision depends only on expect, so make it once.
	exactMatch := len(expect) == 1

	for name, expectCount := range expect {
		gotCount, ok := nameCounts[name]
		require.True(t, ok, "expected destination %q is missing from measured traffic", name)

		if exactMatch {
			require.Equal(t, expectCount, gotCount,
				"expected sole destination %q to have %d requests not %d",
				name, expectCount, gotCount,
			)
			continue
		}

		require.InDelta(t, expectCount, gotCount, float64(allowedDelta),
			"expected %q side of split to have %d requests not %d (e=%d)",
			name, expectCount, gotCount, allowedDelta,
		)
	}
}
// multiassert will retry in bulk: within each retry attempt it calls resetFn
// once, then attemptFn count times, and finishes with one call to checkFn.
//
// Retries are driven by a retry.Timer configured with a 30 second Timeout and
// a 500 millisecond Wait.
//
// Its primary use at the time it was written was to execute a set of requests
// repeatedly to witness where the requests went, and then at the end doing a
// verification of traffic splits (a bit like MAP/REDUCE).
func multiassert(t *testing.T, count int, resetFn, attemptFn, checkFn func(r *retry.R)) {
	// Marked as a helper like every other function in this file that takes
	// *testing.T, so failures are not attributed to this wrapper.
	t.Helper()

	timer := &retry.Timer{Timeout: 30 * time.Second, Wait: 500 * time.Millisecond}
	retry.RunWith(timer, t, func(r *retry.R) {
		// State accumulated by a previous failed attempt must not leak into
		// this one, so reset before making any requests.
		resetFn(r)
		for i := 0; i < count; i++ {
			attemptFn(r)
		}
		checkFn(r)
	})
}