diff --git a/agent/cache/cache.go b/agent/cache/cache.go index a57ab83435..a1b4570b07 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -136,6 +136,9 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) { // First time through first := true + // timeoutCh for watching our tmeout + var timeoutCh <-chan time.Time + RETRY_GET: // Get the current value c.entriesLock.RLock() @@ -164,16 +167,27 @@ RETRY_GET: // No longer our first time through first = false + // Set our timeout channel if we must + if info.Timeout > 0 && timeoutCh == nil { + timeoutCh = time.After(info.Timeout) + } + // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiter, err := c.fetch(t, key, r) + waiterCh, err := c.fetch(t, key, r) if err != nil { return nil, err } - // Wait on our waiter and then retry the cache load - <-waiter - goto RETRY_GET + select { + case <-waiterCh: + // Our fetch returned, retry the get from the cache + goto RETRY_GET + + case <-timeoutCh: + // Timeout on the cache read, just return whatever we have. + return entry.Value, nil + } } // entryKey returns the key for the entry in the cache. See the note @@ -216,16 +230,26 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { // The actual Fetch must be performed in a goroutine. go func() { // Start building the new entry by blocking on the fetch. - var newEntry cacheEntry result, err := tEntry.Type.Fetch(FetchOptions{ MinIndex: entry.Index, }, r) - newEntry.Value = result.Value - newEntry.Index = result.Index - newEntry.Error = err - // This is a valid entry with a result - newEntry.Valid = true + var newEntry cacheEntry + if result.Value == nil { + // If no value was set, then we do not change the prior entry. + // Instead, we just update the waiter to be new so that another + // Get will wait on the correct value. + newEntry = entry + newEntry.Fetching = false + } else { + // A new value was given, so we create a brand new entry. + newEntry.Value = result.Value + newEntry.Index = result.Index + newEntry.Error = err + + // This is a valid entry with a result + newEntry.Valid = true + } // Create a new waiter that will be used for the next fetch. newEntry.Waiter = make(chan struct{}) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 1e75490a03..b8ca66dc4e 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -194,6 +194,77 @@ func TestCacheGet_blockingIndex(t *testing.T) { TestCacheGetChResult(t, resultCh, 42) } +// Test a get with an index set will timeout if the fetch doesn't return +// anything. +func TestCacheGet_blockingIndexTimeout(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + triggerCh := make(chan time.Time) + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once() + typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh) + + // Fetch should block + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 5, Timeout: 200 * time.Millisecond})) + + // Should block + select { + case <-resultCh: + t.Fatal("should block") + case <-time.After(50 * time.Millisecond): + } + + // Should return after more of the timeout + select { + case result := <-resultCh: + require.Equal(t, 12, result) + case <-time.After(300 * time.Millisecond): + t.Fatal("should've returned") + } +} + +// Test that if a Type returns an empty value on Fetch that the previous +// value is preserved. +func TestCacheGet_emptyFetchResult(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + typ.Static(FetchResult{Value: 42, Index: 1}, nil).Times(1) + typ.Static(FetchResult{Value: nil}, nil) + + // Get, should fetch + req := TestRequest(t, RequestInfo{Key: "hello"}) + result, err := c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Get, should not fetch since we already have a satisfying value + req = TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 1, Timeout: 100 * time.Millisecond}) + result, err = c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + // Test that a type registered with a periodic refresh will perform // that refresh after the timer is up. func TestCacheGet_periodicRefresh(t *testing.T) { diff --git a/agent/cache/request.go b/agent/cache/request.go index b4a1b75d0c..7beec58e86 100644 --- a/agent/cache/request.go +++ b/agent/cache/request.go @@ -1,5 +1,9 @@ package cache +import ( + "time" +) + // Request is a cache-able request. // // This interface is typically implemented by request structures in @@ -36,4 +40,9 @@ type RequestInfo struct { // to block until new data is available. If no index is available, the // default value (zero) is acceptable. MinIndex uint64 + + // Timeout is the timeout for waiting on a blocking query. When the + // timeout is reached, the last known value is returned (or maybe nil + // if there was no prior value). + Timeout time.Duration } diff --git a/agent/cache/type.go b/agent/cache/type.go index 6e8edeb5fa..cccb10b94f 100644 --- a/agent/cache/type.go +++ b/agent/cache/type.go @@ -15,6 +15,12 @@ type Type interface { // // The return value is a FetchResult which contains information about // the fetch. + // + // On timeout, FetchResult can behave one of two ways. First, it can + // return the last known value. This is the default behavior of blocking + // RPC calls in Consul so this allows cache types to be implemented with + // no extra logic. Second, FetchResult can return an unset value and index. + // In this case, the cache will reuse the last value automatically. Fetch(FetchOptions, Request) (FetchResult, error) }