From 8307e40f2bb54ea7060bc0765834f4b5f43992a4 Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Fri, 4 Mar 2022 11:20:01 -0600 Subject: [PATCH] reduce flakiness/raciness of errNotFound and errNotChanged blocking query tests (#12518) Improves tests from #12362 These tests try to set up the following concurrent scenario: 1. (goroutine 1) execute read RPC with index=0 2. (goroutine 1) get response from (1) @ index=10 3. (goroutine 1) execute read RPC with index=10 and block 4. (goroutine 2) WHILE (3) is blocking, start slamming the system with stray writes that will cause the WatchSet to wake up 5. (goroutine 2) after doing all writes, shut down the reader above 6. (goroutine 1) stops reading, double-checks that it only ever woke up once (from 1) --- agent/consul/config_endpoint_test.go | 226 ++++++------------ agent/consul/discovery_chain_endpoint_test.go | 82 ++----- agent/consul/intention_endpoint_test.go | 91 +++---- agent/consul/internal_endpoint_test.go | 79 ++---- agent/consul/rpc_test.go | 110 +++++++-- agent/consul/status_endpoint_test.go | 14 +- 6 files changed, 266 insertions(+), 336 deletions(-) diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 414886b87e..b5778b8234 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "sort" @@ -11,7 +10,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" @@ -311,11 +309,13 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it
in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) { // create one relevant entry var out bool @@ -329,59 +329,33 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { } runStep(t, "test the errNotFound path", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Name: "does-not-exist", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { - var out structs.ConfigEntryResponse - - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out) - if err != nil { - return err + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "does-not-exist", } - t.Log("blocking query index", out.QueryMeta.Index, out.Entry) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) + args.QueryOptions.MinQueryIndex = minQueryIndex + var out structs.ConfigEntryResponse + errCh := channelCallRPC(s1, "ConfigEntry.Get", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf("other%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, 
g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) }) } @@ -493,67 +467,44 @@ func TestConfigEntry_List_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: "dc1", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Datacenter: "dc1", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.IndexedConfigEntries - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "ConfigEntry.List", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceResolverConfigEntry{ 
Kind: structs.ServiceResolver, Name: fmt.Sprintf(dataPrefix+"%d", i), ConnectTimeout: 33 * time.Second, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { @@ -2177,68 +2128,45 @@ func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ServiceConfigRequest{ + Name: "foo", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + }, + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ServiceConfigRequest{ - Name: "foo", - UpstreamIDs: []structs.ServiceID{ - structs.NewServiceID("bar", nil), - }, - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.ServiceConfigResponse - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) - count++ - 
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "ConfigEntry.ResolveServiceConfig", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf(dataPrefix+"%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) } { // create one unrelated entry diff --git a/agent/consul/discovery_chain_endpoint_test.go b/agent/consul/discovery_chain_endpoint_test.go index d876940a60..174cab742b 100644 --- a/agent/consul/discovery_chain_endpoint_test.go +++ b/agent/consul/discovery_chain_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "testing" @@ -9,7 +8,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -250,13 +248,14 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } + t.Parallel() + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos c.PrimaryDatacenter = "dc1" }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec 
:= rpcClient(t, s1) waitForLeaderEstablishment(t, s1) testrpc.WaitForTestAgent(t, s1.RPC, "dc1") @@ -275,66 +274,37 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { } run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := &structs.DiscoveryChainRequest{ + Name: "web", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + EvaluateInPartition: "default", + Datacenter: "dc1", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.DiscoveryChainRequest{ - Name: "web", - EvaluateInDatacenter: "dc1", - EvaluateInNamespace: "default", - EvaluateInPartition: "default", - Datacenter: "dc1", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.DiscoveryChainResponse - err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out) - if err != nil { - return fmt.Errorf("error getting discovery chain: %w", err) - } - if !out.Chain.IsDefault() { - return fmt.Errorf("expected default chain") - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Chain) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "DiscoveryChain.Get", &args, &out, func() error { + if !out.Chain.IsDefault() { + return fmt.Errorf("expected default chain") + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Datacenter: "dc1", Entry: 
&structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf(dataPrefix+"%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 0e73613da5..e1a35bf62e 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "testing" @@ -9,7 +8,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -1749,61 +1747,45 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) waitForLeaderEstablishment(t, s1) run := func(t *testing.T, dataPrefix string, expectMatches int) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.IntentionQueryRequest{ - Datacenter: "dc1", - Match: &structs.IntentionQueryMatch{ - Type: structs.IntentionMatchDestination, - Entries: []structs.IntentionMatchEntry{ - {Name: "bar"}, + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args 
:= &structs.IntentionQueryRequest{ + Datacenter: "dc1", + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + {Name: "bar"}, + }, }, - }, - } - args.QueryOptions.MaxQueryTime = time.Second + } + args.QueryOptions.MinQueryIndex = minQueryIndex - for ctx.Err() == nil { var out structs.IndexedIntentionMatches - - err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out) - if err != nil { - return fmt.Errorf("error getting intentions: %w", err) - } - if len(out.Matches) != 1 { - return fmt.Errorf("expected 1 match got %d", len(out.Matches)) - } - if len(out.Matches[0]) != expectMatches { - return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0]) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "Intention.Match", args, &out, func() error { + if len(out.Matches) != 1 { + return fmt.Errorf("expected 1 match got %d", len(out.Matches)) + } + if len(out.Matches[0]) != expectMatches { + return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out string - err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{ Datacenter: "dc1", Op: structs.IntentionOpCreate, Intention: &structs.Intention{ @@ -1814,18 +1796,9 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) { DestinationName: fmt.Sprintf(dataPrefix+"%d", i), Action: structs.IntentionActionAllow, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - } - cancel() - return nil - }) - - 
require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index c6f63e7f25..601eb7cc49 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1,18 +1,15 @@ package consul import ( - "context" "encoding/base64" "fmt" "os" "strings" "testing" - "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -2326,11 +2323,13 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) waitForLeaderEstablishment(t, s1) @@ -2352,45 +2351,26 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { } run := func(t *testing.T, dataPrefix string, expectServices int) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.IndexedServiceList - - err := 
msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out) - if err != nil { - return fmt.Errorf("error getting upstreams: %w", err) - } - - if len(out.Services) != expectServices { - return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Services) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "Internal.IntentionUpstreams", args, &out, func() error { + if len(out.Services) != expectServices { + return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out string - err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{ Datacenter: "dc1", Op: structs.IntentionOpCreate, Intention: &structs.Intention{ @@ -2398,18 +2378,9 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i), Action: structs.IntentionActionAllow, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index d26b83faa8..2c46ca8f27 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1682,23 +1682,103 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc return event, nil } -// assertBlockingQueryWakeupCount is used to assist in assertions for -// blockingQuery RPC tests 
involving the two sentinel errors errNotFound and -// errNotChanged. -// -// Those tests are a bit racy because of the timing of the two goroutines, so -// we relax the check for the count to be within a small range. -// -// The blocking query is going to wake up every interval, so use the elapsed test -// time with that known timing value to gauge how many legit wakeups should -// happen and then pad it out a smidge. -func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) { +// channelCallRPC lets you execute an RPC async. Helpful in some +// tests. +func channelCallRPC( + srv *Server, + method string, + args interface{}, + resp interface{}, + responseInterceptor func() error, +) <-chan error { + errCh := make(chan error, 1) + go func() { + codec, err := rpcClientNoClose(srv) + if err != nil { + errCh <- err + return + } + defer codec.Close() + + err = msgpackrpc.CallWithCodec(codec, method, args, resp) + if err == nil && responseInterceptor != nil { + err = responseInterceptor() + } + errCh <- err + }() + return errCh +} + +// rpcBlockingQueryTestHarness is specifically meant to test the +// errNotFound and errNotChanged mechanisms in blockingQuery() +func rpcBlockingQueryTestHarness( + t *testing.T, + readQueryFn func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error), + noisyWriteFn func(i int) <-chan error, +) { t.Helper() - const buffer = 2 - expectedQueries := int(time.Since(start)/interval) + buffer + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - if gotCount < 2 || gotCount > expectedQueries { - t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount) + launchWriters := func() { + defer cancel() + + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + errCh := noisyWriteFn(i) + select { + case <-ctx.Done(): + return + case err := <-errCh: + if err != nil { + t.Errorf("[%d] unexpected error: %w", i, err) + return + } + } + } } + + var ( + 
count int + minQueryIndex uint64 + ) + + for ctx.Err() == nil { + // The first iteration is an orientation iteration, as we don't pass an + // index value so there is no actual blocking that will happen. + // + // Since the data is not changing, we don't expect the second iteration + // to return soon, so we wait a bit after kicking it off before + // launching the write-storm. + var timerCh <-chan time.Time + if count == 1 { + timerCh = time.After(50 * time.Millisecond) + } + + qm, errCh := readQueryFn(minQueryIndex) + + RESUME: + select { + case err := <-errCh: + if err != nil { + require.NoError(t, err) + } + + t.Log("blocking query index", qm.Index) + count++ + minQueryIndex = qm.Index + + case <-timerCh: + timerCh = nil + go launchWriters() + goto RESUME + + case <-ctx.Done(): + break + } + } + + require.Equal(t, 1, count, "if this fails, then the timer likely needs to be increased above") } diff --git a/agent/consul/status_endpoint_test.go b/agent/consul/status_endpoint_test.go index 668fe85d17..6ae7111e85 100644 --- a/agent/consul/status_endpoint_test.go +++ b/agent/consul/status_endpoint_test.go @@ -17,17 +17,25 @@ import ( ) func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { + codec, err := rpcClientNoClose(s) + if err != nil { + t.Fatalf("err: %v", err) + } + t.Cleanup(func() { codec.Close() }) + return codec +} + +func rpcClientNoClose(s *Server) (rpc.ClientCodec, error) { addr := s.config.RPCAdvertise conn, err := net.DialTimeout("tcp", addr.String(), time.Second) if err != nil { - t.Fatalf("err: %v", err) + return nil, err } // Write the Consul RPC byte to set the mode conn.Write([]byte{byte(pool.RPCConsul)}) codec := msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle) - t.Cleanup(func() { codec.Close() }) - return codec + return codec, nil } func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {