mirror of https://github.com/status-im/consul.git
Introduce randomized timings and reproducible randomization into controller integration tests. (#19393)
As the V2 architecture hinges on eventual consistency and controllers reconciling the existing state in response to writes, there are potential issues we could run into regarding ordering and timing of operations. We want to be able to guarantee that given a set of resources the system will always eventually get to the desired correct state. The order of resource writes and delays in performing those writes should not alter the final outcome of reaching the desired state. To that end, this commit introduces arbitrary randomized delays before performing resources writes into the `resourcetest.Client`. Its `PublishResources` method was already randomizing the order of resource writes. By default, no delay is added to normal writes and deletes but tests can opt-in via either passing hard coded options when creating the `resourcetest.Client` or using the `resourcetest.ConfigureTestCLIFlags` function to allow processing of CLI parameters. In addition to allowing configurability of the request delay min and max, the client also has a configurable random number generator seed. When Using the CLI parameter helpers, a test log will be written noting the currently used settings. If the test fails then you can reproduce the same delays and order randomizations by providing the seed during the previous test failure.
This commit is contained in:
parent
09f73d1abf
commit
a7774a9538
|
@ -11,10 +11,15 @@ import (
|
|||
"github.com/hashicorp/consul/internal/catalog/internal/controllers"
|
||||
"github.com/hashicorp/consul/internal/controller"
|
||||
"github.com/hashicorp/consul/internal/resource/reaper"
|
||||
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
var (
|
||||
clientOpts = rtest.ConfigureTestCLIFlags()
|
||||
)
|
||||
|
||||
func runInMemResourceServiceAndControllers(t *testing.T, deps controllers.Dependencies) pbresource.ResourceServiceClient {
|
||||
t.Helper()
|
||||
|
||||
|
@ -38,10 +43,10 @@ func runInMemResourceServiceAndControllers(t *testing.T, deps controllers.Depend
|
|||
|
||||
func TestControllers_Integration(t *testing.T) {
|
||||
client := runInMemResourceServiceAndControllers(t, catalog.DefaultControllerDependencies())
|
||||
RunCatalogV2Beta1IntegrationTest(t, client)
|
||||
RunCatalogV2Beta1IntegrationTest(t, client, clientOpts.ClientOptions(t)...)
|
||||
}
|
||||
|
||||
func TestControllers_Lifecycle(t *testing.T) {
|
||||
client := runInMemResourceServiceAndControllers(t, catalog.DefaultControllerDependencies())
|
||||
RunCatalogV2Beta1LifecycleIntegrationTest(t, client)
|
||||
RunCatalogV2Beta1LifecycleIntegrationTest(t, client, clientOpts.ClientOptions(t)...)
|
||||
}
|
||||
|
|
|
@ -38,19 +38,19 @@ var (
|
|||
// is another RunCatalogIntegrationTestLifeCycle function that can be used for those
|
||||
// purposes. The two are distinct so that the data being published and the assertions
|
||||
// made against the system can be reused in upgrade tests.
|
||||
func RunCatalogV2Beta1IntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
func RunCatalogV2Beta1IntegrationTest(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
t.Helper()
|
||||
|
||||
PublishCatalogV2Beta1IntegrationTestData(t, client)
|
||||
PublishCatalogV2Beta1IntegrationTestData(t, client, opts...)
|
||||
VerifyCatalogV2Beta1IntegrationTestResults(t, client)
|
||||
}
|
||||
|
||||
// PublishCatalogV2Beta1IntegrationTestData will perform a whole bunch of resource writes
|
||||
// for Service, ServiceEndpoints, Workload, Node and HealthStatus objects
|
||||
func PublishCatalogV2Beta1IntegrationTestData(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
func PublishCatalogV2Beta1IntegrationTestData(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
t.Helper()
|
||||
|
||||
c := rtest.NewClient(client)
|
||||
c := rtest.NewClient(client, opts...)
|
||||
|
||||
resources := rtest.ParseResourcesFromFilesystem(t, testData, "integration_test_data/v2beta1")
|
||||
c.PublishResources(t, resources)
|
||||
|
|
|
@ -16,19 +16,19 @@ import (
|
|||
// RunCatalogV2Beta1LifecycleIntegrationTest intends to excercise functionality of
|
||||
// managing catalog resources over their normal lifecycle where they will be modified
|
||||
// several times, change state etc.
|
||||
func RunCatalogV2Beta1LifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
func RunCatalogV2Beta1LifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
t.Helper()
|
||||
|
||||
testutil.RunStep(t, "node-lifecycle", func(t *testing.T) {
|
||||
RunCatalogV2Beta1NodeLifecycleIntegrationTest(t, client)
|
||||
RunCatalogV2Beta1NodeLifecycleIntegrationTest(t, client, opts...)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "workload-lifecycle", func(t *testing.T) {
|
||||
RunCatalogV2Beta1WorkloadLifecycleIntegrationTest(t, client)
|
||||
RunCatalogV2Beta1WorkloadLifecycleIntegrationTest(t, client, opts...)
|
||||
})
|
||||
|
||||
testutil.RunStep(t, "endpoints-lifecycle", func(t *testing.T) {
|
||||
RunCatalogV2Beta1EndpointsLifecycleIntegrationTest(t, client)
|
||||
RunCatalogV2Beta1EndpointsLifecycleIntegrationTest(t, client, opts...)
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -41,8 +41,8 @@ func RunCatalogV2Beta1LifecycleIntegrationTest(t *testing.T, client pbresource.R
|
|||
// * Changing HealthStatus to a better health will cause recomputation of the Health
|
||||
// * Deletion of associated HealthStatuses will recompute the Health (back to passing)
|
||||
// * Deletion of the node will cause deletion of associated health statuses
|
||||
func RunCatalogV2Beta1NodeLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
c := rtest.NewClient(client)
|
||||
func RunCatalogV2Beta1NodeLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
c := rtest.NewClient(client, opts...)
|
||||
|
||||
nodeName := "test-lifecycle"
|
||||
nodeHealthName := "test-lifecycle-node-status"
|
||||
|
@ -131,8 +131,8 @@ func RunCatalogV2Beta1NodeLifecycleIntegrationTest(t *testing.T, client pbresour
|
|||
// - Overall health is computed as the worst health amongst the nodes health and all
|
||||
// of the workloads associated HealthStatuses
|
||||
// - Deletion of the workload will cause deletion of all associated health statuses.
|
||||
func RunCatalogV2Beta1WorkloadLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
c := rtest.NewClient(client)
|
||||
func RunCatalogV2Beta1WorkloadLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
c := rtest.NewClient(client, opts...)
|
||||
testutil.RunStep(t, "nodeless-workload", func(t *testing.T) {
|
||||
runV2Beta1NodelessWorkloadLifecycleIntegrationTest(t, c)
|
||||
})
|
||||
|
@ -379,8 +379,8 @@ func runV2Beta1NodeAssociatedWorkloadLifecycleIntegrationTest(t *testing.T, c *r
|
|||
// * Adding ports to a service will recalculate the endpoints
|
||||
// * Removing ports from a service will recalculate the endpoints
|
||||
// * Changing the workload will recalculate the endpoints (ports, addresses, or health)
|
||||
func RunCatalogV2Beta1EndpointsLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient) {
|
||||
c := rtest.NewClient(client)
|
||||
func RunCatalogV2Beta1EndpointsLifecycleIntegrationTest(t *testing.T, client pbresource.ResourceServiceClient, opts ...rtest.ClientOption) {
|
||||
c := rtest.NewClient(client, opts...)
|
||||
serviceName := "test-lifecycle"
|
||||
|
||||
// Create the service without a selector. We should not see endpoints generated but we should see the
|
||||
|
|
|
@ -178,7 +178,7 @@ func (b *resourceBuilder) Write(t T, client pbresource.ResourceServiceClient) *p
|
|||
id := proto.Clone(rsp.Resource.Id).(*pbresource.ID)
|
||||
id.Uid = ""
|
||||
t.Cleanup(func() {
|
||||
rtestClient.MustDelete(t, id)
|
||||
rtestClient.CleanupDelete(t, id)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
|
@ -5,12 +5,14 @@ package resourcetest
|
|||
|
||||
import (
|
||||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/exp/slices"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
@ -21,25 +23,67 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
type ClientOption func(*Client)
|
||||
|
||||
func WithRNGSeed(seed int64) ClientOption {
|
||||
return func(c *Client) {
|
||||
c.rng = rand.New(rand.NewSource(seed))
|
||||
}
|
||||
}
|
||||
|
||||
func WithRequestDelay(minMilliseconds int, maxMilliseconds int) ClientOption {
|
||||
return func(c *Client) {
|
||||
|
||||
min := minMilliseconds
|
||||
max := maxMilliseconds
|
||||
if max < min {
|
||||
min = maxMilliseconds
|
||||
max = minMilliseconds
|
||||
}
|
||||
c.requestDelayMin = min
|
||||
c.requestDelayMax = max
|
||||
}
|
||||
}
|
||||
|
||||
func WithACLToken(token string) ClientOption {
|
||||
return func(c *Client) {
|
||||
c.token = token
|
||||
}
|
||||
}
|
||||
|
||||
type Client struct {
|
||||
pbresource.ResourceServiceClient
|
||||
|
||||
timeout time.Duration
|
||||
wait time.Duration
|
||||
token string
|
||||
|
||||
rng *rand.Rand
|
||||
|
||||
requestDelayMin int
|
||||
requestDelayMax int
|
||||
}
|
||||
|
||||
func NewClient(client pbresource.ResourceServiceClient) *Client {
|
||||
return NewClientWithACLToken(client, "")
|
||||
}
|
||||
|
||||
func NewClientWithACLToken(client pbresource.ResourceServiceClient, token string) *Client {
|
||||
return &Client{
|
||||
func NewClient(client pbresource.ResourceServiceClient, opts ...ClientOption) *Client {
|
||||
c := &Client{
|
||||
ResourceServiceClient: client,
|
||||
timeout: 7 * time.Second,
|
||||
wait: 25 * time.Millisecond,
|
||||
token: token,
|
||||
rng: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
// arbitrary write delays are opt-in only
|
||||
requestDelayMin: 0,
|
||||
requestDelayMax: 0,
|
||||
}
|
||||
|
||||
for _, opt := range opts {
|
||||
opt(c)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func NewClientWithACLToken(client pbresource.ResourceServiceClient, token string) *Client {
|
||||
return NewClient(client, WithACLToken(token))
|
||||
}
|
||||
|
||||
func (client *Client) SetRetryerConfig(timeout time.Duration, wait time.Duration) {
|
||||
|
@ -60,7 +104,7 @@ func (client *Client) PublishResources(t T, resources []*pbresource.Resource) {
|
|||
// controllers should eventually converge on the desired state. The exception to this
|
||||
// is that you cannot insert resources with owner refs before the resource they are
|
||||
// owned by or insert a resource into a non-default tenant before that tenant exists.
|
||||
rand.Shuffle(len(resources), func(i, j int) {
|
||||
client.rng.Shuffle(len(resources), func(i, j int) {
|
||||
temp := resources[i]
|
||||
resources[i] = resources[j]
|
||||
resources[j] = temp
|
||||
|
@ -97,7 +141,7 @@ func (client *Client) PublishResources(t T, resources []*pbresource.Resource) {
|
|||
|
||||
id := rsp.Resource.Id
|
||||
t.Cleanup(func() {
|
||||
client.MustDelete(t, id)
|
||||
client.CleanupDelete(t, id)
|
||||
})
|
||||
|
||||
// track the number of resources published
|
||||
|
@ -119,6 +163,11 @@ func (client *Client) PublishResources(t T, resources []*pbresource.Resource) {
|
|||
require.Empty(t, resources, "Could not publish all resources - some resources have invalid owner references")
|
||||
}
|
||||
|
||||
func (client *Client) Write(ctx context.Context, in *pbresource.WriteRequest, opts ...grpc.CallOption) (*pbresource.WriteResponse, error) {
|
||||
client.delayRequest()
|
||||
return client.ResourceServiceClient.Write(ctx, in, opts...)
|
||||
}
|
||||
|
||||
func (client *Client) Context(t T) context.Context {
|
||||
ctx := testutil.TestContext(t)
|
||||
|
||||
|
@ -291,11 +340,32 @@ func (client *Client) ResolveResourceID(t T, id *pbresource.ID) *pbresource.ID {
|
|||
return client.RequireResourceExists(t, id).Id
|
||||
}
|
||||
|
||||
// MustDelete will delete a resource by its id, retrying if necessary and fail the test
|
||||
// if it cannot delete it within the timeout. The clients request delay settings are
|
||||
// taken into account with this operation.
|
||||
func (client *Client) MustDelete(t T, id *pbresource.ID) {
|
||||
t.Helper()
|
||||
client.retryDelete(t, id, true)
|
||||
}
|
||||
|
||||
// CleanupDelete will perform the same operations as MustDelete to ensure the resource is
|
||||
// deleted. The clients request delay settings are ignored for this operation and it is
|
||||
// assumed this will only be called in the context of test Cleanup routines where we
|
||||
// are no longer testing that a controller eventually converges on some values in response
|
||||
// to the delete.
|
||||
func (client *Client) CleanupDelete(t T, id *pbresource.ID) {
|
||||
t.Helper()
|
||||
client.retryDelete(t, id, false)
|
||||
}
|
||||
|
||||
func (client *Client) retryDelete(t T, id *pbresource.ID, shouldDelay bool) {
|
||||
t.Helper()
|
||||
ctx := client.Context(t)
|
||||
|
||||
client.retry(t, func(r *retry.R) {
|
||||
if shouldDelay {
|
||||
client.delayRequest()
|
||||
}
|
||||
_, err := client.Delete(ctx, &pbresource.DeleteRequest{Id: id})
|
||||
if status.Code(err) == codes.NotFound {
|
||||
return
|
||||
|
@ -311,3 +381,53 @@ func (client *Client) MustDelete(t T, id *pbresource.ID) {
|
|||
require.NoError(r, err)
|
||||
})
|
||||
}
|
||||
|
||||
func (client *Client) delayRequest() {
|
||||
if client.requestDelayMin == 0 && client.requestDelayMax == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
var delay time.Duration
|
||||
if client.requestDelayMin == client.requestDelayMax {
|
||||
delay = time.Duration(client.requestDelayMin) * time.Millisecond
|
||||
} else {
|
||||
delay = time.Duration(client.rng.Intn(client.requestDelayMax-client.requestDelayMin)+client.requestDelayMin) * time.Millisecond
|
||||
}
|
||||
time.Sleep(delay)
|
||||
}
|
||||
|
||||
type CLIOptions struct {
|
||||
minRequestDelay int
|
||||
maxRequestDelay int
|
||||
seed int64
|
||||
}
|
||||
|
||||
type CLIOptionT interface {
|
||||
Helper()
|
||||
Logf(string, ...any)
|
||||
}
|
||||
|
||||
func (o *CLIOptions) ClientOptions(t CLIOptionT) []ClientOption {
|
||||
t.Helper()
|
||||
t.Logf("Using %d for the random number generator seed. Pass -rng-seed=<value> to overwrite the time based seed", o.seed)
|
||||
t.Logf("Using random request delays between %dms and %dms. Use -min-request-delay=<value> or -max-request-delay=<value> to override the defaults", o.minRequestDelay, o.maxRequestDelay)
|
||||
|
||||
return []ClientOption{
|
||||
WithRNGSeed(o.seed),
|
||||
WithRequestDelay(o.minRequestDelay, o.maxRequestDelay),
|
||||
}
|
||||
}
|
||||
|
||||
func ConfigureTestCLIFlags() *CLIOptions {
|
||||
opts := &CLIOptions{
|
||||
minRequestDelay: 0,
|
||||
maxRequestDelay: 0,
|
||||
seed: time.Now().UnixNano(),
|
||||
}
|
||||
|
||||
flag.Int64Var(&opts.seed, "rng-seed", opts.seed, "Seed to use for pseudo-random-number-generators")
|
||||
flag.IntVar(&opts.minRequestDelay, "min-request-delay", 10, "Minimum delay before performing a resource write (milliseconds: default=10)")
|
||||
flag.IntVar(&opts.maxRequestDelay, "max-request-delay", 50, "Maximum delay before performing a resource write (milliseconds: default=50)")
|
||||
|
||||
return opts
|
||||
}
|
||||
|
|
|
@ -12,9 +12,14 @@ import (
|
|||
libtopology "github.com/hashicorp/consul/test/integration/consul-container/libs/topology"
|
||||
|
||||
"github.com/hashicorp/consul/internal/catalog/catalogtest"
|
||||
rtest "github.com/hashicorp/consul/internal/resource/resourcetest"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
var (
|
||||
cli = rtest.ConfigureTestCLIFlags()
|
||||
)
|
||||
|
||||
func TestCatalog(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
|
@ -29,10 +34,10 @@ func TestCatalog(t *testing.T) {
|
|||
client := pbresource.NewResourceServiceClient(followers[0].GetGRPCConn())
|
||||
|
||||
t.Run("one-shot", func(t *testing.T) {
|
||||
catalogtest.RunCatalogV2Beta1IntegrationTest(t, client)
|
||||
catalogtest.RunCatalogV2Beta1IntegrationTest(t, client, cli.ClientOptions(t)...)
|
||||
})
|
||||
|
||||
t.Run("lifecycle", func(t *testing.T) {
|
||||
catalogtest.RunCatalogV2Beta1LifecycleIntegrationTest(t, client)
|
||||
catalogtest.RunCatalogV2Beta1LifecycleIntegrationTest(t, client, cli.ClientOptions(t)...)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue