From e8ba527f23f18b008a28e1fb2aed0def3c7d7c9c Mon Sep 17 00:00:00 2001
From: Paul Banks
Date: Thu, 4 Oct 2018 11:27:11 +0100
Subject: [PATCH] Add a Close method to cache that stops background goroutines. (#4746)

In a real agent the `cache` instance is alive until the agent shuts down,
so this is not a real leak in production. In our test suite, however, every
TestAgent that is started and stopped leaks goroutines that never get
cleaned up; they accumulate, consuming CPU and memory through subsequent
tests in the `agent` package, which doesn't help our test flakiness.

This adds a Close method that doesn't invalidate or clean up the cache and
still allows concurrent blocking queries to run (for up to 10 minutes,
which might still affect tests), but at least it no longer maintains them
forever with background refresh and an expiry watcher routine.

It would be nice to also cancel any outstanding blocking requests when we
close, but that requires much more invasive surgery deep in our RPC
protocol since we currently have no way to cancel requests.

Unscientifically, this seems to make tests pass a bit quicker and more
reliably locally, but I can't really be sure of that!
---
 agent/agent.go       |  5 +++++
 agent/cache/cache.go | 29 +++++++++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/agent/agent.go b/agent/agent.go
index 4fc0c02798..07776601d5 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -1354,6 +1354,11 @@ func (a *Agent) ShutdownAgent() error {
 		}
 	}
 
+	// Stop the cache background work
+	if a.cache != nil {
+		a.cache.Close()
+	}
+
 	var err error
 	if a.delegate != nil {
 		err = a.delegate.Shutdown()
diff --git a/agent/cache/cache.go b/agent/cache/cache.go
index 76494168f5..df3db58d0f 100644
--- a/agent/cache/cache.go
+++ b/agent/cache/cache.go
@@ -18,6 +18,7 @@ import (
 	"container/heap"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -71,6 +72,12 @@ type Cache struct {
 	entriesLock       sync.RWMutex
 	entries           map[string]cacheEntry
 	entriesExpiryHeap *expiryHeap
+
+	// stopped is used as an atomic flag to signal that the Cache has been
+	// discarded so background fetches and expiry processing should stop.
+	stopped uint32
+	// stopCh is closed when Close is called
+	stopCh chan struct{}
 }
 
 // typeEntry is a single type that is registered with a Cache.
@@ -104,6 +111,7 @@ func New(*Options) *Cache {
 		types:             make(map[string]typeEntry),
 		entries:           make(map[string]cacheEntry),
 		entriesExpiryHeap: h,
+		stopCh:            make(chan struct{}),
 	}
 
 	// Start the expiry watcher
@@ -467,6 +475,10 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
 	if !opts.Refresh {
 		return
 	}
+	// Check if cache was stopped
+	if atomic.LoadUint32(&c.stopped) == 1 {
+		return
+	}
 
 	// If we're over the attempt minimum, start an exponential backoff.
 	if attempt > CacheRefreshBackoffMin {
@@ -511,6 +523,8 @@ func (c *Cache) runExpiryLoop() {
 		c.entriesLock.RUnlock()
 
 		select {
+		case <-c.stopCh:
+			return
 		case <-c.entriesExpiryHeap.NotifyCh:
 			// Entries changed, so the heap may have changed. Restart loop.
 
@@ -534,3 +548,18 @@ func (c *Cache) runExpiryLoop() {
 		}
 	}
 }
+
+// Close stops any background work and frees all resources for the cache.
+// Current Fetch requests are allowed to continue to completion and callers
+// may still access the current cache values, so no coordination with callers
+// is needed; however, no background activity will continue. It's intended to
+// be called at agent shutdown, so no further requests should be made, but
+// concurrent or in-flight ones won't break.
+func (c *Cache) Close() error {
+	wasStopped := atomic.SwapUint32(&c.stopped, 1)
+	if wasStopped == 0 {
+		// First time only, close stop chan
+		close(c.stopCh)
+	}
+	return nil
+}
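
Editor's note: below is a minimal standalone Go sketch of the shutdown pattern the patch relies on, separate from the Consul code itself. The worker, newWorker, and loop names are made up for illustration; the point is the combination of an atomic "stopped" flag and a stop channel that is closed exactly once, which makes Close idempotent and lets background loops exit on their next select.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

type worker struct {
	stopped uint32        // set to 1 once Close has been called
	stopCh  chan struct{} // closed on first Close to wake background loops
}

func newWorker() *worker {
	w := &worker{stopCh: make(chan struct{})}
	go w.loop()
	return w
}

// loop stands in for a background routine like runExpiryLoop: it blocks on
// timers or notifications and returns as soon as stopCh is closed.
func (w *worker) loop() {
	for {
		select {
		case <-w.stopCh:
			return
		case <-time.After(10 * time.Millisecond):
			// A plain flag check, analogous to the one in refresh(),
			// for code paths that aren't structured around a select.
			if atomic.LoadUint32(&w.stopped) == 1 {
				return
			}
			// ... background work would happen here ...
		}
	}
}

// Close mirrors the shape of Cache.Close: the atomic swap guarantees the
// channel is closed only once, so repeated calls from racing shutdown paths
// don't panic with "close of closed channel".
func (w *worker) Close() error {
	if atomic.SwapUint32(&w.stopped, 1) == 0 {
		close(w.stopCh)
	}
	return nil
}

func main() {
	w := newWorker()
	time.Sleep(25 * time.Millisecond)
	w.Close()
	w.Close() // idempotent: the second call is a no-op
	fmt.Println("worker stopped")
}

The atomic swap is what makes the second Close call above a no-op; closing an already-closed channel panics, which matters when more than one shutdown path might reach Close.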