mirror of
https://github.com/status-im/consul.git
synced 2025-01-21 19:20:41 +00:00
cache: prevent goroutine leak in agent cache (#14908)
There is a bug in the error handling code for the Agent cache subsystem discovered: 1. NotifyCallback calls notifyBlockingQuery which calls getWithIndex in a loop (which backs off on-error up to 1 minute) 2. getWithIndex calls fetch if there’s no valid entry in the cache 3. fetch starts a goroutine which calls Fetch on the cache-type, waits for a while (again with backoff up to 1 minute for errors) and then calls fetch to trigger a refresh The end result being that every 1 minute notifyBlockingQuery spawns an ancestry of goroutines that essentially lives forever. This PR ensures that the goroutine started by `fetch` cancels any prior goroutine spawned by the same line for the same key. In isolated testing where a cache type was tweaked to indefinitely error, this patch prevented goroutine counts from skyrocketing.
This commit is contained in:
parent
02a858efa0
commit
fe2d41ddad
3
.changelog/14908.txt
Normal file
3
.changelog/14908.txt
Normal file
@ -0,0 +1,3 @@
|
||||
```release-note:bug
|
||||
cache: prevent goroutine leak in agent cache
|
||||
```
|
81
agent/cache/cache.go
vendored
81
agent/cache/cache.go
vendored
@ -139,6 +139,10 @@ type Cache struct {
|
||||
entries map[string]cacheEntry
|
||||
entriesExpiryHeap *ttlcache.ExpiryHeap
|
||||
|
||||
fetchLock sync.Mutex
|
||||
lastFetchID uint64
|
||||
fetchHandles map[string]fetchHandle
|
||||
|
||||
// stopped is used as an atomic flag to signal that the Cache has been
|
||||
// discarded so background fetches and expiry processing should stop.
|
||||
stopped uint32
|
||||
@ -150,6 +154,11 @@ type Cache struct {
|
||||
rateLimitCancel context.CancelFunc
|
||||
}
|
||||
|
||||
type fetchHandle struct {
|
||||
id uint64
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
// typeEntry is a single type that is registered with a Cache.
|
||||
type typeEntry struct {
|
||||
// Name that was used to register the Type
|
||||
@ -225,6 +234,7 @@ func New(options Options) *Cache {
|
||||
types: make(map[string]typeEntry),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
|
||||
fetchHandles: make(map[string]fetchHandle),
|
||||
stopCh: make(chan struct{}),
|
||||
options: options,
|
||||
rateLimitContext: ctx,
|
||||
@ -614,8 +624,18 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||
metrics.SetGauge([]string{"cache", "entries_count"}, float32(len(c.entries)))
|
||||
|
||||
tEntry := r.TypeEntry
|
||||
// The actual Fetch must be performed in a goroutine.
|
||||
go func() {
|
||||
|
||||
// The actual Fetch must be performed in a goroutine. Ensure that we only
|
||||
// have one in-flight at a time, but don't use a deferred
|
||||
// context.WithCancel style termination so that these things outlive their
|
||||
// requester.
|
||||
//
|
||||
// By the time we get here the system WANTS to make a replacement fetcher, so
|
||||
// we terminate the prior one and replace it.
|
||||
handle := c.getOrReplaceFetchHandle(key)
|
||||
go func(handle fetchHandle) {
|
||||
defer c.deleteFetchHandle(key, handle.id)
|
||||
|
||||
// If we have background refresh and currently are in "disconnected" state,
|
||||
// waiting for a response might mean we mark our results as stale for up to
|
||||
// 10 minutes (max blocking timeout) after connection is restored. To reduce
|
||||
@ -666,6 +686,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||
connectedTimer.Stop()
|
||||
}
|
||||
|
||||
// If we were stopped while waiting on a blocking query now would be a
|
||||
// good time to detect that.
|
||||
select {
|
||||
case <-handle.stopCh:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Copy the existing entry to start.
|
||||
newEntry := entry
|
||||
newEntry.Fetching = false
|
||||
@ -820,13 +848,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||
}
|
||||
|
||||
// If we're over the attempt minimum, start an exponential backoff.
|
||||
if wait := backOffWait(attempt); wait > 0 {
|
||||
time.Sleep(wait)
|
||||
}
|
||||
wait := backOffWait(attempt)
|
||||
|
||||
// If we have a timer, wait for it
|
||||
if tEntry.Opts.RefreshTimer > 0 {
|
||||
time.Sleep(tEntry.Opts.RefreshTimer)
|
||||
wait += tEntry.Opts.RefreshTimer
|
||||
|
||||
select {
|
||||
case <-time.After(wait):
|
||||
case <-handle.stopCh:
|
||||
return
|
||||
}
|
||||
|
||||
// Trigger. The "allowNew" field is false because in the time we were
|
||||
@ -836,11 +866,46 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
|
||||
r.Info.MinIndex = 0
|
||||
c.fetch(key, r, false, attempt, true)
|
||||
}
|
||||
}()
|
||||
}(handle)
|
||||
|
||||
return entry.Waiter
|
||||
}
|
||||
|
||||
func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
|
||||
c.fetchLock.Lock()
|
||||
defer c.fetchLock.Unlock()
|
||||
|
||||
if prevHandle, ok := c.fetchHandles[key]; ok {
|
||||
close(prevHandle.stopCh)
|
||||
}
|
||||
|
||||
c.lastFetchID++
|
||||
|
||||
handle := fetchHandle{
|
||||
id: c.lastFetchID,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
c.fetchHandles[key] = handle
|
||||
|
||||
return handle
|
||||
}
|
||||
|
||||
func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
|
||||
c.fetchLock.Lock()
|
||||
defer c.fetchLock.Unlock()
|
||||
|
||||
// Only remove a fetchHandle if it's YOUR fetchHandle.
|
||||
handle, ok := c.fetchHandles[key]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if handle.id == fetchID {
|
||||
delete(c.fetchHandles, key)
|
||||
}
|
||||
}
|
||||
|
||||
func backOffWait(failures uint) time.Duration {
|
||||
if failures > CacheRefreshBackoffMin {
|
||||
shift := failures - CacheRefreshBackoffMin
|
||||
|
33
agent/cache/cache_test.go
vendored
33
agent/cache/cache_test.go
vendored
@ -18,6 +18,7 @@ import (
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/lib/ttlcache"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
)
|
||||
|
||||
// Test a basic Get with no indexes (and therefore no blocking queries).
|
||||
@ -1750,12 +1751,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, true, result)
|
||||
|
||||
waitUntilFetching := func(expectValue bool) {
|
||||
retry.Run(t, func(t *retry.R) {
|
||||
c.entriesLock.Lock()
|
||||
defer c.entriesLock.Unlock()
|
||||
entry, ok := c.entries[key]
|
||||
require.True(t, ok)
|
||||
if expectValue {
|
||||
require.True(t, entry.Fetching)
|
||||
} else {
|
||||
require.False(t, entry.Fetching)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// ensure that the entry is fetching again
|
||||
c.entriesLock.Lock()
|
||||
entry, ok := c.entries[key]
|
||||
require.True(t, ok)
|
||||
require.True(t, entry.Fetching)
|
||||
c.entriesLock.Unlock()
|
||||
waitUntilFetching(true)
|
||||
|
||||
requestChan := make(chan error)
|
||||
|
||||
@ -1789,11 +1800,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
|
||||
}
|
||||
|
||||
// ensure that the entry is fetching again
|
||||
c.entriesLock.Lock()
|
||||
entry, ok = c.entries[key]
|
||||
require.True(t, ok)
|
||||
require.True(t, entry.Fetching)
|
||||
c.entriesLock.Unlock()
|
||||
waitUntilFetching(true)
|
||||
|
||||
// background a call that will wait for a newer version - will result in an acl not found error
|
||||
go getError(5)
|
||||
@ -1814,11 +1821,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
|
||||
|
||||
// ensure that the ACL not found error killed off the background refresh
|
||||
// but didn't remove it from the cache
|
||||
c.entriesLock.Lock()
|
||||
entry, ok = c.entries[key]
|
||||
require.True(t, ok)
|
||||
require.False(t, entry.Fetching)
|
||||
c.entriesLock.Unlock()
|
||||
waitUntilFetching(false)
|
||||
}
|
||||
|
||||
type fakeType struct {
|
||||
|
Loading…
x
Reference in New Issue
Block a user