diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 10792c5d54..361f29f4d3 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -15,7 +15,6 @@ package cache import ( - "container/heap" "context" "fmt" "io" @@ -166,16 +165,11 @@ func applyDefaultValuesOnOptions(options Options) Options { // Further settings can be tweaked on the returned value. func New(options Options) *Cache { options = applyDefaultValuesOnOptions(options) - // Initialize the heap. The buffer of 1 is really important because - // its possible for the expiry loop to trigger the heap to update - // itself and it'd block forever otherwise. - h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} - heap.Init(h) ctx, cancel := context.WithCancel(context.Background()) c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: h, + entriesExpiryHeap: newExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -745,47 +739,30 @@ func backOffWait(failures uint) time.Duration { // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates entries that have expired. func (c *Cache) runExpiryLoop() { - var expiryTimer *time.Timer for { - // If we have a previous timer, stop it. - if expiryTimer != nil { - expiryTimer.Stop() - } - - // Get the entry expiring soonest - var entry *cacheEntryExpiry - var expiryCh <-chan time.Time c.entriesLock.RLock() - if len(c.entriesExpiryHeap.Entries) > 0 { - entry = c.entriesExpiryHeap.Entries[0] - expiryTimer = time.NewTimer(time.Until(entry.Expires)) - expiryCh = expiryTimer.C - } + timer := c.entriesExpiryHeap.Next() c.entriesLock.RUnlock() select { case <-c.stopCh: + timer.Stop() return case <-c.entriesExpiryHeap.NotifyCh: - // Entries changed, so the heap may have changed. Restart loop. + timer.Stop() + continue - case <-expiryCh: + case <-timer.Wait(): c.entriesLock.Lock() - // Perform cleanup operations on the entry's state, if applicable. - state := c.entries[entry.Key].State - if closer, ok := state.(io.Closer); ok { + entry := timer.Entry + if closer, ok := c.entries[entry.Key].State.(io.Closer); ok { closer.Close() } // Entry expired! Remove it. delete(c.entries, entry.Key) - heap.Remove(c.entriesExpiryHeap, entry.HeapIndex) - - // This is subtle but important: if we race and simultaneously - // evict and fetch a new value, then we set this to -1 to - // have it treated as a new value so that the TTL is extended. - entry.HeapIndex = -1 + c.entriesExpiryHeap.Remove(entry.HeapIndex) // Set some metrics metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index 953929b7eb..b0de07f2c2 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -12,11 +12,6 @@ type cacheEntryExpiry struct { HeapIndex int // Index in the heap } -// TODO: use or remove -func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry { - return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} -} - // expiryHeap is a container/heap.Interface implementation that expires entries // in the cache when their expiration time is reached. // @@ -37,6 +32,16 @@ type expiryHeap struct { NotifyCh chan struct{} } +// Initialize the heap. The buffer of 1 is really important because +// its possible for the expiry loop to trigger the heap to update +// itself and it'd block forever otherwise. +func newExpiryHeap() *expiryHeap { + h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init(h) + return h +} + +// Must be synchronized by the caller. func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} heap.Push(h, entry) @@ -59,6 +64,18 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { } } +// Must be synchronized by the caller. +func (h *expiryHeap) Remove(idx int) { + entry := h.Entries[idx] + heap.Remove(h, idx) + + // A goroutine which is fetching a new value will have a reference to this + // entry. When it re-acquires the lock it needs to be informed that + // the entry was expired while it was fetching. Setting HeapIndex to -1 + // indicates that the entry is no longer in the heap, and must be re-added. + entry.HeapIndex = -1 +} + func (h *expiryHeap) Len() int { return len(h.Entries) } func (h *expiryHeap) Swap(i, j int) { @@ -109,6 +126,7 @@ func (h *expiryHeap) Pop() interface{} { return last } +// TODO: look at calls to notify. func (h *expiryHeap) notify() { select { case h.NotifyCh <- struct{}{}: @@ -121,3 +139,33 @@ func (h *expiryHeap) notify() { // so the receiver will restart regardless. } } + +// Must be synchronized by the caller. +func (h *expiryHeap) Next() timer { + if len(h.Entries) == 0 { + return timer{} + } + entry := h.Entries[0] + return timer{ + timer: time.NewTimer(time.Until(entry.Expires)), + Entry: entry, + } +} + +type timer struct { + timer *time.Timer + Entry *cacheEntryExpiry +} + +func (t *timer) Wait() <-chan time.Time { + if t.timer == nil { + return nil + } + return t.timer.C +} + +func (t *timer) Stop() { + if t.timer != nil { + t.timer.Stop() + } +} diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go index edb4564cf4..0b200c2dd6 100644 --- a/agent/cache/eviction_test.go +++ b/agent/cache/eviction_test.go @@ -43,12 +43,10 @@ func TestExpiryHeap(t *testing.T) { }) runStep(t, "remove the first entry", func(t *testing.T) { - remove := h.Entries[0] - heap.Remove(h, remove.HeapIndex) + h.Remove(0) require.Equal(0, entry.HeapIndex) require.Equal(1, entry3.HeapIndex) testMessage(t, ch) - testMessage(t, ch) // we have two because two swaps happen testNoMessage(t, ch) }) @@ -94,3 +92,27 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } + +func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: newExpiryHeap(), + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +}