From 311d503fb0489f8e78239daffb4e69b2700c54c0 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sat, 2 Jun 2018 20:16:44 -0700 Subject: [PATCH] agent/cache: perform backoffs on error retries on blocking queries --- agent/cache/cache.go | 34 ++++++++++++++++---------------- agent/cache/cache_test.go | 41 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+), 17 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index e2eee03d85..c03224eeda 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -192,7 +192,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) { key := c.entryKey(&info) // First time through - first := true + var attempt uint // timeoutCh for watching our timeout var timeoutCh <-chan time.Time @@ -209,7 +209,7 @@ RETRY_GET: // we have. if ok && entry.Valid { if info.MinIndex == 0 || info.MinIndex < entry.Index { - if first { + if attempt == 0 { metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) atomic.AddUint64(&c.hits, 1) } @@ -229,11 +229,11 @@ RETRY_GET: // a retry loop getting the same error for the entire duration of the // timeout. Instead, we make one effort to fetch a new value, and if // there was an error, we return. - if !first && entry.Error != nil { + if attempt > 0 && entry.Error != nil { return entry.Value, entry.Error } - if first { + if attempt == 0 { // Record the miss if its our first time through atomic.AddUint64(&c.misses, 1) @@ -248,7 +248,7 @@ RETRY_GET: } // No longer our first time through - first = false + attempt++ // Set our timeout channel if we must if info.Timeout > 0 && timeoutCh == nil { @@ -257,7 +257,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. 
- waiterCh, err := c.fetch(t, key, r, true, 0) + waiterCh, err := c.fetch(t, key, r, true, attempt) if err != nil { return nil, err } @@ -296,6 +296,16 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- return nil, fmt.Errorf("unknown type in cache: %s", t) } + // If we're over the attempt minimum, start an exponential backoff. + if attempt > CacheRefreshBackoffMin { + waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second + if waitTime > CacheRefreshMaxWait { + waitTime = CacheRefreshMaxWait + } + + time.Sleep(waitTime) + } + // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() @@ -398,7 +408,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. - if tEntry.Opts.Refresh { + if !ok && tEntry.Opts.Refresh { c.refresh(tEntry.Opts, attempt, t, key, r) } }() @@ -438,16 +448,6 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin return } - // If we're over the attempt minimum, start an exponential backoff. - if attempt > CacheRefreshBackoffMin { - waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second - if waitTime > CacheRefreshMaxWait { - waitTime = CacheRefreshMaxWait - } - - time.Sleep(waitTime) - } - // If we have a timer, wait for it if opts.RefreshTimer > 0 { time.Sleep(opts.RefreshTimer) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 07490be13f..1f643ed9f3 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -263,6 +263,47 @@ func TestCacheGet_blockingIndexTimeout(t *testing.T) { } } +// Test a get with an index set with requests returning an error +// will perform a backoff on retrying the fetch. 
+func TestCacheGet_blockingIndexBackoff(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + var retries uint32 + fetchErr := fmt.Errorf("test fetch error") + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) { + atomic.AddUint32(&retries, 1) + }) + + // First good fetch to populate cache + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 1) + + // Fetch should block + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute})) + + // Should block + select { + case <-resultCh: + t.Fatal("should block") + case <-time.After(50 * time.Millisecond): + } + + // Wait a bit + time.Sleep(100 * time.Millisecond) + + // Check the number of retries + actual := atomic.LoadUint32(&retries) + require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual)) +} + // Test that if a Type returns an empty value on Fetch that the previous // value is preserved. func TestCacheGet_emptyFetchResult(t *testing.T) {