reduce flakiness/raciness of errNotFound and errNotChanged blocking query tests (#12518)

Improves tests from #12362

These tests try to set up the following concurrent scenario:

1. (goroutine 1) execute read RPC with index=0
2. (goroutine 1) get response from (1) @ index=10
3. (goroutine 1) execute read RPC with index=10 and block
4. (goroutine 2) WHILE (3) is blocking, start slamming the system with stray writes that will cause the WatchSet to wake up
5. (goroutine 2) after doing all writes, shut down the reader above
6. (goroutine 1) stops reading, double checks that it only ever woke up once (from 1)
This commit is contained in:
R.B. Boyer 2022-03-04 11:20:01 -06:00 committed by GitHub
parent 9a0c2dee60
commit 8307e40f2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 266 additions and 336 deletions

View File

@ -1,7 +1,6 @@
package consul
import (
"context"
"fmt"
"os"
"sort"
@ -11,7 +10,6 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
@ -311,11 +309,13 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
{ // create one relevant entry
var out bool
@ -329,59 +329,33 @@ func TestConfigEntry_Get_BlockOnNonExistent(t *testing.T) {
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Name: "does-not-exist",
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.ConfigEntryResponse
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.Get", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out.Entry)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
errCh := channelCallRPC(s1, "ConfigEntry.Get", &args, &out, nil)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf("other%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}, &out, func() error {
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
},
)
})
}
@ -493,67 +467,44 @@ func TestConfigEntry_List_BlockOnNoChange(t *testing.T) {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := structs.ConfigEntryQuery{
Kind: structs.ServiceDefaults,
Datacenter: "dc1",
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.IndexedConfigEntries
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.List", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
errCh := channelCallRPC(s1, "ConfigEntry.List", &args, &out, nil)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceResolverConfigEntry{
Kind: structs.ServiceResolver,
Name: fmt.Sprintf(dataPrefix+"%d", i),
ConnectTimeout: 33 * time.Second,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}, &out, func() error {
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
},
)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {
@ -2177,68 +2128,45 @@ func TestConfigEntry_ResolveServiceConfig_BlockOnNoChange(t *testing.T) {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := structs.ServiceConfigRequest{
Name: "foo",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
},
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.ServiceConfigResponse
err := msgpackrpc.CallWithCodec(readerCodec, "ConfigEntry.ResolveServiceConfig", &args, &out)
if err != nil {
return err
}
t.Log("blocking query index", out.QueryMeta.Index, out, time.Now())
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
errCh := channelCallRPC(s1, "ConfigEntry.ResolveServiceConfig", &args, &out, nil)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf(dataPrefix+"%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}, &out, func() error {
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
},
)
}
{ // create one unrelated entry

View File

@ -1,7 +1,6 @@
package consul
import (
"context"
"fmt"
"os"
"testing"
@ -9,7 +8,6 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/connect"
@ -250,13 +248,14 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
t.Skip("too slow for testing.Short")
}
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
c.PrimaryDatacenter = "dc1"
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
@ -275,14 +274,8 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
}
run := func(t *testing.T, dataPrefix string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := &structs.DiscoveryChainRequest{
Name: "web",
EvaluateInDatacenter: "dc1",
@ -290,51 +283,28 @@ func TestDiscoveryChainEndpoint_Get_BlockOnNoChange(t *testing.T) {
EvaluateInPartition: "default",
Datacenter: "dc1",
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.DiscoveryChainResponse
err := msgpackrpc.CallWithCodec(readerCodec, "DiscoveryChain.Get", &args, &out)
if err != nil {
return fmt.Errorf("error getting discovery chain: %w", err)
}
errCh := channelCallRPC(s1, "DiscoveryChain.Get", &args, &out, func() error {
if !out.Chain.IsDefault() {
return fmt.Errorf("expected default chain")
}
t.Log("blocking query index", out.QueryMeta.Index, out.Chain)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out bool
err := msgpackrpc.CallWithCodec(writerCodec, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
return channelCallRPC(s1, "ConfigEntry.Apply", &structs.ConfigEntryRequest{
Datacenter: "dc1",
Entry: &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: fmt.Sprintf(dataPrefix+"%d", i),
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
if !out {
return fmt.Errorf("[%d] unexpectedly returned false", i)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}, &out, nil)
},
)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {

View File

@ -1,7 +1,6 @@
package consul
import (
"context"
"fmt"
"os"
"testing"
@ -9,7 +8,6 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -1749,23 +1747,19 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
run := func(t *testing.T, dataPrefix string, expectMatches int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := &structs.IntentionQueryRequest{
Datacenter: "dc1",
Match: &structs.IntentionQueryMatch{
@ -1775,35 +1769,23 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
},
},
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.IndexedIntentionMatches
err := msgpackrpc.CallWithCodec(readerCodec, "Intention.Match", args, &out)
if err != nil {
return fmt.Errorf("error getting intentions: %w", err)
}
errCh := channelCallRPC(s1, "Intention.Match", args, &out, func() error {
if len(out.Matches) != 1 {
return fmt.Errorf("expected 1 match got %d", len(out.Matches))
}
if len(out.Matches[0]) != expectMatches {
return fmt.Errorf("expected %d inner matches got %d", expectMatches, len(out.Matches[0]))
}
t.Log("blocking query index", out.QueryMeta.Index, out.Matches[0])
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out string
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
@ -1814,18 +1796,9 @@ func TestIntentionMatch_BlockOnNoChange(t *testing.T) {
DestinationName: fmt.Sprintf(dataPrefix+"%d", i),
Action: structs.IntentionActionAllow,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}, &out, nil)
},
)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {

View File

@ -1,18 +1,15 @@
package consul
import (
"context"
"encoding/base64"
"fmt"
"os"
"strings"
"testing"
"time"
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
@ -2326,11 +2323,13 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
t.Skip("too slow for testing.Short")
}
_, s1 := testServerWithConfig(t)
t.Parallel()
_, s1 := testServerWithConfig(t, func(c *Config) {
c.DevMode = true // keep it in ram to make it 10x faster on macos
})
codec := rpcClient(t, s1)
readerCodec := rpcClient(t, s1)
writerCodec := rpcClient(t, s1)
waitForLeaderEstablishment(t, s1)
@ -2352,45 +2351,26 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
}
run := func(t *testing.T, dataPrefix string, expectServices int) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var count int
start := time.Now()
g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
rpcBlockingQueryTestHarness(t,
func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error) {
args := &structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: "web",
}
args.QueryOptions.MaxQueryTime = time.Second
args.QueryOptions.MinQueryIndex = minQueryIndex
for ctx.Err() == nil {
var out structs.IndexedServiceList
err := msgpackrpc.CallWithCodec(readerCodec, "Internal.IntentionUpstreams", args, &out)
if err != nil {
return fmt.Errorf("error getting upstreams: %w", err)
}
errCh := channelCallRPC(s1, "Internal.IntentionUpstreams", args, &out, func() error {
if len(out.Services) != expectServices {
return fmt.Errorf("expected %d services got %d", expectServices, len(out.Services))
}
t.Log("blocking query index", out.QueryMeta.Index, out.Services)
count++
args.QueryOptions.MinQueryIndex = out.QueryMeta.Index
}
return nil
})
g.Go(func() error {
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
return &out.QueryMeta, errCh
},
func(i int) <-chan error {
var out string
err := msgpackrpc.CallWithCodec(writerCodec, "Intention.Apply", &structs.IntentionRequest{
return channelCallRPC(s1, "Intention.Apply", &structs.IntentionRequest{
Datacenter: "dc1",
Op: structs.IntentionOpCreate,
Intention: &structs.Intention{
@ -2398,18 +2378,9 @@ func TestInternal_IntentionUpstreams_BlockOnNoChange(t *testing.T) {
DestinationName: fmt.Sprintf(dataPrefix+"-dst-%d", i),
Action: structs.IntentionActionAllow,
},
}, &out)
if err != nil {
return fmt.Errorf("[%d] unexpected error: %w", i, err)
}
}
cancel()
return nil
})
require.NoError(t, g.Wait())
assertBlockingQueryWakeupCount(t, time.Second, start, count)
}, &out, nil)
},
)
}
runStep(t, "test the errNotFound path", func(t *testing.T) {

View File

@ -1682,23 +1682,103 @@ func getFirstSubscribeEventOrError(conn *grpc.ClientConn, req *pbsubscribe.Subsc
return event, nil
}
// assertBlockingQueryWakeupCount is used to assist in assertions for
// blockingQuery RPC tests involving the two sentinel errors errNotFound and
// errNotChanged.
//
// Those tests are a bit racy because of the timing of the two goroutines, so
// we relax the check for the count to be within a small range.
//
// The blocking query is going to wake up every interval, so use the elapsed test
// time with that known timing value to gauge how many legit wakeups should
// happen and then pad it out a smidge.
func assertBlockingQueryWakeupCount(t testing.TB, interval time.Duration, start time.Time, gotCount int) {
// channelCallRPC runs a single RPC against srv in a background goroutine and
// hands the outcome back on the returned channel. Helpful in tests that need
// to keep working while a (possibly blocking) call is in flight.
//
// A fresh connection is dialed for the call and closed when the goroutine
// exits. resp is populated by the RPC, so callers should only read it after
// the channel has delivered. If responseInterceptor is non-nil it is invoked
// after a successful call and its return value becomes the reported result.
//
// The channel has room for one value, so the goroutine never blocks on the
// send even if nobody receives.
func channelCallRPC(
	srv *Server,
	method string,
	args interface{},
	resp interface{},
	responseInterceptor func() error,
) <-chan error {
	results := make(chan error, 1)

	go func() {
		client, dialErr := rpcClientNoClose(srv)
		if dialErr != nil {
			results <- dialErr
			return
		}
		defer client.Close()

		callErr := msgpackrpc.CallWithCodec(client, method, args, resp)
		switch {
		case callErr != nil:
			results <- callErr
		case responseInterceptor != nil:
			results <- responseInterceptor()
		default:
			results <- nil
		}
	}()

	return results
}
// rpcBlockingQueryTestHarness is specifically meant to test the
// errNotFound and errNotChanged mechanisms in blockingQuery()
func rpcBlockingQueryTestHarness(
t *testing.T,
readQueryFn func(minQueryIndex uint64) (*structs.QueryMeta, <-chan error),
noisyWriteFn func(i int) <-chan error,
) {
t.Helper()
const buffer = 2
expectedQueries := int(time.Since(start)/interval) + buffer
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if gotCount < 2 || gotCount > expectedQueries {
t.Fatalf("expected count to be >= 2 or < %d, got %d", expectedQueries, gotCount)
launchWriters := func() {
defer cancel()
for i := 0; i < 200; i++ {
time.Sleep(5 * time.Millisecond)
errCh := noisyWriteFn(i)
select {
case <-ctx.Done():
return
case err := <-errCh:
if err != nil {
t.Errorf("[%d] unexpected error: %w", i, err)
return
}
}
}
}
var (
count int
minQueryIndex uint64
)
for ctx.Err() == nil {
// The first iteration is an orientation iteration, as we don't pass an
// index value so there is no actual blocking that will happen.
//
// Since the data is not changing, we don't expect the second iteration
// to return soon, so we wait a bit after kicking it off before
// launching the write-storm.
var timerCh <-chan time.Time
if count == 1 {
timerCh = time.After(50 * time.Millisecond)
}
qm, errCh := readQueryFn(minQueryIndex)
RESUME:
select {
case err := <-errCh:
if err != nil {
require.NoError(t, err)
}
t.Log("blocking query index", qm.Index)
count++
minQueryIndex = qm.Index
case <-timerCh:
timerCh = nil
go launchWriters()
goto RESUME
case <-ctx.Done():
break
}
}
require.Equal(t, 1, count, "if this fails, then the timer likely needs to be increased above")
}

View File

@ -17,17 +17,25 @@ import (
)
// rpcClient opens an RPC client codec to s for use in a test. Any failure to
// connect aborts the test, and the codec is closed automatically through
// t.Cleanup when the test finishes.
func rpcClient(t *testing.T, s *Server) rpc.ClientCodec {
	client, dialErr := rpcClientNoClose(s)
	if dialErr != nil {
		t.Fatalf("err: %v", dialErr)
	}

	t.Cleanup(func() {
		client.Close()
	})

	return client
}
// rpcClientNoClose dials the server's advertised RPC address (with a one
// second timeout), writes the Consul RPC mode byte, and returns a msgpack RPC
// client codec over that connection.
//
// Unlike rpcClient it takes no *testing.T, so it can be used from goroutines,
// and it registers no cleanup: the caller owns the returned codec and must
// close it. On error no connection is left open.
func rpcClientNoClose(s *Server) (rpc.ClientCodec, error) {
	addr := s.config.RPCAdvertise
	conn, err := net.DialTimeout("tcp", addr.String(), time.Second)
	if err != nil {
		return nil, err
	}

	// Write the Consul RPC byte to set the mode. If this fails the connection
	// is unusable, so release it instead of handing back a broken codec.
	if _, err := conn.Write([]byte{byte(pool.RPCConsul)}); err != nil {
		conn.Close()
		return nil, err
	}

	codec := msgpackrpc.NewCodecFromHandle(true, true, conn, structs.MsgpackHandle)
	return codec, nil
}
func insecureRPCClient(s *Server, c tlsutil.Config) (rpc.ClientCodec, error) {