mirror of https://github.com/status-im/consul.git
agent/cache: implement refresh backoff
commit cfcd733609
parent bc605a1576
@@ -2785,7 +2785,7 @@ func (a *Agent) registerCache() {
     }, &cache.RegisterOptions{
         // Maintain a blocking query, retry dropped connections quickly
         Refresh:        true,
-        RefreshTimer:   3 * time.Second,
+        RefreshTimer:   0 * time.Second,
         RefreshTimeout: 10 * time.Minute,
     })
 
@@ -2795,7 +2795,7 @@ func (a *Agent) registerCache() {
     }, &cache.RegisterOptions{
         // Maintain a blocking query, retry dropped connections quickly
         Refresh:        true,
-        RefreshTimer:   3 * time.Second,
+        RefreshTimer:   0 * time.Second,
         RefreshTimeout: 10 * time.Minute,
     })
 
@@ -2804,7 +2804,7 @@ func (a *Agent) registerCache() {
     }, &cache.RegisterOptions{
         // Maintain a blocking query, retry dropped connections quickly
         Refresh:        true,
-        RefreshTimer:   3 * time.Second,
+        RefreshTimer:   0 * time.Second,
         RefreshTimeout: 10 * time.Minute,
     })
 }
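With backoff now handled inside the cache itself, the fixed three-second retry delay is no longer needed, so the three registrations above drop RefreshTimer to zero and retry immediately until consecutive errors accumulate. As a minimal sketch of such a registration (illustrative only: the type name "example-type", the exampleType value, and the a.cache field name are assumptions; RegisterType, cache.RegisterOptions, and the option fields appear in this diff):

// Hypothetical registration; it simply mirrors the options changed above.
a.cache.RegisterType("example-type", exampleType, &cache.RegisterOptions{
    // Maintain a blocking query; failed fetches now back off via the
    // per-entry error counter rather than a fixed RefreshTimer delay.
    Refresh:        true,
    RefreshTimer:   0 * time.Second,
    RefreshTimeout: 10 * time.Minute,
})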
@@ -26,6 +26,13 @@ import (
 
 //go:generate mockery -all -inpkg
 
+// Constants related to refresh backoff. We probably don't ever need to
+// make these configurable knobs since they primarily exist to lower load.
+const (
+    CacheRefreshBackoffMin = 3               // 3 attempts before backing off
+    CacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
+)
+
 // Cache is a agent-local cache of Consul data. Create a Cache using the
 // New function. A zero-value Cache is not ready for usage and will result
 // in a panic.
@@ -330,14 +337,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
             Timeout: tEntry.Opts.RefreshTimeout,
         }, r)
 
-        if err == nil {
-            metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
-            metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
-        } else {
-            metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
-            metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
-        }
-
         // Copy the existing entry to start.
         newEntry := entry
         newEntry.Fetching = false
@@ -351,11 +350,27 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
             newEntry.Valid = true
         }
 
-        // If we have an error and the prior entry wasn't valid, then we
-        // set the error at least.
-        if err != nil && !newEntry.Valid {
+        // Error handling
+        if err == nil {
+            metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
+            metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
+
+            // Reset the attempts counter so we don't have any backoff
+            newEntry.ErrAttempts = 0
+        } else {
+            metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
+            metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
+
+            // Always increment the attempts to control backoff
+            newEntry.ErrAttempts++
+
+            // If the entry wasn't valid, we set an error. If it was valid,
+            // we don't set an error so that the prior value can continue
+            // being used. This will be evicted if the TTL comes up.
+            if !newEntry.Valid {
                 newEntry.Error = err
+            }
         }
 
         // Create a new waiter that will be used for the next fetch.
         newEntry.Waiter = make(chan struct{})
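Note that newEntry.ErrAttempts as updated here is the same value handed to c.refresh when the next fetch is scheduled (next hunk below), so a single successful fetch clears any accumulated backoff while consecutive failures keep lengthening it.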
@@ -384,7 +399,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
         // If refresh is enabled, run the refresh in due time. The refresh
         // below might block, but saves us from spawning another goroutine.
         if tEntry.Opts.Refresh {
-            c.refresh(tEntry.Opts, t, key, r)
+            c.refresh(tEntry.Opts, newEntry.ErrAttempts, t, key, r)
         }
     }()
 
@@ -417,12 +432,22 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
 
 // refresh triggers a fetch for a specific Request according to the
 // registration options.
-func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) {
+func (c *Cache) refresh(opts *RegisterOptions, attempt uint8, t string, key string, r Request) {
     // Sanity-check, we should not schedule anything that has refresh disabled
     if !opts.Refresh {
         return
     }
 
+    // If we're over the attempt minimum, start an exponential backoff.
+    if attempt > CacheRefreshBackoffMin {
+        waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
+        if waitTime > CacheRefreshMaxWait {
+            waitTime = CacheRefreshMaxWait
+        }
+
+        time.Sleep(waitTime)
+    }
+
     // If we have a timer, wait for it
     if opts.RefreshTimer > 0 {
         time.Sleep(opts.RefreshTimer)
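To make the schedule concrete, given CacheRefreshBackoffMin = 3 and CacheRefreshMaxWait = 1 minute, here is a small standalone sketch (not code from this commit; it simply mirrors the wait computation in refresh above) that prints the per-attempt wait:

package main

import (
    "fmt"
    "time"
)

const (
    cacheRefreshBackoffMin = 3               // attempts before backing off
    cacheRefreshMaxWait    = 1 * time.Minute // maximum backoff wait time
)

// backoffWait mirrors the wait computed in Cache.refresh above.
func backoffWait(attempt uint8) time.Duration {
    if attempt <= cacheRefreshBackoffMin {
        return 0 // the first few failures retry without any extra delay
    }
    wait := (1 << (attempt - cacheRefreshBackoffMin)) * time.Second
    if wait > cacheRefreshMaxWait {
        wait = cacheRefreshMaxWait
    }
    return wait
}

func main() {
    // Attempts 1-3 wait 0s, 4 waits 2s, 5 waits 4s, 6 waits 8s, 7 waits 16s,
    // 8 waits 32s, and attempt 9 onward is capped at 1m.
    for a := uint8(1); a <= 10; a++ {
        fmt.Printf("attempt %d -> wait %v\n", a, backoffWait(a))
    }
}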
@@ -4,6 +4,7 @@ import (
     "fmt"
     "sort"
     "sync"
+    "sync/atomic"
     "testing"
     "time"
 
@@ -336,6 +337,47 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
     TestCacheGetChResult(t, resultCh, 12)
 }
 
+// Test that a refresh performs a backoff.
+func TestCacheGet_periodicRefreshErrorBackoff(t *testing.T) {
+    t.Parallel()
+
+    typ := TestType(t)
+    defer typ.AssertExpectations(t)
+    c := TestCache(t)
+    c.RegisterType("t", typ, &RegisterOptions{
+        Refresh:        true,
+        RefreshTimer:   0,
+        RefreshTimeout: 5 * time.Minute,
+    })
+
+    // Configure the type
+    var retries uint32
+    fetchErr := fmt.Errorf("test fetch error")
+    typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
+    typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
+        atomic.AddUint32(&retries, 1)
+    })
+
+    // Fetch
+    resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
+    TestCacheGetChResult(t, resultCh, 1)
+
+    // Sleep a bit. The refresh will quietly fail in the background. What we
+    // want to verify is that it doesn't retry too much. "Too much" is hard
+    // to measure since it's CPU dependent if this test is failing. But due
+    // to the short sleep below, we can calculate about what we'd expect if
+    // backoff IS working.
+    time.Sleep(500 * time.Millisecond)
+
+    // Fetch should work, we should get a 1 still. Errors are ignored.
+    resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
+    TestCacheGetChResult(t, resultCh, 1)
+
+    // Check the number
+    actual := atomic.LoadUint32(&retries)
+    require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual))
+}
+
 // Test that the backend fetch sets the proper timeout.
 func TestCacheGet_fetchTimeout(t *testing.T) {
     t.Parallel()
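A quick sanity check on the < 10 bound above: after the first successful fetch every refresh fails, and the first few failures are retried back to back because RefreshTimer is 0 and the attempt count has not yet exceeded CacheRefreshBackoffMin; once ErrAttempts reaches 4, the next refresh sleeps for 2 seconds, which is already longer than the 500ms observation window. So only about four error fetches can land before the assertion runs, whereas without backoff the refresh loop would spin as fast as the mock can return errors.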
@@ -19,6 +19,7 @@ type cacheEntry struct {
     Valid    bool          // True if the Value is set
     Fetching bool          // True if a fetch is already active
     Waiter   chan struct{} // Closed when this entry is invalidated
+    ErrAttempts uint8 // Number of fetch errors since last success (Error may be nil)
 
     // Expiry contains information about the expiration of this
     // entry. This is a pointer as its shared as a value in the