diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 4a0699bae4..8a7bfafcdc 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -323,7 +323,7 @@ func TestStore_Notify_ManyRequests(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - req2 = &fakeRequest{client: req.client, key: "key2"} + req2 = &fakeRequest{client: req.client, key: "key2", index: 22} require.NoError(t, store.Notify(ctx, req2, cID, ch1)) go func() { diff --git a/agent/submatview/streaming_test.go b/agent/submatview/streaming_test.go index f846021792..80fec094fe 100644 --- a/agent/submatview/streaming_test.go +++ b/agent/submatview/streaming_test.go @@ -3,23 +3,23 @@ package submatview import ( "context" "fmt" + "sync" "google.golang.org/grpc" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbservice" - "github.com/hashicorp/consul/types" - "github.com/hashicorp/consul/proto/pbsubscribe" + "github.com/hashicorp/consul/types" ) // TestStreamingClient is a mock StreamingClient for testing that allows // for queueing up custom events to a subscriber. type TestStreamingClient struct { - pbsubscribe.StateChangeSubscription_SubscribeClient - events chan eventOrErr - ctx context.Context expectedNamespace string + subClients []*subscribeClient + lock sync.RWMutex + events []eventOrErr } type eventOrErr struct { @@ -28,44 +28,66 @@ type eventOrErr struct { } func NewTestStreamingClient(ns string) *TestStreamingClient { - return &TestStreamingClient{ - events: make(chan eventOrErr, 32), - expectedNamespace: ns, - } + return &TestStreamingClient{expectedNamespace: ns} } -func (t *TestStreamingClient) Subscribe( +func (s *TestStreamingClient) Subscribe( ctx context.Context, req *pbsubscribe.SubscribeRequest, _ ...grpc.CallOption, ) (pbsubscribe.StateChangeSubscription_SubscribeClient, error) { - if req.Namespace != t.expectedNamespace { + if req.Namespace != s.expectedNamespace { return nil, fmt.Errorf("wrong SubscribeRequest.Namespace %v, expected %v", - req.Namespace, t.expectedNamespace) + req.Namespace, s.expectedNamespace) } - t.ctx = ctx - return t, nil + c := &subscribeClient{ + events: make(chan eventOrErr, 32), + ctx: ctx, + } + s.lock.Lock() + s.subClients = append(s.subClients, c) + for _, event := range s.events { + c.events <- event + } + s.lock.Unlock() + return c, nil } -func (t *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { +type subscribeClient struct { + grpc.ClientStream + events chan eventOrErr + ctx context.Context +} + +func (s *TestStreamingClient) QueueEvents(events ...*pbsubscribe.Event) { + s.lock.Lock() for _, e := range events { - t.events <- eventOrErr{Event: e} + s.events = append(s.events, eventOrErr{Event: e}) + for _, c := range s.subClients { + c.events <- eventOrErr{Event: e} + } } + s.lock.Unlock() } -func (t *TestStreamingClient) QueueErr(err error) { - t.events <- eventOrErr{Err: err} +func (s *TestStreamingClient) QueueErr(err error) { + s.lock.Lock() + s.events = append(s.events, eventOrErr{Err: err}) + for _, c := range s.subClients { + c.events <- eventOrErr{Err: err} + } + s.lock.Unlock() } -func (t *TestStreamingClient) Recv() (*pbsubscribe.Event, error) { +func (c *subscribeClient) Recv() (*pbsubscribe.Event, error) { select { - case eoe := <-t.events: + case eoe := <-c.events: if eoe.Err != nil { return nil, eoe.Err } return eoe.Event, nil - case <-t.ctx.Done(): - return nil, t.ctx.Err() + case <-c.ctx.Done(): + return nil, c.ctx.Err() } }