mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
agent/cache: perform backoffs on error retries on blocking queries
This commit is contained in:
parent
c95ea2f205
commit
311d503fb0
34
agent/cache/cache.go
vendored
34
agent/cache/cache.go
vendored
@ -192,7 +192,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
|
||||
key := c.entryKey(&info)
|
||||
|
||||
// First time through
|
||||
first := true
|
||||
var attempt uint
|
||||
|
||||
// timeoutCh for watching our timeout
|
||||
var timeoutCh <-chan time.Time
|
||||
@ -209,7 +209,7 @@ RETRY_GET:
|
||||
// we have.
|
||||
if ok && entry.Valid {
|
||||
if info.MinIndex == 0 || info.MinIndex < entry.Index {
|
||||
if first {
|
||||
if attempt == 0 {
|
||||
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
|
||||
atomic.AddUint64(&c.hits, 1)
|
||||
}
|
||||
@ -229,11 +229,11 @@ RETRY_GET:
|
||||
// a retry loop getting the same error for the entire duration of the
|
||||
// timeout. Instead, we make one effort to fetch a new value, and if
|
||||
// there was an error, we return.
|
||||
if !first && entry.Error != nil {
|
||||
if attempt > 0 && entry.Error != nil {
|
||||
return entry.Value, entry.Error
|
||||
}
|
||||
|
||||
if first {
|
||||
if attempt == 0 {
|
||||
// Record the miss if its our first time through
|
||||
atomic.AddUint64(&c.misses, 1)
|
||||
|
||||
@ -248,7 +248,7 @@ RETRY_GET:
|
||||
}
|
||||
|
||||
// No longer our first time through
|
||||
first = false
|
||||
attempt++
|
||||
|
||||
// Set our timeout channel if we must
|
||||
if info.Timeout > 0 && timeoutCh == nil {
|
||||
@ -257,7 +257,7 @@ RETRY_GET:
|
||||
|
||||
// At this point, we know we either don't have a value at all or the
|
||||
// value we have is too old. We need to wait for new data.
|
||||
waiterCh, err := c.fetch(t, key, r, true, 0)
|
||||
waiterCh, err := c.fetch(t, key, r, true, attempt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -296,6 +296,16 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||
return nil, fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
|
||||
// If we're over the attempt minimum, start an exponential backoff.
|
||||
if attempt > CacheRefreshBackoffMin {
|
||||
waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
|
||||
if waitTime > CacheRefreshMaxWait {
|
||||
waitTime = CacheRefreshMaxWait
|
||||
}
|
||||
|
||||
time.Sleep(waitTime)
|
||||
}
|
||||
|
||||
// We acquire a write lock because we may have to set Fetching to true.
|
||||
c.entriesLock.Lock()
|
||||
defer c.entriesLock.Unlock()
|
||||
@ -398,7 +408,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||
|
||||
// If refresh is enabled, run the refresh in due time. The refresh
|
||||
// below might block, but saves us from spawning another goroutine.
|
||||
if tEntry.Opts.Refresh {
|
||||
if !ok && tEntry.Opts.Refresh {
|
||||
c.refresh(tEntry.Opts, attempt, t, key, r)
|
||||
}
|
||||
}()
|
||||
@ -438,16 +448,6 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
|
||||
return
|
||||
}
|
||||
|
||||
// If we're over the attempt minimum, start an exponential backoff.
|
||||
if attempt > CacheRefreshBackoffMin {
|
||||
waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
|
||||
if waitTime > CacheRefreshMaxWait {
|
||||
waitTime = CacheRefreshMaxWait
|
||||
}
|
||||
|
||||
time.Sleep(waitTime)
|
||||
}
|
||||
|
||||
// If we have a timer, wait for it
|
||||
if opts.RefreshTimer > 0 {
|
||||
time.Sleep(opts.RefreshTimer)
|
||||
|
41
agent/cache/cache_test.go
vendored
41
agent/cache/cache_test.go
vendored
@ -263,6 +263,47 @@ func TestCacheGet_blockingIndexTimeout(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test a get with an index set with requests returning an error
|
||||
// will perform a backoff on retrying the fetch.
|
||||
func TestCacheGet_blockingIndexBackoff(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestType(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, nil)
|
||||
|
||||
// Configure the type
|
||||
var retries uint32
|
||||
fetchErr := fmt.Errorf("test fetch error")
|
||||
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
|
||||
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
|
||||
atomic.AddUint32(&retries, 1)
|
||||
})
|
||||
|
||||
// First good fetch to populate catch
|
||||
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
|
||||
TestCacheGetChResult(t, resultCh, 1)
|
||||
|
||||
// Fetch should block
|
||||
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
|
||||
Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute}))
|
||||
|
||||
// Should block
|
||||
select {
|
||||
case <-resultCh:
|
||||
t.Fatal("should block")
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
}
|
||||
|
||||
// Wait a bit
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
// Check the number
|
||||
actual := atomic.LoadUint32(&retries)
|
||||
require.True(t, actual < 10, fmt.Sprintf("actual: %d", actual))
|
||||
}
|
||||
|
||||
// Test that if a Type returns an empty value on Fetch that the previous
|
||||
// value is preserved.
|
||||
func TestCacheGet_emptyFetchResult(t *testing.T) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user