diff --git a/agent/submatview/materializer.go b/agent/submatview/materializer.go index 6cea97c002..94f922c61c 100644 --- a/agent/submatview/materializer.go +++ b/agent/submatview/materializer.go @@ -215,17 +215,17 @@ func (m *Materializer) notifyUpdateLocked(err error) { m.updateCh = make(chan struct{}) } -type viewResult struct { +type Result struct { Index uint64 Value interface{} } // getFromView blocks until the index of the View is greater than opts.MinIndex, //or the context is cancelled. -func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (viewResult, error) { +func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result, error) { m.lock.Lock() - result := viewResult{ + result := Result{ Index: m.index, Value: m.view.Result(m.index), } diff --git a/agent/submatview/store.go b/agent/submatview/store.go index 8b6fbaa586..4485e0f368 100644 --- a/agent/submatview/store.go +++ b/agent/submatview/store.go @@ -25,7 +25,6 @@ type entry struct { requests int } -// TODO: start expiration loop func NewStore() *Store { return &Store{ byKey: make(map[string]entry), @@ -77,11 +76,7 @@ type Request interface { // Get a value from the store, blocking if the store has not yet seen the // req.Index value. // See agent/cache.Cache.Get for complete documentation. -func (s *Store) Get( - ctx context.Context, - req Request, - // TODO: only the Index field of ResultMeta is relevant, return a result struct instead. -) (interface{}, cache.ResultMeta, error) { +func (s *Store) Get(ctx context.Context, req Request) (Result, error) { info := req.CacheInfo() key, e := s.getEntry(req) defer s.releaseEntry(key) @@ -94,7 +89,7 @@ func (s *Store) Get( // TODO: does context.DeadlineExceeded need to be translated into a nil error // to match the old interface? - return result.Value, cache.ResultMeta{Index: result.Index}, err + return result, err } // Notify the updateCh when there are updates to the entry identified by req. diff --git a/agent/submatview/store_test.go b/agent/submatview/store_test.go index 1507e736cf..75c411fd8d 100644 --- a/agent/submatview/store_test.go +++ b/agent/submatview/store_test.go @@ -15,7 +15,7 @@ import ( "github.com/hashicorp/consul/proto/pbsubscribe" ) -func TestStore_Get_Fresh(t *testing.T) { +func TestStore_Get(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -30,25 +30,102 @@ func TestStore_Get_Fresh(t *testing.T) { newEventServiceHealthRegister(10, 1, "srv1"), newEventServiceHealthRegister(22, 2, "srv1")) - result, md, err := store.Get(ctx, req) - require.NoError(t, err) - require.Equal(t, uint64(22), md.Index) + runStep(t, "from empty store, starts materializer", func(t *testing.T) { + result, err := store.Get(ctx, req) + require.NoError(t, err) + require.Equal(t, uint64(22), result.Index) - r, ok := result.(fakeResult) - require.True(t, ok) - require.Len(t, r.srvs, 2) - require.Equal(t, uint64(22), r.index) + r, ok := result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(22), r.index) - store.lock.Lock() - require.Len(t, store.byKey, 1) - e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] - require.Equal(t, 0, e.expiry.Index()) + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) - defer store.lock.Unlock() - require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) + + runStep(t, "with an index that already exists in the view", func(t *testing.T) { + req.index = 21 + result, err := store.Get(ctx, req) + require.NoError(t, err) + require.Equal(t, uint64(22), result.Index) + + r, ok := result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(22), r.index) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) + + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) + + runStep(t, "blocks with an index that is not yet in the view", func(t *testing.T) { + req.index = 23 + + chResult := make(chan resultOrError, 1) + go func() { + result, err := store.Get(ctx, req) + chResult <- resultOrError{Result: result, Err: err} + }() + + select { + case <-chResult: + t.Fatalf("expected Get to block") + case <-time.After(50 * time.Millisecond): + } + + store.lock.Lock() + e := store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + store.lock.Unlock() + require.Equal(t, 1, e.requests) + + req.client.QueueEvents(newEventServiceHealthRegister(24, 1, "srv1")) + + var getResult resultOrError + select { + case getResult = <-chResult: + case <-time.After(100 * time.Millisecond): + t.Fatalf("expected Get to unblock when new events are received") + } + + require.NoError(t, getResult.Err) + require.Equal(t, uint64(24), getResult.Result.Index) + + r, ok := getResult.Result.Value.(fakeResult) + require.True(t, ok) + require.Len(t, r.srvs, 2) + require.Equal(t, uint64(24), r.index) + + store.lock.Lock() + defer store.lock.Unlock() + require.Len(t, store.byKey, 1) + e = store.byKey[makeEntryKey(req.Type(), req.CacheInfo())] + require.Equal(t, 0, e.expiry.Index()) + require.Equal(t, 0, e.requests) + + require.Equal(t, store.expiryHeap.Next().Entry, e.expiry) + }) +} + +type resultOrError struct { + Result Result + Err error } type fakeRequest struct { + index uint64 client *TestStreamingClient } @@ -58,6 +135,7 @@ func (r *fakeRequest) CacheInfo() cache.RequestInfo { Token: "abcd", Datacenter: "dc1", Timeout: 4 * time.Second, + MinIndex: r.index, } } @@ -125,8 +203,7 @@ func (f *fakeView) Reset() { f.srvs = make(map[string]*pbservice.CheckServiceNode) } -// TODO: Get with an entry that already has index -// TODO: Get with an entry that is not yet at index +// TODO: Get with Notify func TestStore_Notify(t *testing.T) { // TODO: Notify with no existing entry @@ -134,3 +211,10 @@ func TestStore_Notify(t *testing.T) { // TODO: Notify multiple times same key // TODO: Notify no update if index is not past MinIndex. } + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + t.Helper() + if !t.Run(name, fn) { + t.FailNow() + } +}