agent/cache: change behavior to return error rather than retry

The cache behavior should not be to mask errors and retry. Instead, it
should aim to return errors as quickly as possible. We do that here.
This commit is contained in:
Mitchell Hashimoto 2018-06-03 13:15:09 -07:00 committed by Jack Pearkes
parent 311d503fb0
commit 45e49f31de
3 changed files with 27 additions and 32 deletions

47
agent/cache/cache.go vendored
View File

@ -192,7 +192,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
key := c.entryKey(&info) key := c.entryKey(&info)
// First time through // First time through
var attempt uint first := true
// timeoutCh for watching our timeout // timeoutCh for watching our timeout
var timeoutCh <-chan time.Time var timeoutCh <-chan time.Time
@ -209,7 +209,7 @@ RETRY_GET:
// we have. // we have.
if ok && entry.Valid { if ok && entry.Valid {
if info.MinIndex == 0 || info.MinIndex < entry.Index { if info.MinIndex == 0 || info.MinIndex < entry.Index {
if attempt == 0 { if first {
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
atomic.AddUint64(&c.hits, 1) atomic.AddUint64(&c.hits, 1)
} }
@ -220,7 +220,11 @@ RETRY_GET:
c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesExpiryHeap.Fix(entry.Expiry)
c.entriesLock.Unlock() c.entriesLock.Unlock()
return entry.Value, entry.Error // We purposely do not return an error here since the cache
// only works with fetching values that either have a value
// or have an error, but not both. The Error may be non-nil
// in the entry because of this to note future fetch errors.
return entry.Value, nil
} }
} }
@ -229,11 +233,11 @@ RETRY_GET:
// a retry loop getting the same error for the entire duration of the // a retry loop getting the same error for the entire duration of the
// timeout. Instead, we make one effort to fetch a new value, and if // timeout. Instead, we make one effort to fetch a new value, and if
// there was an error, we return. // there was an error, we return.
if attempt > 0 && entry.Error != nil { if !first && entry.Error != nil {
return entry.Value, entry.Error return entry.Value, entry.Error
} }
if attempt == 0 { if first {
// Record the miss if its our first time through // Record the miss if its our first time through
atomic.AddUint64(&c.misses, 1) atomic.AddUint64(&c.misses, 1)
@ -248,7 +252,7 @@ RETRY_GET:
} }
// No longer our first time through // No longer our first time through
attempt++ first = false
// Set our timeout channel if we must // Set our timeout channel if we must
if info.Timeout > 0 && timeoutCh == nil { if info.Timeout > 0 && timeoutCh == nil {
@ -257,7 +261,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data. // value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(t, key, r, true, attempt) waiterCh, err := c.fetch(t, key, r, true, 0)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -296,16 +300,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
return nil, fmt.Errorf("unknown type in cache: %s", t) return nil, fmt.Errorf("unknown type in cache: %s", t)
} }
// If we're over the attempt minimum, start an exponential backoff.
if attempt > CacheRefreshBackoffMin {
waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
if waitTime > CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait
}
time.Sleep(waitTime)
}
// We acquire a write lock because we may have to set Fetching to true. // We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
@ -354,7 +348,6 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
// A new value was given, so we create a brand new entry. // A new value was given, so we create a brand new entry.
newEntry.Value = result.Value newEntry.Value = result.Value
newEntry.Index = result.Index newEntry.Index = result.Index
newEntry.Error = err
// This is a valid entry with a result // This is a valid entry with a result
newEntry.Valid = true newEntry.Valid = true
@ -374,12 +367,8 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
// Increment attempt counter // Increment attempt counter
attempt++ attempt++
// If the entry wasn't valid, we set an error. If it was valid, // Set the error that should be used if the fetch is failing.
// we don't set an error so that the prior value can continue newEntry.Error = err
// being used. This will be evicted if the TTL comes up.
if !newEntry.Valid {
newEntry.Error = err
}
} }
// Create a new waiter that will be used for the next fetch. // Create a new waiter that will be used for the next fetch.
@ -448,6 +437,16 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
return return
} }
// If we're over the attempt minimum, start an exponential backoff.
if attempt > CacheRefreshBackoffMin {
waitTime := (1 << (attempt - CacheRefreshBackoffMin)) * time.Second
if waitTime > CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait
}
time.Sleep(waitTime)
}
// If we have a timer, wait for it // If we have a timer, wait for it
if opts.RefreshTimer > 0 { if opts.RefreshTimer > 0 {
time.Sleep(opts.RefreshTimer) time.Sleep(opts.RefreshTimer)

View File

@ -285,16 +285,10 @@ func TestCacheGet_blockingIndexBackoff(t *testing.T) {
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
TestCacheGetChResult(t, resultCh, 1) TestCacheGetChResult(t, resultCh, 1)
// Fetch should block // Fetch should not block and should return error
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute})) Key: "hello", MinIndex: 7, Timeout: 1 * time.Minute}))
TestCacheGetChResult(t, resultCh, nil)
// Should block
select {
case <-resultCh:
t.Fatal("should block")
case <-time.After(50 * time.Millisecond):
}
// Wait a bit // Wait a bit
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)

View File

@ -44,7 +44,9 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface
if !reflect.DeepEqual(result, expected) { if !reflect.DeepEqual(result, expected) {
t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected) t.Fatalf("Result doesn't match!\n\n%#v\n\n%#v", result, expected)
} }
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
t.Fatalf("Result not sent on channel")
} }
} }