diff --git a/agent/agent.go b/agent/agent.go index 21ada77cd1..77045c69e9 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2785,7 +2785,7 @@ func (a *Agent) registerCache() { }, &cache.RegisterOptions{ // Maintain a blocking query, retry dropped connections quickly Refresh: true, - RefreshTimer: 3 * time.Second, + RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) @@ -2795,7 +2795,7 @@ func (a *Agent) registerCache() { }, &cache.RegisterOptions{ // Maintain a blocking query, retry dropped connections quickly Refresh: true, - RefreshTimer: 3 * time.Second, + RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) @@ -2804,7 +2804,7 @@ func (a *Agent) registerCache() { }, &cache.RegisterOptions{ // Maintain a blocking query, retry dropped connections quickly Refresh: true, - RefreshTimer: 3 * time.Second, + RefreshTimer: 0 * time.Second, RefreshTimeout: 10 * time.Minute, }) } diff --git a/agent/cache/cache.go b/agent/cache/cache.go index cdcaffc58c..754d635df7 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -26,6 +26,13 @@ import ( //go:generate mockery -all -inpkg +// Constants related to refresh backoff. We probably don't ever need to +// make these configurable knobs since they primarily exist to lower load. +const ( + CacheRefreshBackoffMin = 3 // 3 attempts before backing off + CacheRefreshMaxWait = 1 * time.Minute // maximum backoff wait time +) + // Cache is a agent-local cache of Consul data. Create a Cache using the // New function. A zero-value Cache is not ready for usage and will result // in a panic. 
@@ -330,14 +337,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, Timeout: tEntry.Opts.RefreshTimeout, }, r) - if err == nil { - metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) - } else { - metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) - } - // Copy the existing entry to start. newEntry := entry newEntry.Fetching = false @@ -351,10 +350,26 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, newEntry.Valid = true } - // If we have an error and the prior entry wasn't valid, then we - // set the error at least. - if err != nil && !newEntry.Valid { - newEntry.Error = err + // Error handling + if err == nil { + metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) + metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) + + // Reset the attempts counter so we don't have any backoff + newEntry.ErrAttempts = 0 + } else { + metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) + metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) + + // Always increment the attempts to control backoff + newEntry.ErrAttempts++ + + // If the entry wasn't valid, we set an error. If it was valid, + // we don't set an error so that the prior value can continue + // being used. This will be evicted if the TTL comes up. + if !newEntry.Valid { + newEntry.Error = err + } } // Create a new waiter that will be used for the next fetch. @@ -384,7 +399,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. 
if tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, t, key, r) + c.refresh(tEntry.Opts, newEntry.ErrAttempts, t, key, r) } }() @@ -417,12 +432,22 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { // refresh triggers a fetch for a specific Request according to the // registration options. -func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) { +func (c *Cache) refresh(opts *RegisterOptions, attempt uint8, t string, key string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh { return } + // If we're over the attempt minimum, start an exponential backoff. + if attempt > CacheRefreshBackoffMin { + waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second + if waitTime > CacheRefreshMaxWait { + waitTime = CacheRefreshMaxWait + } + + time.Sleep(waitTime) + } + // If we have a timer, wait for it if opts.RefreshTimer > 0 { time.Sleep(opts.RefreshTimer) diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index cf179b2aba..07490be13f 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -4,6 +4,7 @@ import ( "fmt" "sort" "sync" + "sync/atomic" "testing" "time" @@ -336,6 +337,47 @@ func TestCacheGet_periodicRefresh(t *testing.T) { TestCacheGetChResult(t, resultCh, 12) } +// Test that a refresh performs a backoff. 
+func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: true, + RefreshTimer: 0, + RefreshTimeout: 5 * time.Minute, + }) + + // Configure the type + var retries uint32 + fetchErr := fmt.Errorf("test fetch error") + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) { + atomic.AddUint32(&retries, 1) + }) + + // Fetch + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 1) + + // Sleep a bit. The refresh will quietly fail in the background. What we + // want to verify is that it doesn't retry too much. "Too much" is hard + // to measure since it's CPU dependent if this test is failing. But due + // to the short sleep below, we can calculate about what we'd expect if + // backoff IS working. + time.Sleep(500 * time.Millisecond) + + // Fetch should work, we should get a 1 still. Errors are ignored. + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + TestCacheGetChResult(t, resultCh, 1) + + // Check the number + actual := atomic.LoadUint32(&retries) + require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual)) +} + // Test that the backend fetch sets the proper timeout. 
func TestCacheGet_fetchTimeout(t *testing.T) { t.Parallel() diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 50c575ff73..6bab621a30 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -16,9 +16,10 @@ type cacheEntry struct { Index uint64 // Metadata that is used for internal accounting - Valid bool // True if the Value is set - Fetching bool // True if a fetch is already active - Waiter chan struct{} // Closed when this entry is invalidated + Valid bool // True if the Value is set + Fetching bool // True if a fetch is already active + Waiter chan struct{} // Closed when this entry is invalidated + ErrAttempts uint8 // Number of fetch errors since last success (Error may be nil) // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the