mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
Fixes race condition in Agent Cache (#5796)
* Fix race condition during a cache get Check the entry we pulled out of the cache while holding the lock had Fetching set. If it did then we should use the existing Waiter instead of calling fetch. The reason this is better than just calling fetch is that fetch re-gets the entry out of the entries map and the previous fetch may have finished. Therefore this prevents erroneously starting a new fetch because we just missed the last update. * Fix race condition fully The first commit still allowed for the following scenario: • No entry existing when checked in getWithIndex while holding the read lock • Then by time we had reached fetch it had been created and finished. * always use ok when returning * comment mentioning the reading from entries. * use cacheHit consistently
This commit is contained in:
parent
7d178ca26b
commit
dbc48ea3f7
94
agent/cache/cache.go
vendored
94
agent/cache/cache.go
vendored
@ -214,6 +214,47 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
|
||||
return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
|
||||
}
|
||||
|
||||
// getEntryLocked retrieves a cache entry and checks if it is ready to be
|
||||
// returned given the other parameters. It reads from entries and the caller
|
||||
// has to issue a read lock if necessary.
|
||||
func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) {
|
||||
entry, ok := c.entries[key]
|
||||
cacheHit := false
|
||||
|
||||
if !ok {
|
||||
return ok, cacheHit, entry
|
||||
}
|
||||
|
||||
// Check if we have a hit
|
||||
cacheHit = ok && entry.Valid
|
||||
|
||||
supportsBlocking := tEntry.Type.SupportsBlocking()
|
||||
|
||||
// Check index is not specified or lower than value, or the type doesn't
|
||||
// support blocking.
|
||||
if cacheHit && supportsBlocking &&
|
||||
minIndex > 0 && minIndex >= entry.Index {
|
||||
// MinIndex was given and matches or is higher than current value so we
|
||||
// ignore the cache and fallthrough to blocking on a new value below.
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
// Check MaxAge is not exceeded if this is not a background refreshing type
|
||||
// and MaxAge was specified.
|
||||
if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 &&
|
||||
!entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) {
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
// Check if we are requested to revalidate. If so the first time round the
|
||||
// loop is not a hit but subsequent ones should be treated normally.
|
||||
if cacheHit && !tEntry.Opts.Refresh && revalidate {
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
return ok, cacheHit, entry
|
||||
}
|
||||
|
||||
// getWithIndex implements the main Get functionality but allows internal
|
||||
// callers (Watch) to manipulate the blocking index separately from the actual
|
||||
// request object.
|
||||
@ -249,36 +290,9 @@ RETRY_GET:
|
||||
|
||||
// Get the current value
|
||||
c.entriesLock.RLock()
|
||||
entry, ok := c.entries[key]
|
||||
_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
|
||||
c.entriesLock.RUnlock()
|
||||
|
||||
// Check if we have a hit
|
||||
cacheHit := ok && entry.Valid
|
||||
|
||||
supportsBlocking := tEntry.Type.SupportsBlocking()
|
||||
|
||||
// Check index is not specified or lower than value, or the type doesn't
|
||||
// support blocking.
|
||||
if cacheHit && supportsBlocking &&
|
||||
minIndex > 0 && minIndex >= entry.Index {
|
||||
// MinIndex was given and matches or is higher than current value so we
|
||||
// ignore the cache and fallthrough to blocking on a new value below.
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
// Check MaxAge is not exceeded if this is not a background refreshing type
|
||||
// and MaxAge was specified.
|
||||
if cacheHit && !tEntry.Opts.Refresh && info.MaxAge > 0 &&
|
||||
!entry.FetchedAt.IsZero() && info.MaxAge < time.Since(entry.FetchedAt) {
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
// Check if we are requested to revalidate. If so the first time round the
|
||||
// loop is not a hit but subsequent ones should be treated normally.
|
||||
if cacheHit && !tEntry.Opts.Refresh && info.MustRevalidate && first {
|
||||
cacheHit = false
|
||||
}
|
||||
|
||||
if cacheHit {
|
||||
meta := ResultMeta{Index: entry.Index}
|
||||
if first {
|
||||
@ -344,9 +358,6 @@ RETRY_GET:
|
||||
}
|
||||
}
|
||||
|
||||
// No longer our first time through
|
||||
first = false
|
||||
|
||||
// Set our timeout channel if we must
|
||||
if info.Timeout > 0 && timeoutCh == nil {
|
||||
timeoutCh = time.After(info.Timeout)
|
||||
@ -354,11 +365,14 @@ RETRY_GET:
|
||||
|
||||
// At this point, we know we either don't have a value at all or the
|
||||
// value we have is too old. We need to wait for new data.
|
||||
waiterCh, err := c.fetch(t, key, r, true, 0)
|
||||
waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first)
|
||||
if err != nil {
|
||||
return nil, ResultMeta{Index: entry.Index}, err
|
||||
}
|
||||
|
||||
// No longer our first time through
|
||||
first = false
|
||||
|
||||
select {
|
||||
case <-waiterCh:
|
||||
// Our fetch returned, retry the get from the cache.
|
||||
@ -384,7 +398,7 @@ func (c *Cache) entryKey(t string, r *RequestInfo) string {
|
||||
// If allowNew is true then the fetch should create the cache entry
|
||||
// if it doesn't exist. If this is false, then fetch will do nothing
|
||||
// if the entry doesn't exist. This latter case is to support refreshing.
|
||||
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-chan struct{}, error) {
|
||||
func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
|
||||
// Get the type that we're fetching
|
||||
c.typesLock.RLock()
|
||||
tEntry, ok := c.types[t]
|
||||
@ -393,10 +407,20 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
|
||||
return nil, fmt.Errorf("unknown type in cache: %s", t)
|
||||
}
|
||||
|
||||
info := r.CacheInfo()
|
||||
|
||||
// We acquire a write lock because we may have to set Fetching to true.
|
||||
c.entriesLock.Lock()
|
||||
defer c.entriesLock.Unlock()
|
||||
entry, ok := c.entries[key]
|
||||
ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
|
||||
|
||||
// This handles the case where a fetch succeeded after checking for its existence in
|
||||
// getWithIndex. This ensures that we don't miss updates.
|
||||
if ok && cacheHit && !ignoreExisting {
|
||||
ch := make(chan struct{})
|
||||
close(ch)
|
||||
return ch, nil
|
||||
}
|
||||
|
||||
// If we aren't allowing new values and we don't have an existing value,
|
||||
// return immediately. We return an immediately-closed channel so nothing
|
||||
@ -656,7 +680,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
|
||||
// Trigger. The "allowNew" field is false because in the time we were
|
||||
// waiting to refresh we may have expired and got evicted. If that
|
||||
// happened, we don't want to create a new entry.
|
||||
c.fetch(t, key, r, false, attempt)
|
||||
c.fetch(t, key, r, false, attempt, 0, true, true)
|
||||
}
|
||||
|
||||
// runExpiryLoop is a blocking function that watches the expiration
|
||||
|
1
agent/cache/cache_test.go
vendored
1
agent/cache/cache_test.go
vendored
@ -1123,7 +1123,6 @@ func TestCacheGet_nonBlockingType(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
typ := TestTypeNonBlocking(t)
|
||||
defer typ.AssertExpectations(t)
|
||||
c := TestCache(t)
|
||||
c.RegisterType("t", typ, nil)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user