cache: refactor agent cache fetching to prevent unnecessary fetches on error (#14956)

This continues the work done in #14908 where a crude solution to prevent a
goroutine leak was implemented. The former code would launch a perpetual
goroutine family every iteration (+1 +1) and the fixed code simply caused a
new goroutine family to first cancel the prior one to prevent the
leak (-1 +1 == 0).

This PR refactors this code completely to:

- make it more understandable
- remove the recursion-via-goroutine strangeness
- prevent unnecessary RPC fetches when the prior one has errored.

The core issue arose from a conflation of the entry.Fetching field to mean:

- there is an RPC (blocking query) in flight right now
- there is a goroutine running to manage the RPC fetch retry loop

The problem is that the goroutine-leak-avoidance check would treat
Fetching like (2), but within the body of a goroutine it would flip that
boolean back to false before the retry sleep. This would cause a new
chain of goroutines to launch which #14908 would correct crudely.

The refactored code uses a plain for-loop and changes the semantics
to track state for "is there a goroutine associated with this cache entry"
instead of the former.

We use a uint64 unique identity per goroutine instead of a boolean so
that any orphaned goroutines can tell when they've been replaced when
the expiry loop deletes a cache entry while the goroutine is still running
and is later replaced.
This commit is contained in:
R.B. Boyer 2022-10-25 10:27:26 -05:00 committed by GitHub
parent cbbc0036b6
commit 3c44116a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 196 additions and 175 deletions

3
.changelog/14956.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:bug
cache: refactor agent cache fetching to prevent unnecessary fetches on error
```

View File

@ -4795,19 +4795,19 @@ services {
deadlineCh := time.After(10 * time.Second) deadlineCh := time.After(10 * time.Second)
start := time.Now() start := time.Now()
LOOP:
for { for {
select { select {
case evt := <-ch: case evt := <-ch:
// We may receive several notifications of an error until we get the // We may receive several notifications of an error until we get the
// first successful reply. // first successful reply.
require.Equal(t, "foo", evt.CorrelationID) require.Equal(t, "foo", evt.CorrelationID)
if evt.Err != nil { if evt.Err == nil {
break LOOP require.NoError(t, evt.Err)
require.NotNil(t, evt.Result)
t.Logf("took %s to get first success", time.Since(start))
return
} }
require.NoError(t, evt.Err) t.Logf("saw error: %v", evt.Err)
require.NotNil(t, evt.Result)
t.Logf("took %s to get first success", time.Since(start))
case <-deadlineCh: case <-deadlineCh:
t.Fatal("did not get notified successfully") t.Fatal("did not get notified successfully")
} }

308
agent/cache/cache.go vendored
View File

@ -84,8 +84,8 @@ var Counters = []prometheus.CounterDefinition{
// Constants related to refresh backoff. We probably don't ever need to // Constants related to refresh backoff. We probably don't ever need to
// make these configurable knobs since they primarily exist to lower load. // make these configurable knobs since they primarily exist to lower load.
const ( const (
CacheRefreshBackoffMin = 3 // 3 attempts before backing off DefaultCacheRefreshBackoffMin = 3 // 3 attempts before backing off
CacheRefreshMaxWait = 1 * time.Minute // maximum backoff wait time DefaultCacheRefreshMaxWait = 1 * time.Minute // maximum backoff wait time
// The following constants are default values for the cache entry // The following constants are default values for the cache entry
// rate limiter settings. // rate limiter settings.
@ -138,10 +138,7 @@ type Cache struct {
entriesLock sync.RWMutex entriesLock sync.RWMutex
entries map[string]cacheEntry entries map[string]cacheEntry
entriesExpiryHeap *ttlcache.ExpiryHeap entriesExpiryHeap *ttlcache.ExpiryHeap
lastGoroutineID uint64
fetchLock sync.Mutex
lastFetchID uint64
fetchHandles map[string]fetchHandle
// stopped is used as an atomic flag to signal that the Cache has been // stopped is used as an atomic flag to signal that the Cache has been
// discarded so background fetches and expiry processing should stop. // discarded so background fetches and expiry processing should stop.
@ -154,11 +151,6 @@ type Cache struct {
rateLimitCancel context.CancelFunc rateLimitCancel context.CancelFunc
} }
type fetchHandle struct {
id uint64
stopCh chan struct{}
}
// typeEntry is a single type that is registered with a Cache. // typeEntry is a single type that is registered with a Cache.
type typeEntry struct { type typeEntry struct {
// Name that was used to register the Type // Name that was used to register the Type
@ -204,6 +196,13 @@ type Options struct {
EntryFetchMaxBurst int EntryFetchMaxBurst int
// EntryFetchRate represents the max calls/sec for a single cache entry // EntryFetchRate represents the max calls/sec for a single cache entry
EntryFetchRate rate.Limit EntryFetchRate rate.Limit
// CacheRefreshBackoffMin is the number of attempts to wait before backing off.
// Mostly configurable just for testing.
CacheRefreshBackoffMin uint
// CacheRefreshMaxWait is the maximum backoff wait time.
// Mostly configurable just for testing.
CacheRefreshMaxWait time.Duration
} }
// Equal return true if both options are equivalent // Equal return true if both options are equivalent
@ -219,6 +218,12 @@ func applyDefaultValuesOnOptions(options Options) Options {
if options.EntryFetchMaxBurst == 0 { if options.EntryFetchMaxBurst == 0 {
options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst options.EntryFetchMaxBurst = DefaultEntryFetchMaxBurst
} }
if options.CacheRefreshBackoffMin == 0 {
options.CacheRefreshBackoffMin = DefaultCacheRefreshBackoffMin
}
if options.CacheRefreshMaxWait == 0 {
options.CacheRefreshMaxWait = DefaultCacheRefreshMaxWait
}
if options.Logger == nil { if options.Logger == nil {
options.Logger = hclog.New(nil) options.Logger = hclog.New(nil)
} }
@ -234,7 +239,6 @@ func New(options Options) *Cache {
types: make(map[string]typeEntry), types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry), entries: make(map[string]cacheEntry),
entriesExpiryHeap: ttlcache.NewExpiryHeap(), entriesExpiryHeap: ttlcache.NewExpiryHeap(),
fetchHandles: make(map[string]fetchHandle),
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
options: options, options: options,
rateLimitContext: ctx, rateLimitContext: ctx,
@ -404,11 +408,23 @@ func (c *Cache) getEntryLocked(
// Check if re-validate is requested. If so the first time round the // Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally. // loop is not a hit but subsequent ones should be treated normally.
if !tEntry.Opts.Refresh && info.MustRevalidate { if !tEntry.Opts.Refresh && info.MustRevalidate {
if entry.Fetching { // It is important to note that this block ONLY applies when we are not
// There is an active blocking query for this data, which has not // in indefinite refresh mode (where the underlying goroutine will
// returned. We can logically deduce that the contents of the cache // continue to re-query for data).
// are actually current, and we can simply return this while //
// leaving the blocking query alone. // In this mode goroutines have a 1:1 relationship to RPCs that get
// executed, and importantly they DO NOT SLEEP after executing.
//
// This means that a running goroutine for this cache entry extremely
// strongly implies that the RPC has not yet completed, which is why
// this check works for the revalidation-avoidance optimization here.
if entry.GoroutineID != 0 {
// There is an active goroutine performing a blocking query for
// this data, which has not returned.
//
// We can logically deduce that the contents of the cache are
// actually current, and we can simply return this while leaving
// the blocking query alone.
return true, true, entry return true, true, entry
} }
return true, false, entry return true, false, entry
@ -538,7 +554,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the // At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data. // value we have is too old. We need to wait for new data.
waiterCh := c.fetch(key, r, true, 0, false) waiterCh := c.fetch(key, r)
// No longer our first time through // No longer our first time through
first = false first = false
@ -565,46 +581,36 @@ func makeEntryKey(t, dc, peerName, token, key string) string {
return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key)
} }
// fetch triggers a new background fetch for the given Request. If a // fetch triggers a new background fetch for the given Request. If a background
// background fetch is already running for a matching Request, the waiter // fetch is already running or a goroutine to manage that still exists for a
// channel for that request is returned. The effect of this is that there // matching Request, the waiter channel for that request is returned. The
// is only ever one blocking query for any matching requests. // effect of this is that there is only ever one blocking query and goroutine
// // for any matching requests.
// If allowNew is true then the fetch should create the cache entry func (c *Cache) fetch(key string, r getOptions) <-chan struct{} {
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) <-chan struct{} {
// We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info) ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
// This handles the case where a fetch succeeded after checking for its existence in switch {
// getWithIndex. This ensures that we don't miss updates. case ok && entryValid:
if ok && entryValid && !ignoreExisting { // This handles the case where a fetch succeeded after checking for its
// existence in getWithIndex. This ensures that we don't miss updates.
ch := make(chan struct{}) ch := make(chan struct{})
close(ch) close(ch)
return ch return ch
}
// If we aren't allowing new values and we don't have an existing value, case ok && entry.GoroutineID != 0:
// return immediately. We return an immediately-closed channel so nothing // If we already have an entry and there's a goroutine to keep it
// blocks. // refreshed then don't spawn another one to do the same work.
if !ok && !allowNew { //
ch := make(chan struct{}) // Return the currently active waiter.
close(ch)
return ch
}
// If we already have an entry and it is actively fetching, then return
// the currently active waiter.
if ok && entry.Fetching {
return entry.Waiter return entry.Waiter
}
// If we don't have an entry, then create it. The entry must be marked case !ok:
// as invalid so that it isn't returned as a valid value for a zero index. // If we don't have an entry, then create it. The entry must be marked
if !ok { // as invalid so that it isn't returned as a valid value for a zero
// index.
entry = cacheEntry{ entry = cacheEntry{
Valid: false, Valid: false,
Waiter: make(chan struct{}), Waiter: make(chan struct{}),
@ -615,27 +621,100 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
} }
} }
// Set that we're fetching to true, which makes it so that future // Assign each background fetching goroutine a unique ID and fingerprint
// identical calls to fetch will return the same waiter rather than // the cache entry with the same ID. This way if the cache entry is ever
// perform multiple fetches. // cleaned up due to expiry and later recreated the old goroutine can
entry.Fetching = true // detect that and terminate rather than leak and do double work.
c.lastGoroutineID++
entry.GoroutineID = c.lastGoroutineID
c.entries[key] = entry c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries))) metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries))) metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
// The actual Fetch must be performed in a goroutine.
go c.launchBackgroundFetcher(entry.GoroutineID, key, r)
return entry.Waiter
}
func (c *Cache) launchBackgroundFetcher(goroutineID uint64, key string, r getOptions) {
defer func() {
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
if ok && entry.GoroutineID == goroutineID {
entry.GoroutineID = 0
c.entries[key] = entry
}
}()
var attempt uint
for {
shouldStop, shouldBackoff := c.runBackgroundFetcherOnce(goroutineID, key, r)
if shouldStop {
return
}
if shouldBackoff {
attempt++
} else {
attempt = 0
}
// If we're over the attempt minimum, start an exponential backoff.
wait := backOffWait(c.options, attempt)
// If we have a timer, wait for it
wait += r.TypeEntry.Opts.RefreshTimer
select {
case <-time.After(wait):
case <-c.stopCh:
return // Check if cache was stopped
}
// Trigger.
r.Info.MustRevalidate = false
r.Info.MinIndex = 0
// We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock()
entry, ok := c.entries[key]
if !ok || entry.GoroutineID != goroutineID {
// If we don't have an existing entry, return immediately.
//
// Also if we already have an entry and it is actively fetching, then
// return immediately.
//
// If we've somehow lost control of the entry, also return.
c.entriesLock.Unlock()
return
}
c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
c.entriesLock.Unlock()
}
}
func (c *Cache) runBackgroundFetcherOnce(goroutineID uint64, key string, r getOptions) (shouldStop, shouldBackoff bool) {
// Freshly re-read this, rather than relying upon the caller to fetch it
// and pass it in.
c.entriesLock.RLock()
entry, ok := c.entries[key]
c.entriesLock.RUnlock()
if !ok || entry.GoroutineID != goroutineID {
// If we don't have an existing entry, return immediately.
//
// Also if something weird has happened to orphan this goroutine, also
// return immediately.
return true, false
}
tEntry := r.TypeEntry tEntry := r.TypeEntry
{ // NOTE: this indentation is here to facilitate the PR review diff only
// The actual Fetch must be performed in a goroutine. Ensure that we only
// have one in-flight at a time, but don't use a deferred
// context.WithCancel style termination so that these things outlive their
// requester.
//
// By the time we get here the system WANTS to make a replacement fetcher, so
// we terminate the prior one and replace it.
handle := c.getOrReplaceFetchHandle(key)
go func(handle fetchHandle) {
defer c.deleteFetchHandle(key, handle.id)
// If we have background refresh and currently are in "disconnected" state, // If we have background refresh and currently are in "disconnected" state,
// waiting for a response might mean we mark our results as stale for up to // waiting for a response might mean we mark our results as stale for up to
// 10 minutes (max blocking timeout) after connection is restored. To reduce // 10 minutes (max blocking timeout) after connection is restored. To reduce
@ -649,7 +728,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
entry, ok := c.entries[key] entry, ok := c.entries[key]
if !ok || entry.RefreshLostContact.IsZero() { if !ok || entry.RefreshLostContact.IsZero() || entry.GoroutineID != goroutineID {
return return
} }
entry.RefreshLostContact = time.Time{} entry.RefreshLostContact = time.Time{}
@ -673,12 +752,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
Index: entry.Index, Index: entry.Index,
} }
} }
if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil { if err := entry.FetchRateLimiter.Wait(c.rateLimitContext); err != nil {
if connectedTimer != nil { if connectedTimer != nil {
connectedTimer.Stop() connectedTimer.Stop()
} }
entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error()) entry.Error = fmt.Errorf("rateLimitContext canceled: %s", err.Error())
return // NOTE: this can only happen when the entire cache is being
// shutdown and isn't something that can happen normally.
return true, false
} }
// Start building the new entry by blocking on the fetch. // Start building the new entry by blocking on the fetch.
result, err := r.Fetch(fOpts) result, err := r.Fetch(fOpts)
@ -686,17 +768,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
connectedTimer.Stop() connectedTimer.Stop()
} }
// If we were stopped while waiting on a blocking query now would be a
// good time to detect that.
select {
case <-handle.stopCh:
return
default:
}
// Copy the existing entry to start. // Copy the existing entry to start.
newEntry := entry newEntry := entry
newEntry.Fetching = false
// Importantly, always reset the Error. Having both Error and a Value that // Importantly, always reset the Error. Having both Error and a Value that
// are non-nil is allowed in the cache entry but it indicates that the Error // are non-nil is allowed in the cache entry but it indicates that the Error
@ -752,7 +825,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
if result.Index > 0 { if result.Index > 0 {
// Reset the attempts counter so we don't have any backoff // Reset the attempts counter so we don't have any backoff
attempt = 0 shouldBackoff = false
} else { } else {
// Result having a zero index is an implicit error case. There was no // Result having a zero index is an implicit error case. There was no
// actual error but it implies the RPC found in index (nothing written // actual error but it implies the RPC found in index (nothing written
@ -767,7 +840,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// state it can be considered a bug in the RPC implementation (to ever // state it can be considered a bug in the RPC implementation (to ever
// return a zero index) however since it can happen this is a safety net // return a zero index) however since it can happen this is a safety net
// for the future. // for the future.
attempt++ shouldBackoff = true
} }
// If we have refresh active, this successful response means cache is now // If we have refresh active, this successful response means cache is now
@ -787,7 +860,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
metrics.IncrCounterWithLabels([]string{"cache", tEntry.Name, "fetch_error"}, 1, labels) metrics.IncrCounterWithLabels([]string{"cache", tEntry.Name, "fetch_error"}, 1, labels)
// Increment attempt counter // Increment attempt counter
attempt++ shouldBackoff = true
// If we are refreshing and just failed, updated the lost contact time as // If we are refreshing and just failed, updated the lost contact time as
// our cache will be stale until we get successfully reconnected. We only // our cache will be stale until we get successfully reconnected. We only
@ -804,7 +877,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// Set our entry // Set our entry
c.entriesLock.Lock() c.entriesLock.Lock()
if _, ok := c.entries[key]; !ok { if currEntry, ok := c.entries[key]; !ok || currEntry.GoroutineID != goroutineID {
// This entry was evicted during our fetch. DON'T re-insert it or fall // This entry was evicted during our fetch. DON'T re-insert it or fall
// through to the refresh loop below otherwise it will live forever! In // through to the refresh loop below otherwise it will live forever! In
// theory there should not be any Get calls waiting on entry.Waiter since // theory there should not be any Get calls waiting on entry.Waiter since
@ -817,7 +890,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// Trigger any waiters that are around. // Trigger any waiters that are around.
close(entry.Waiter) close(entry.Waiter)
return return true, false
} }
// If this is a new entry (not in the heap yet), then setup the // If this is a new entry (not in the heap yet), then setup the
@ -842,79 +915,22 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// request back up again shortly but in the general case this prevents // request back up again shortly but in the general case this prevents
// spamming the logs with tons of ACL not found errors for days. // spamming the logs with tons of ACL not found errors for days.
if tEntry.Opts.Refresh && !preventRefresh { if tEntry.Opts.Refresh && !preventRefresh {
// Check if cache was stopped return false, shouldBackoff
if atomic.LoadUint32(&c.stopped) == 1 {
return
}
// If we're over the attempt minimum, start an exponential backoff.
wait := backOffWait(attempt)
// If we have a timer, wait for it
wait += tEntry.Opts.RefreshTimer
select {
case <-time.After(wait):
case <-handle.stopCh:
return
}
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
r.Info.MustRevalidate = false
r.Info.MinIndex = 0
c.fetch(key, r, false, attempt, true)
} }
}(handle) }
return entry.Waiter return true, false
} }
func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle { func backOffWait(opts Options, failures uint) time.Duration {
c.fetchLock.Lock() if failures > opts.CacheRefreshBackoffMin {
defer c.fetchLock.Unlock() shift := failures - opts.CacheRefreshBackoffMin
waitTime := opts.CacheRefreshMaxWait
if prevHandle, ok := c.fetchHandles[key]; ok {
close(prevHandle.stopCh)
}
c.lastFetchID++
handle := fetchHandle{
id: c.lastFetchID,
stopCh: make(chan struct{}),
}
c.fetchHandles[key] = handle
return handle
}
func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
c.fetchLock.Lock()
defer c.fetchLock.Unlock()
// Only remove a fetchHandle if it's YOUR fetchHandle.
handle, ok := c.fetchHandles[key]
if !ok {
return
}
if handle.id == fetchID {
delete(c.fetchHandles, key)
}
}
func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin
waitTime := CacheRefreshMaxWait
if shift < 31 { if shift < 31 {
waitTime = (1 << shift) * time.Second waitTime = (1 << shift) * time.Second
} }
if waitTime > CacheRefreshMaxWait { if waitTime > opts.CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait waitTime = opts.CacheRefreshMaxWait
} }
return waitTime + lib.RandomStagger(waitTime) return waitTime + lib.RandomStagger(waitTime)
} }

View File

@ -18,7 +18,6 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/lib/ttlcache"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
) )
// Test a basic Get with no indexes (and therefore no blocking queries). // Test a basic Get with no indexes (and therefore no blocking queries).
@ -1751,22 +1750,12 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, true, result) require.Equal(t, true, result)
waitUntilFetching := func(expectValue bool) {
retry.Run(t, func(t *retry.R) {
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
require.True(t, ok)
if expectValue {
require.True(t, entry.Fetching)
} else {
require.False(t, entry.Fetching)
}
})
}
// ensure that the entry is fetching again // ensure that the entry is fetching again
waitUntilFetching(true) c.entriesLock.Lock()
entry, ok := c.entries[key]
require.True(t, ok)
require.True(t, entry.GoroutineID > 0)
c.entriesLock.Unlock()
requestChan := make(chan error) requestChan := make(chan error)
@ -1800,7 +1789,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
} }
// ensure that the entry is fetching again // ensure that the entry is fetching again
waitUntilFetching(true) c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.True(t, entry.GoroutineID > 0)
c.entriesLock.Unlock()
// background a call that will wait for a newer version - will result in an acl not found error // background a call that will wait for a newer version - will result in an acl not found error
go getError(5) go getError(5)
@ -1821,7 +1814,11 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
// ensure that the ACL not found error killed off the background refresh // ensure that the ACL not found error killed off the background refresh
// but didn't remove it from the cache // but didn't remove it from the cache
waitUntilFetching(false) c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.False(t, entry.GoroutineID > 0)
c.entriesLock.Unlock()
} }
type fakeType struct { type fakeType struct {

View File

@ -26,9 +26,9 @@ type cacheEntry struct {
Index uint64 Index uint64
// Metadata that is used for internal accounting // Metadata that is used for internal accounting
Valid bool // True if the Value is set Valid bool // True if the Value is set
Fetching bool // True if a fetch is already active GoroutineID uint64 // Nonzero if a fetch goroutine is running.
Waiter chan struct{} // Closed when this entry is invalidated Waiter chan struct{} // Closed when this entry is invalidated
// Expiry contains information about the expiration of this // Expiry contains information about the expiration of this
// entry. This is a pointer as its shared as a value in the // entry. This is a pointer as its shared as a value in the

View File

@ -136,7 +136,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlati
failures = 0 failures = 0
} else { } else {
failures++ failures++
wait = backOffWait(failures) wait = backOffWait(c.options, failures)
c.options.Logger. c.options.Logger.
With("error", err). With("error", err).
@ -223,7 +223,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlatio
// as this would eliminate the single-flighting of these requests in the cache and // as this would eliminate the single-flighting of these requests in the cache and
// the efficiencies gained by it. // the efficiencies gained by it.
if failures > 0 { if failures > 0 {
wait = backOffWait(failures) wait = backOffWait(c.options, failures)
} else { } else {
// Calculate when the cached data's Age will get too stale and // Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the // need to be re-queried. When the data's Age already exceeds the

View File

@ -78,6 +78,8 @@
"BootstrapExpect": 0, "BootstrapExpect": 0,
"BuildDate": "2019-11-20 05:00:00 +0000 UTC", "BuildDate": "2019-11-20 05:00:00 +0000 UTC",
"Cache": { "Cache": {
"CacheRefreshBackoffMin": 0,
"CacheRefreshMaxWait": "0s",
"EntryFetchMaxBurst": 42, "EntryFetchMaxBurst": 42,
"EntryFetchRate": 0.334, "EntryFetchRate": 0.334,
"Logger": null "Logger": null

View File

@ -216,6 +216,9 @@ func (a *TestAgent) Start(t *testing.T) error {
} else { } else {
result.RuntimeConfig.Telemetry.Disable = true result.RuntimeConfig.Telemetry.Disable = true
} }
// Lower the maximum backoff period of a cache refresh just for
// tests see #14956 for more.
result.RuntimeConfig.Cache.CacheRefreshMaxWait = 1 * time.Second
} }
return result, err return result, err
} }