diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 03b9df8b9b..c7b7bcfc1c 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -26,6 +26,8 @@ import ( "github.com/armon/go-metrics" "golang.org/x/time/rate" + "github.com/hashicorp/consul/lib/ttlcache" + "github.com/hashicorp/consul/lib" ) @@ -87,7 +89,7 @@ type Cache struct { // internal storage format so changing this should be possible safely. entriesLock sync.RWMutex entries map[string]cacheEntry - entriesExpiryHeap *ExpiryHeap + entriesExpiryHeap *ttlcache.ExpiryHeap // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. @@ -169,7 +171,7 @@ func New(options Options) *Cache { c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: NewExpiryHeap(), + entriesExpiryHeap: ttlcache.NewExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -400,7 +402,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL) + c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL) c.entriesLock.Unlock() // We purposely do not return an error here since the cache only works with @@ -681,7 +683,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // If this is a new entry (not in the heap yet), then setup the // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. - if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { + if newEntry.Expiry == nil || newEntry.Expiry.Index() == -1 { newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } @@ -762,7 +764,7 @@ func (c *Cache) runExpiryLoop() { // Entry expired! Remove it. delete(c.entries, entry.Key) - c.entriesExpiryHeap.Remove(entry.HeapIndex) + c.entriesExpiryHeap.Remove(entry.Index()) // Set some metrics metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) @@ -803,7 +805,6 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &CacheEntryExpiry{Key: key}, FetchRateLimiter: rate.NewLimiter( c.options.EntryFetchRate, c.options.EntryFetchMaxBurst, diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 780137e18e..8988b3bf61 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/time/rate" + "github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/sdk/testutil" ) @@ -1405,3 +1406,27 @@ OUT: } } } + +func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: ttlcache.NewExpiryHeap(), + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +} diff --git a/agent/cache/entry.go b/agent/cache/entry.go index fca1f20d1b..0c71e94437 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -4,6 +4,8 @@ import ( "time" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/ttlcache" ) // cacheEntry stores a single cache entry. @@ -31,7 +33,7 @@ type cacheEntry struct { // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the // ExpiryHeap as well. - Expiry *CacheEntryExpiry + Expiry *ttlcache.Entry // FetchedAt stores the time the cache entry was retrieved for determining // it's age later. diff --git a/agent/cache/eviction.go b/lib/ttlcache/eviction.go similarity index 74% rename from agent/cache/eviction.go rename to lib/ttlcache/eviction.go index 579f13fefe..c04855a9bf 100644 --- a/agent/cache/eviction.go +++ b/lib/ttlcache/eviction.go @@ -1,15 +1,24 @@ -package cache +package ttlcache import ( "container/heap" "time" ) -// CacheEntryExpiry contains the expiration time for a cache entry. -type CacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - HeapIndex int // Index in the heap +// Entry in the ExpiryHeap, tracks the index and expiry time of an item in a +// ttl cache. +type Entry struct { + // TODO: can Key be unexported? + Key string + expiry time.Time + heapIndex int +} + +func (c *Entry) Index() int { + if c == nil { + return -1 + } + return c.heapIndex } // ExpiryHeap is a container/heap.Interface implementation that expires entries @@ -18,7 +27,7 @@ type CacheEntryExpiry struct { // All operations on the heap and read/write of the heap contents require // the proper entriesLock to be held on Cache. type ExpiryHeap struct { - entries []*CacheEntryExpiry + entries []*Entry // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value @@ -26,9 +35,7 @@ type ExpiryHeap struct { NotifyCh chan struct{} } -// Initialize the heap. The buffer of 1 is really important because -// its possible for the expiry loop to trigger the heap to update -// itself and it'd block forever otherwise. +// NewExpiryHeap creates and returns a new ExpiryHeap. func NewExpiryHeap() *ExpiryHeap { h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)} heap.Init((*entryHeap)(h)) @@ -38,17 +45,17 @@ func NewExpiryHeap() *ExpiryHeap { // Add an entry to the heap. // // Must be synchronized by the caller. -func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { - entry := &CacheEntryExpiry{ - Key: key, - Expires: time.Now().Add(expiry), +func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { + entry := &Entry{ + Key: key, + expiry: time.Now().Add(expiry), // Set the initial heap index to the last index. If the entry is swapped it // will have the correct index set, and if it remains at the end the last // index will be correct. - HeapIndex: len(h.entries), + heapIndex: len(h.entries), } heap.Push((*entryHeap)(h), entry) - if entry.HeapIndex == 0 { + if entry.heapIndex == 0 { h.notify() } return entry @@ -60,16 +67,18 @@ func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { // Must be synchronized by the caller. func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { entry := h.entries[idx] - entry.Expires = time.Now().Add(expiry) + entry.expiry = time.Now().Add(expiry) heap.Fix((*entryHeap)(h), idx) // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. - if idx == 0 || entry.HeapIndex == 0 { + if idx == 0 || entry.heapIndex == 0 { h.notify() } } +// Remove the entry at idx from the heap. +// // Must be synchronized by the caller. func (h *ExpiryHeap) Remove(idx int) { entry := h.entries[idx] @@ -77,9 +86,9 @@ func (h *ExpiryHeap) Remove(idx int) { // A goroutine which is fetching a new value will have a reference to this // entry. When it re-acquires the lock it needs to be informed that - // the entry was expired while it was fetching. Setting HeapIndex to -1 + // the entry was expired while it was fetching. Setting heapIndex to -1 // indicates that the entry is no longer in the heap, and must be re-added. - entry.HeapIndex = -1 + entry.heapIndex = -1 if idx == 0 { h.notify() @@ -92,20 +101,20 @@ func (h *entryHeap) Len() int { return len(h.entries) } func (h *entryHeap) Swap(i, j int) { h.entries[i], h.entries[j] = h.entries[j], h.entries[i] - h.entries[i].HeapIndex = i - h.entries[j].HeapIndex = j + h.entries[i].heapIndex = i + h.entries[j].heapIndex = j } func (h *entryHeap) Less(i, j int) bool { // The usage of Before here is important (despite being obvious): // this function uses the monotonic time that should be available // on the time.Time value so the heap is immune to wall clock changes. - return h.entries[i].Expires.Before(h.entries[j].Expires) + return h.entries[i].expiry.Before(h.entries[j].expiry) } // heap.Interface, this isn't expected to be called directly. func (h *entryHeap) Push(x interface{}) { - h.entries = append(h.entries, x.(*CacheEntryExpiry)) + h.entries = append(h.entries, x.(*Entry)) } // heap.Interface, this isn't expected to be called directly. @@ -128,6 +137,8 @@ func (h *ExpiryHeap) notify() { } } +// Next returns a Timer that waits until the first entry in the heap expires. +// // Must be synchronized by the caller. func (h *ExpiryHeap) Next() Timer { if len(h.entries) == 0 { @@ -135,14 +146,14 @@ func (h *ExpiryHeap) Next() Timer { } entry := h.entries[0] return Timer{ - timer: time.NewTimer(time.Until(entry.Expires)), + timer: time.NewTimer(time.Until(entry.expiry)), Entry: entry, } } type Timer struct { timer *time.Timer - Entry *CacheEntryExpiry + Entry *Entry } func (t *Timer) Wait() <-chan time.Time { diff --git a/agent/cache/eviction_test.go b/lib/ttlcache/eviction_test.go similarity index 60% rename from agent/cache/eviction_test.go rename to lib/ttlcache/eviction_test.go index 1612b29472..1fdd8e485c 100644 --- a/agent/cache/eviction_test.go +++ b/lib/ttlcache/eviction_test.go @@ -1,4 +1,4 @@ -package cache +package ttlcache import ( "container/heap" @@ -13,52 +13,52 @@ var _ heap.Interface = (*entryHeap)(nil) func TestExpiryHeap(t *testing.T) { h := NewExpiryHeap() ch := h.NotifyCh - var entry, entry2, entry3 *CacheEntryExpiry + var entry, entry2, entry3 *Entry // Init, shouldn't trigger anything testNoMessage(t, ch) runStep(t, "add an entry", func(t *testing.T) { entry = h.Add("foo", 100*time.Millisecond) - assert.Equal(t, 0, entry.HeapIndex) + assert.Equal(t, 0, entry.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a second entry in front", func(t *testing.T) { entry2 = h.Add("bar", 50*time.Millisecond) - assert.Equal(t, 0, entry2.HeapIndex) - assert.Equal(t, 1, entry.HeapIndex) + assert.Equal(t, 0, entry2.heapIndex) + assert.Equal(t, 1, entry.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a third entry at the end", func(t *testing.T) { entry3 = h.Add("baz", 1000*time.Millisecond) - assert.Equal(t, 2, entry3.HeapIndex) + assert.Equal(t, 2, entry3.heapIndex) testNoMessage(t, ch) // no notify cause index 0 stayed the same }) runStep(t, "remove the first entry", func(t *testing.T) { h.Remove(0) - assert.Equal(t, 0, entry.HeapIndex) - assert.Equal(t, 1, entry3.HeapIndex) + assert.Equal(t, 0, entry.heapIndex) + assert.Equal(t, 1, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) }) runStep(t, "update entry3 to expire first", func(t *testing.T) { - h.Update(entry3.HeapIndex, 10*time.Millisecond) - assert.Equal(t, 1, entry.HeapIndex) - assert.Equal(t, 0, entry3.HeapIndex) + h.Update(entry3.heapIndex, 10*time.Millisecond) + assert.Equal(t, 1, entry.heapIndex) + assert.Equal(t, 0, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) }) runStep(t, "0th element change triggers a notify", func(t *testing.T) { - h.Update(entry3.HeapIndex, 20) - assert.Equal(t, 1, entry.HeapIndex) // no move - assert.Equal(t, 0, entry3.HeapIndex) + h.Update(entry3.heapIndex, 20) + assert.Equal(t, 1, entry.heapIndex) // no move + assert.Equal(t, 0, entry3.heapIndex) testMessage(t, ch) testNoMessage(t, ch) // one message }) @@ -89,27 +89,3 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) { t.FailNow() } } - -func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { - c := &Cache{ - stopCh: make(chan struct{}), - entries: make(map[string]cacheEntry), - entriesExpiryHeap: NewExpiryHeap(), - } - chStart := make(chan struct{}) - chDone := make(chan struct{}) - go func() { - close(chStart) - c.runExpiryLoop() - close(chDone) - }() - - <-chStart - close(c.stopCh) - - select { - case <-chDone: - case <-time.After(50 * time.Millisecond): - t.Fatalf("expected loop to exit when stopped") - } -}