diff --git a/agent/consul/config_endpoint_test.go b/agent/consul/config_endpoint_test.go index 414886b87e..b5778b8234 100644 --- a/agent/consul/config_endpoint_test.go +++ b/agent/consul/config_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "sort" @@ -11,7 +10,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" hashstructure_v2 "github.com/mitchellh/hashstructure/v2" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/configentry" @@ -311,11 +309,13 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) { // create one relevant entry var out bool @@ -329,59 +329,33 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) { } runStep(t, "test the errNotFound path", func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Name: "does-not-exist", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { - var out structs.ConfigEntryResponse - - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out) - if err != nil { - return err + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Name: "does-not-exist", } - t.Log("blocking query index", out.QueryMeta.Index, out.Entry) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) + args.QueryOptions.MinQueryIndex = minQueryIndex + var out structs.ConfigEntryResponse + errCh := channelCallRPC(s1, "ConfigEntry.Get", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf("other%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) }) } @@ -493,67 +467,44 @@ func TestConfigEntry_List_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ConfigEntryQuery{ + Kind: structs.ServiceDefaults, + Datacenter: "dc1", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ConfigEntryQuery{ - Kind: structs.ServiceDefaults, - Datacenter: "dc1", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.IndexedConfigEntries - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "ConfigEntry.List", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceResolverConfigEntry{ Kind: structs.ServiceResolver, Name: fmt.Sprintf(dataPrefix+"%d", i), ConnectTimeout: 33 * time.Second, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { @@ -2177,68 +2128,45 @@ func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := structs.ServiceConfigRequest{ + Name: "foo", + UpstreamIDs: []structs.ServiceID{ + structs.NewServiceID("bar", nil), + }, + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := structs.ServiceConfigRequest{ - Name: "foo", - UpstreamIDs: []structs.ServiceID{ - structs.NewServiceID("bar", nil), - }, - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.ServiceConfigResponse - err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out) - if err != nil { - return err - } - t.Log("blocking query index", out.QueryMeta.Index, out, time.Now()) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "ConfigEntry.ResolveServiceConfig", &args, &out, nil) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf(dataPrefix+"%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, func() error { + if !out { + return fmt.Errorf("[%d] unexpectedly returned false", i) + } + return nil + }) + }, + ) } { // create one unrelated entry diff --git a/agent/consul/discovery_chain_endpoint_test.go b/agent/consul/discovery_chain_endpoint_test.go index d876940a60..174cab742b 100644 --- a/agent/consul/discovery_chain_endpoint_test.go +++ b/agent/consul/discovery_chain_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "testing" @@ -9,7 +8,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" @@ -250,13 +248,14 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } + t.Parallel() + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos c.PrimaryDatacenter = "dc1" }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) waitForLeaderEstablishment(t, s1) testrpc.WaitForTestAgent(t, s1.RPC, "dc1") @@ -275,66 +274,37 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) { } run := func(t *testing.T, dataPrefix string) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := &structs.DiscoveryChainRequest{ + Name: "web", + EvaluateInDatacenter: "dc1", + EvaluateInNamespace: "default", + EvaluateInPartition: "default", + Datacenter: "dc1", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.DiscoveryChainRequest{ - Name: "web", - EvaluateInDatacenter: "dc1", - EvaluateInNamespace: "default", - EvaluateInPartition: "default", - Datacenter: "dc1", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.DiscoveryChainResponse - err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out) - if err != nil { - return fmt.Errorf("error getting discovery chain: %w", err) - } - if !out.Chain.IsDefault() { - return fmt.Errorf("expected default chain") - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Chain) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "DiscoveryChain.Get", &args, &out, func() error { + if !out.Chain.IsDefault() { + return fmt.Errorf("expected default chain") + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out bool - err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ + return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{ Datacenter: "dc1", Entry: &structs.ServiceConfigEntry{ Kind: structs.ServiceDefaults, Name: fmt.Sprintf(dataPrefix+"%d", i), }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - if !out { - return fmt.Errorf("[%d] unexpectedly returned false", i) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/intention_endpoint_test.go b/agent/consul/intention_endpoint_test.go index 0e73613da5..e1a35bf62e 100644 --- a/agent/consul/intention_endpoint_test.go +++ b/agent/consul/intention_endpoint_test.go @@ -1,7 +1,6 @@ package consul import ( - "context" "fmt" "os" "testing" @@ -9,7 +8,6 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -1749,61 +1747,45 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) waitForLeaderEstablishment(t, s1) run := func(t *testing.T, dataPrefix string, expectMatches int) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.IntentionQueryRequest{ - Datacenter: "dc1", - Match: &structs.IntentionQueryMatch{ - Type: structs.IntentionMatchDestination, - Entries: []structs.IntentionMatchEntry{ - {Name: "bar"}, + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := &structs.IntentionQueryRequest{ + Datacenter: "dc1", + Match: &structs.IntentionQueryMatch{ + Type: structs.IntentionMatchDestination, + Entries: []structs.IntentionMatchEntry{ + {Name: "bar"}, + }, }, - }, - } - args.QueryOptions.MaxQueryTime = time.Second + } + args.QueryOptions.MinQueryIndex = minQueryIndex - for ctx.Err() == nil { var out structs.IndexedIntentionMatches - - err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out) - if err != nil { - return fmt.Errorf("error getting intentions: %w", err) - } - if len(out.Matches) != 1 { - return fmt.Errorf("expected 1 match got %d", len(out.Matches)) - } - if len(out.Matches[0]) != expectMatches { - return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0]) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "Intention.Match", args, &out, func() error { + if len(out.Matches) != 1 { + return fmt.Errorf("expected 1 match got %d", len(out.Matches)) + } + if len(out.Matches[0]) != expectMatches { + return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0])) + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out string - err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{ Datacenter: "dc1", Op: structs.IntentionOpCreate, Intention: &structs.Intention{ @@ -1814,18 +1796,9 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) { DestinationName: fmt.Sprintf(dataPrefix+"%d", i), Action: structs.IntentionActionAllow, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/internal_endpoint_test.go b/agent/consul/internal_endpoint_test.go index c6f63e7f25..601eb7cc49 100644 --- a/agent/consul/internal_endpoint_test.go +++ b/agent/consul/internal_endpoint_test.go @@ -1,18 +1,15 @@ package consul import ( - "context" "encoding/base64" "fmt" "os" "strings" "testing" - "time" msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -2326,11 +2323,13 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { t.Skip("too slow for testing.Short") } - _, s1 := testServerWithConfig(t) + t.Parallel() + + _, s1 := testServerWithConfig(t, func(c *Config) { + c.DevMode = true // keep it in ram to make it 10x faster on macos + }) codec := rpcClient(t, s1) - readerCodec := rpcClient(t, s1) - writerCodec := rpcClient(t, s1) waitForLeaderEstablishment(t, s1) @@ -2352,45 +2351,26 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { } run := func(t *testing.T, dataPrefix string, expectServices int) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + rpcBlockingQueryTestHarness(t, + func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) { + args := &structs.ServiceSpecificRequest{ + Datacenter: "dc1", + ServiceName: "web", + } + args.QueryOptions.MinQueryIndex = minQueryIndex - var count int - - start := time.Now() - g, ctx := errgroup.WithContext(ctx) - g.Go(func() error { - args := &structs.ServiceSpecificRequest{ - Datacenter: "dc1", - ServiceName: "web", - } - args.QueryOptions.MaxQueryTime = time.Second - - for ctx.Err() == nil { var out structs.IndexedServiceList - - err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out) - if err != nil { - return fmt.Errorf("error getting upstreams: %w", err) - } - - if len(out.Services) != expectServices { - return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) - } - - t.Log("blocking query index", out.QueryMeta.Index, out.Services) - count++ - args.QueryOptions.MinQueryIndex = out.QueryMeta.Index - } - return nil - }) - - g.Go(func() error { - for i := 0; i < 200; i++ { - time.Sleep(5 * time.Millisecond) - + errCh := channelCallRPC(s1, "Internal.IntentionUpstreams", args, &out, func() error { + if len(out.Services) != expectServices { + return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services)) + } + return nil + }) + return &out.QueryMeta, errCh + }, + func(i int) <-chan error { var out string - err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{ + return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{ Datacenter: "dc1", Op: structs.IntentionOpCreate, Intention: &structs.Intention{ @@ -2398,18 +2378,9 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) { DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i), Action: structs.IntentionActionAllow, }, - }, &out) - if err != nil { - return fmt.Errorf("[%d] unexpected error: %w", i, err) - } - } - cancel() - return nil - }) - - require.NoError(t, g.Wait()) - - assertBlockingQueryWakeupCount(t, time.Second, start, count) + }, &out, nil) + }, + ) } runStep(t, "test the errNotFound path", func(t *testing.T) { diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index d26b83faa8..2c46ca8f27 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -1682,23 +1682,103 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc return event, nil } -// assertBlockingQueryWakeupCount is used to assist in assertions for -// blockingQuery RPC tests involving the two sentinel errors errNotFound and -// errNotChanged. -// -// Those tests are a bit racy because of the timing of the two goroutines, so -// we relax the check for the count to be within a small range. -// -// The blocking query is going to wake up every interval, so use the elapsed test -// time with that known timing value to gauge how many legit wakeups should -// happen and then pad it out a smidge. -func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) { +// channelCallRPC lets you execute an RPC async. Helpful in some +// tests. +func channelCallRPC( + srv *Server, + method string, + args interface{}, + resp interface{}, + responseInterceptor func() error, +) <-chan error { + errCh := make(chan error, 1) + go func() { + codec, err := rpcClientNoClose(srv) + if err != nil { + errCh <- err + return + } + defer codec.Close() + + err = msgpackrpc.CallWithCodec(codec, method, args, resp) + if err == nil && responseInterceptor != nil { + err = responseInterceptor() + } + errCh <- err + }() + return errCh +} + +// rpcBlockingQueryTestHarness is specifically meant to test the +// errNotFound and errNotChanged mechanisms in blockingQuery() +func rpcBlockingQueryTestHarness( + t *testing.T, + readQueryFn func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error), + noisyWriteFn func(i int) <-chan error, +) { t.Helper() - const buffer = 2 - expectedQueries := int(time.Since(start)/interval) + buffer + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() - if gotCount < 2 || gotCount > expectedQueries { - t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount) + launchWriters := func() { + defer cancel() + + for i := 0; i < 200; i++ { + time.Sleep(5 * time.Millisecond) + + errCh := noisyWriteFn(i) + select { + case <-ctx.Done(): + return + case err := <-errCh: + if err != nil { + t.Errorf("[%d] unexpected error: %w", i, err) + return + } + } + } } + + var ( + count int + minQueryIndex uint64 + ) + + for ctx.Err() == nil { + // The first iteration is an orientation iteration, as we don't pass an + // index value so there is no actual blocking that will happen. + // + // Since the data is not changing, we don't expect the second iteration + // to return soon, so we wait a bit after kicking it off before + // launching the write-storm. + var timerCh <-chan time.Time + if count == 1 { + timerCh = time.After(50 * time.Millisecond) + } + + qm, errCh := readQueryFn(minQueryIndex) + + RESUME: + select { + case err := <-errCh: + if err != nil { + require.NoError(t, err) + } + + t.Log("blocking query index", qm.Index) + count++ + minQueryIndex = qm.Index + + case <-timerCh: + timerCh = nil + go launchWriters() + goto RESUME + + case <-ctx.Done(): + break + } + } + + require.Equal(t, 1, count, "if this fails, then the timer likely needs to be increased above") } diff --git a/agent/consul/status_endpoint_test.go b/agent/consul/status_endpoint_test.go index 668fe85d17..6ae7111e85 100644 --- a/agent/consul/status_endpoint_test.go +++ b/agent/consul/status_endpoint_test.go @@ -17,17 +17,25 @@ import ( ) func rpcClient(t *testing.T, s *Server) rpc.ClientCodec { + codec, err := rpcClientNoClose(s) + if err != nil { + t.Fatalf("err: %v", err) + } + t.Cleanup(func() { codec.Close() }) + return codec +} + +func rpcClientNoClose(s *Server) (rpc.ClientCodec, error) { addr := s.config.RPCAdvertise conn, err := net.DialTimeout("tcp", addr.String(), time.Second) if err != nil { - t.Fatalf("err: %v", err) + return nil, err } // Write the Consul RPC byte to set the mode conn.Write([]byte{byte(pool.RPCConsul)}) codec := msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle) - t.Cleanup(func() { codec.Close() }) - return codec + return codec, nil } func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {