mirror of https://github.com/status-im/consul.git
434 lines
12 KiB
Go
434 lines
12 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package resourcetest
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"math/rand"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/require"
|
|
"golang.org/x/exp/slices"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/hashicorp/consul/internal/resource"
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
"github.com/hashicorp/consul/sdk/testutil"
|
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
|
)
|
|
|
|
type ClientOption func(*Client)
|
|
|
|
func WithRNGSeed(seed int64) ClientOption {
|
|
return func(c *Client) {
|
|
c.rng = rand.New(rand.NewSource(seed))
|
|
}
|
|
}
|
|
|
|
func WithRequestDelay(minMilliseconds int, maxMilliseconds int) ClientOption {
|
|
return func(c *Client) {
|
|
|
|
min := minMilliseconds
|
|
max := maxMilliseconds
|
|
if max < min {
|
|
min = maxMilliseconds
|
|
max = minMilliseconds
|
|
}
|
|
c.requestDelayMin = min
|
|
c.requestDelayMax = max
|
|
}
|
|
}
|
|
|
|
func WithACLToken(token string) ClientOption {
|
|
return func(c *Client) {
|
|
c.token = token
|
|
}
|
|
}
|
|
|
|
type Client struct {
|
|
pbresource.ResourceServiceClient
|
|
|
|
timeout time.Duration
|
|
wait time.Duration
|
|
token string
|
|
|
|
rng *rand.Rand
|
|
|
|
requestDelayMin int
|
|
requestDelayMax int
|
|
}
|
|
|
|
func NewClient(client pbresource.ResourceServiceClient, opts ...ClientOption) *Client {
|
|
c := &Client{
|
|
ResourceServiceClient: client,
|
|
timeout: 7 * time.Second,
|
|
wait: 25 * time.Millisecond,
|
|
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
|
// arbitrary write delays are opt-in only
|
|
requestDelayMin: 0,
|
|
requestDelayMax: 0,
|
|
}
|
|
|
|
for _, opt := range opts {
|
|
opt(c)
|
|
}
|
|
|
|
return c
|
|
}
|
|
|
|
func NewClientWithACLToken(client pbresource.ResourceServiceClient, token string) *Client {
|
|
return NewClient(client, WithACLToken(token))
|
|
}
|
|
|
|
func (client *Client) SetRetryerConfig(timeout time.Duration, wait time.Duration) {
|
|
client.timeout = timeout
|
|
client.wait = wait
|
|
}
|
|
|
|
func (client *Client) retry(t T, fn func(r *retry.R)) {
|
|
t.Helper()
|
|
retryer := &retry.Timer{Timeout: client.timeout, Wait: client.wait}
|
|
retry.RunWith(retryer, t, fn)
|
|
}
|
|
|
|
func (client *Client) PublishResources(t T, resources []*pbresource.Resource) {
|
|
ctx := client.Context(t)
|
|
|
|
// Randomize the order of insertion. Generally insertion order shouldn't matter as the
|
|
// controllers should eventually converge on the desired state. The exception to this
|
|
// is that you cannot insert resources with owner refs before the resource they are
|
|
// owned by or insert a resource into a non-default tenant before that tenant exists.
|
|
client.rng.Shuffle(len(resources), func(i, j int) {
|
|
temp := resources[i]
|
|
resources[i] = resources[j]
|
|
resources[j] = temp
|
|
})
|
|
|
|
// This slice will be used to track the resources actually published each round. When
|
|
// a resource with an owner ID is encountered we will not attempt to write but defer it
|
|
// to the next round of publishing
|
|
var written []*pbresource.ID
|
|
|
|
for len(resources) > 0 {
|
|
var left []*pbresource.Resource
|
|
published := 0
|
|
for _, res := range resources {
|
|
|
|
// check that any owner references would be satisfied
|
|
if res.Owner != nil {
|
|
found := slices.ContainsFunc(written, func(id *pbresource.ID) bool {
|
|
return resource.EqualID(res.Owner, id)
|
|
})
|
|
|
|
// the owner hasn't yet been published then we cannot publish this resource
|
|
if !found {
|
|
left = append(left, res)
|
|
continue
|
|
}
|
|
}
|
|
|
|
t.Logf("Writing resource %s with type %s", res.Id.Name, resource.ToGVK(res.Id.Type))
|
|
rsp, err := client.Write(ctx, &pbresource.WriteRequest{
|
|
Resource: res,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
id := rsp.Resource.Id
|
|
t.Cleanup(func() {
|
|
client.CleanupDelete(t, id)
|
|
})
|
|
|
|
// track the number of resources published
|
|
published += 1
|
|
written = append(written, res.Id)
|
|
}
|
|
|
|
// the next round only has this subset of resources to attempt writing
|
|
resources = left
|
|
|
|
// if we didn't publish any resources this round then nothing would
|
|
// enable us to do so by iterating again so we break to prevent infinite
|
|
// loooping.
|
|
if published == 0 {
|
|
break
|
|
}
|
|
}
|
|
|
|
require.Empty(t, resources, "Could not publish all resources - some resources have invalid owner references")
|
|
}
|
|
|
|
func (client *Client) Write(ctx context.Context, in *pbresource.WriteRequest, opts ...grpc.CallOption) (*pbresource.WriteResponse, error) {
|
|
client.delayRequest()
|
|
return client.ResourceServiceClient.Write(ctx, in, opts...)
|
|
}
|
|
|
|
func (client *Client) Context(t T) context.Context {
|
|
ctx := testutil.TestContext(t)
|
|
|
|
if client.token != "" {
|
|
md := metadata.New(map[string]string{
|
|
"x-consul-token": client.token,
|
|
})
|
|
ctx = metadata.NewOutgoingContext(ctx, md)
|
|
}
|
|
|
|
return ctx
|
|
}
|
|
|
|
func (client *Client) RequireResourceNotFound(t T, id *pbresource.ID) {
|
|
t.Helper()
|
|
|
|
rsp, err := client.Read(client.Context(t), &pbresource.ReadRequest{Id: id})
|
|
require.Error(t, err)
|
|
require.Equal(t, codes.NotFound, status.Code(err))
|
|
require.Nil(t, rsp)
|
|
}
|
|
|
|
func (client *Client) RequireResourceExists(t T, id *pbresource.ID) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
rsp, err := client.Read(client.Context(t), &pbresource.ReadRequest{Id: id})
|
|
require.NoError(t, err, "error reading %s with type %s", id.Name, resource.ToGVK(id.Type))
|
|
require.NotNil(t, rsp)
|
|
return rsp.Resource
|
|
}
|
|
|
|
func (client *Client) RequireVersionUnchanged(t T, id *pbresource.ID, version string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireVersionUnchanged(t, res, version)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireVersionChanged(t T, id *pbresource.ID, version string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireVersionChanged(t, res, version)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireStatusCondition(t T, id *pbresource.ID, statusKey string, condition *pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireStatusCondition(t, res, statusKey, condition)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireStatusConditionForCurrentGen(t T, id *pbresource.ID, statusKey string, condition *pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireStatusConditionForCurrentGen(t, res, statusKey, condition)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireStatusConditionsForCurrentGen(t T, id *pbresource.ID, statusKey string, conditions []*pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
for _, condition := range conditions {
|
|
RequireStatusConditionForCurrentGen(t, res, statusKey, condition)
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireResourceMeta(t T, id *pbresource.ID, key string, value string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireResourceMeta(t, res, key, value)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) RequireReconciledCurrentGen(t T, id *pbresource.ID, statusKey string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
res := client.RequireResourceExists(t, id)
|
|
RequireReconciledCurrentGen(t, res, statusKey)
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForReconciliation(t T, id *pbresource.ID, statusKey string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireReconciledCurrentGen(r, id, statusKey)
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForStatusCondition(t T, id *pbresource.ID, statusKey string, condition *pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireStatusConditionForCurrentGen(r, id, statusKey, condition)
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForStatusConditionAnyGen(t T, id *pbresource.ID, statusKey string, condition *pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireStatusCondition(r, id, statusKey, condition)
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForStatusConditions(t T, id *pbresource.ID, statusKey string, conditions ...*pbresource.Condition) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireStatusConditionsForCurrentGen(r, id, statusKey, conditions)
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForNewVersion(t T, id *pbresource.ID, version string) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireVersionChanged(r, id, version)
|
|
})
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForResourceState(t T, id *pbresource.ID, verify func(T, *pbresource.Resource)) *pbresource.Resource {
|
|
t.Helper()
|
|
|
|
var res *pbresource.Resource
|
|
client.retry(t, func(r *retry.R) {
|
|
res = client.RequireResourceExists(r, id)
|
|
verify(r, res)
|
|
})
|
|
|
|
return res
|
|
}
|
|
|
|
func (client *Client) WaitForDeletion(t T, id *pbresource.ID) {
|
|
t.Helper()
|
|
|
|
client.retry(t, func(r *retry.R) {
|
|
client.RequireResourceNotFound(r, id)
|
|
})
|
|
}
|
|
|
|
// ResolveResourceID will read the specified resource and returns its full ID.
|
|
// This is mainly useful to get the ID with the Uid filled out.
|
|
func (client *Client) ResolveResourceID(t T, id *pbresource.ID) *pbresource.ID {
|
|
t.Helper()
|
|
|
|
return client.RequireResourceExists(t, id).Id
|
|
}
|
|
|
|
// MustDelete will delete a resource by its id, retrying if necessary and fail the test
|
|
// if it cannot delete it within the timeout. The clients request delay settings are
|
|
// taken into account with this operation.
|
|
func (client *Client) MustDelete(t T, id *pbresource.ID) {
|
|
t.Helper()
|
|
client.retryDelete(t, id, true)
|
|
}
|
|
|
|
// CleanupDelete will perform the same operations as MustDelete to ensure the resource is
|
|
// deleted. The clients request delay settings are ignored for this operation and it is
|
|
// assumed this will only be called in the context of test Cleanup routines where we
|
|
// are no longer testing that a controller eventually converges on some values in response
|
|
// to the delete.
|
|
func (client *Client) CleanupDelete(t T, id *pbresource.ID) {
|
|
t.Helper()
|
|
client.retryDelete(t, id, false)
|
|
}
|
|
|
|
func (client *Client) retryDelete(t T, id *pbresource.ID, shouldDelay bool) {
|
|
t.Helper()
|
|
ctx := client.Context(t)
|
|
|
|
client.retry(t, func(r *retry.R) {
|
|
if shouldDelay {
|
|
client.delayRequest()
|
|
}
|
|
_, err := client.Delete(ctx, &pbresource.DeleteRequest{Id: id})
|
|
if status.Code(err) == codes.NotFound {
|
|
return
|
|
}
|
|
|
|
// codes.Aborted indicates a CAS failure and that the delete request should
|
|
// be retried. Anything else should be considered an unrecoverable error.
|
|
if err != nil && status.Code(err) != codes.Aborted {
|
|
r.Stop(fmt.Errorf("failed to delete the resource: %w", err))
|
|
return
|
|
}
|
|
|
|
require.NoError(r, err)
|
|
})
|
|
}
|
|
|
|
func (client *Client) delayRequest() {
|
|
if client.requestDelayMin == 0 && client.requestDelayMax == 0 {
|
|
return
|
|
}
|
|
|
|
var delay time.Duration
|
|
if client.requestDelayMin == client.requestDelayMax {
|
|
delay = time.Duration(client.requestDelayMin) * time.Millisecond
|
|
} else {
|
|
delay = time.Duration(client.rng.Intn(client.requestDelayMax-client.requestDelayMin)+client.requestDelayMin) * time.Millisecond
|
|
}
|
|
time.Sleep(delay)
|
|
}
|
|
|
|
type CLIOptions struct {
|
|
minRequestDelay int
|
|
maxRequestDelay int
|
|
seed int64
|
|
}
|
|
|
|
type CLIOptionT interface {
|
|
Helper()
|
|
Logf(string, ...any)
|
|
}
|
|
|
|
func (o *CLIOptions) ClientOptions(t CLIOptionT) []ClientOption {
|
|
t.Helper()
|
|
t.Logf("Using %d for the random number generator seed. Pass -rng-seed=<value> to overwrite the time based seed", o.seed)
|
|
t.Logf("Using random request delays between %dms and %dms. Use -min-request-delay=<value> or -max-request-delay=<value> to override the defaults", o.minRequestDelay, o.maxRequestDelay)
|
|
|
|
return []ClientOption{
|
|
WithRNGSeed(o.seed),
|
|
WithRequestDelay(o.minRequestDelay, o.maxRequestDelay),
|
|
}
|
|
}
|
|
|
|
func ConfigureTestCLIFlags() *CLIOptions {
|
|
opts := &CLIOptions{
|
|
minRequestDelay: 0,
|
|
maxRequestDelay: 0,
|
|
seed: time.Now().UnixNano(),
|
|
}
|
|
|
|
flag.Int64Var(&opts.seed, "rng-seed", opts.seed, "Seed to use for pseudo-random-number-generators")
|
|
flag.IntVar(&opts.minRequestDelay, "min-request-delay", 10, "Minimum delay before performing a resource write (milliseconds: default=10)")
|
|
flag.IntVar(&opts.maxRequestDelay, "max-request-delay", 50, "Maximum delay before performing a resource write (milliseconds: default=50)")
|
|
|
|
return opts
|
|
}
|