Add a Close method to cache that stops background goroutines. (#4746)

In a real agent the `cache` instance is alive until the agent shuts down, so this is not a real leak in production. In our test suite, however, every TestAgent that is started and stopped leaks goroutines that never get cleaned up; they accumulate and consume CPU and memory through subsequent tests in the `agent` package, which doesn't help our test flakiness.

This adds a Close method that doesn't invalidate or clean up the cache, and still allows concurrent blocking queries to run (for up to 10 minutes, which might still affect tests). But at least it no longer maintains them forever with background refresh and an expiry watcher goroutine.
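For illustration, here's a minimal sketch of how a test can use this; the test name and body are hypothetical, only `New(*Options)` and `Close` come from this change:

```go
package cache

import "testing"

// Hypothetical sketch: close the cache in teardown so its background
// goroutines stop with the test instead of leaking into later tests.
func TestSomethingUsingCache(t *testing.T) {
	c := New(nil)   // starts the expiry watcher goroutine
	defer c.Close() // stops background work; cached values stay readable

	// ... exercise the cache. In-flight blocking fetches may still run to
	// completion (up to their timeout), but no new background refreshes start.
}
```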

It would be nice to cancel any outstanding blocking requests when we close too, but that requires much more invasive surgery deep in our RPC protocol, since we currently have no way to cancel requests.

Unscientifically, this seems to make tests pass a bit quicker and more reliably locally, but I can't really be sure of that!
Paul Banks 2018-10-04 11:27:11 +01:00 committed by GitHub
parent 27729208e6
commit e8ba527f23
2 changed files with 34 additions and 0 deletions

agent/agent.go

@@ -1354,6 +1354,11 @@ func (a *Agent) ShutdownAgent() error {
}
}
// Stop the cache background work
if a.cache != nil {
a.cache.Close()
}
var err error
if a.delegate != nil {
err = a.delegate.Shutdown()

agent/cache/cache.go

@@ -18,6 +18,7 @@ import (
"container/heap"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
@@ -71,6 +72,12 @@ type Cache struct {
entriesLock sync.RWMutex
entries map[string]cacheEntry
entriesExpiryHeap *expiryHeap
// stopped is used as an atomic flag to signal that the Cache has been
// discarded so background fetches and expiry processing should stop.
stopped uint32
// stopCh is closed when Close is called
stopCh chan struct{}
}
// typeEntry is a single type that is registered with a Cache.
@@ -104,6 +111,7 @@ func New(*Options) *Cache {
types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: h,
stopCh: make(chan struct{}),
}
// Start the expiry watcher
@@ -467,6 +475,10 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string
if !opts.Refresh {
return
}
// Check if cache was stopped
if atomic.LoadUint32(&c.stopped) == 1 {
return
}
// If we're over the attempt minimum, start an exponential backoff.
if attempt > CacheRefreshBackoffMin {
@@ -511,6 +523,8 @@ func (c *Cache) runExpiryLoop() {
c.entriesLock.RUnlock()
select {
case <-c.stopCh:
return
case <-c.entriesExpiryHeap.NotifyCh:
// Entries changed, so the heap may have changed. Restart loop.
@@ -534,3 +548,18 @@ func (c *Cache) runExpiryLoop() {
}
}
}
// Close stops any background work and frees all resources for the cache.
// In-flight Fetch requests are allowed to run to completion and callers may
// still read the current cache values, so no coordination with callers is
// needed; however, no background activity will continue. It's intended to be
// called at agent shutdown, so no further requests should be made, but
// concurrent or in-flight ones won't break.
func (c *Cache) Close() error {
wasStopped := atomic.SwapUint32(&c.stopped, 1)
if wasStopped == 0 {
// First time only, close stop chan
close(c.stopCh)
}
return nil
}
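As a rough, non-authoritative illustration of what this buys the test suite (not part of this commit), a leak check along these lines would pass with Close and hold on to an extra goroutine without it; the sleep duration is arbitrary:

```go
package cache

import (
	"runtime"
	"testing"
	"time"
)

// Hypothetical sketch: verify that closing the cache lets its background
// goroutines (the expiry watcher) exit instead of leaking past the test.
func TestCloseStopsBackgroundWork(t *testing.T) {
	before := runtime.NumGoroutine()

	c := New(nil) // starts runExpiryLoop in the background
	if err := c.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}

	// Give runExpiryLoop a moment to observe stopCh and return.
	time.Sleep(50 * time.Millisecond)

	if after := runtime.NumGoroutine(); after > before {
		t.Fatalf("goroutines leaked: before=%d after=%d", before, after)
	}
}
```

Since Close flips the stopped flag with `atomic.SwapUint32`, only the first call closes `stopCh`, so calling it more than once (or concurrently) is safe.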