From c932833acb149e28f9e4df29cf127988a152e3c8 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 22 Apr 2021 14:08:35 -0400 Subject: [PATCH] rpcclient:health: fix a data race and flake in tests Split the TestStreamingClient into the two logical components the real client uses. This allows us to test multiple clients properly. Previously writing of ctx from multiple Subscribe calls was showing a data race. Once this was fixed a test started to fail because the request had to be made with a greater index, so that the store.Get call did not return immediately. --- agent/submatview/store_test.go | 2 +- agent/submatview/streaming_test.go | 66 ++++++++++++++++++++---------- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 4a0699bae4..8a7bfafcdc 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -323,7 +323,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - req2 = &fakeRequest{client: req.client, key: "key2"} + req2 = &fakeRequest{client: req.client, key: "key2", index: 22} require.NoError(t, store.Notify(ctx, req2, cID, ch1)) go func() { diff --git a/agent/submatview/streaming_test.go b/agent/submatview/streaming_test.go index f846021792..80fec094fe 100644 --- a/agent/submatview/streaming_test.go +++ b/agent/submatview/streaming_test.go @@ -3,23 +3,23 @@ package submatview import ( "context" "fmt" + "sync" "google.golang.org/grpc" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" - "github.com/hashicorp/consul/types" - "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" ) // TestStreamingClient is a mock StreamingClient for testing that allows // for queueing up custom events to a subscriber. type TestStreamingClient struct { - pbsubscribe.StateChangeSubscription_SubscribeClient - events chan eventOrErr - ctx context.Context expectedNamespace string + subClients []*subscribeClient + lock sync.RWMutex + events []eventOrErr } type eventOrErr struct { @@ -28,44 +28,66 @@ type eventOrErr struct { } func NewTestStreamingClient(ns string) *TestStreamingClient { - return &TestStreamingClient{ - events: make(chan eventOrErr, 32), - expectedNamespace: ns, - } + return &TestStreamingClient{expectedNamespace: ns} } -func (t *TestStreamingClient) Subscribe( +func (s *TestStreamingClient) Subscribe( ctx context.Context, req *pbsubscribe.SubscribeRequest, _ ...grpc.CallOption, ) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { - if req.Namespace != t.expectedNamespace { + if req.Namespace != s.expectedNamespace { return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", - req.Namespace, t.expectedNamespace) + req.Namespace, s.expectedNamespace) } - t.ctx = ctx - return t, nil + c := &subscribeClient{ + events: make(chan eventOrErr, 32), + ctx: ctx, + } + s.lock.Lock() + s.subClients = append(s.subClients, c) + for _, event := range s.events { + c.events <- event + } + s.lock.Unlock() + return c, nil } -func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { +type subscribeClient struct { + grpc.ClientStream + events chan eventOrErr + ctx context.Context +} + +func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { + s.lock.Lock() for _, e := range events { - t.events <- eventOrErr{Event: e} + s.events = append(s.events, eventOrErr{Event: e}) + for _, c := range s.subClients { + c.events <- eventOrErr{Event: e} + } } + s.lock.Unlock() } -func (t *TestStreamingClient) QueueErr(err error) { - t.events <- eventOrErr{Err: err} +func (s *TestStreamingClient) QueueErr(err error) { + s.lock.Lock() + s.events = append(s.events, eventOrErr{Err: err}) + for _, c := range s.subClients { + c.events <- eventOrErr{Err: err} + } + s.lock.Unlock() } -func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) { +func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) { select { - case eoe := <-t.events: + case eoe := <-c.events: if eoe.Err != nil { return nil, eoe.Err } return eoe.Event, nil - case <-t.ctx.Done(): - return nil, t.ctx.Err() + case <-c.ctx.Done(): + return nil, c.ctx.Err() } }