diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 0852393af4..10792c5d54 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -406,8 +406,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL) - c.entriesExpiryHeap.Fix(entry.Expiry) + c.entriesExpiryHeap.Update(entry.Expiry.HeapIndex, r.TypeEntry.Opts.LastGetTTL) c.entriesLock.Unlock() // We purposely do not return an error here since the cache only works with @@ -689,9 +688,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { - newEntry.Expiry = &cacheEntryExpiry{Key: key} - newEntry.Expiry.Update(tEntry.Opts.LastGetTTL) - heap.Push(c.entriesExpiryHeap, newEntry.Expiry) + newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } c.entries[key] = newEntry diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8832572c31..780137e18e 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -1000,6 +1000,9 @@ func (t *testPartitionType) RegisterOptions() RegisterOptions { // Test that background refreshing reports correct Age in failure and happy // states. func TestCacheGet_refreshAge(t *testing.T) { + if testing.Short() { + t.Skip("too slow for -short run") + } t.Parallel() require := require.New(t) diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 440c654ba8..9d7c343d70 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -1,7 +1,6 @@ package cache import ( - "container/heap" "time" "golang.org/x/time/rate" @@ -46,118 +45,3 @@ type cacheEntry struct { // FetchRateLimiter limits the rate at which fetch is called for this entry. FetchRateLimiter *rate.Limiter } - -// cacheEntryExpiry contains the expiration information for a cache -// entry. Any modifications to this struct should be done only while -// the Cache entriesLock is held. -type cacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - HeapIndex int // Index in the heap -} - -// Update the expiry to d time from now. -func (e *cacheEntryExpiry) Update(d time.Duration) { - e.Expires = time.Now().Add(d) -} - -// expiryHeap is a heap implementation that stores information about -// when entries expire. Implements container/heap.Interface. -// -// All operations on the heap and read/write of the heap contents require -// the proper entriesLock to be held on Cache. -type expiryHeap struct { - Entries []*cacheEntryExpiry - - // NotifyCh is sent a value whenever the 0 index value of the heap - // changes. This can be used to detect when the earliest value - // changes. - // - // There is a single edge case where the heap will not automatically - // send a notification: if heap.Fix is called manually and the index - // changed is 0 and the change doesn't result in any moves (stays at index - // 0), then we won't detect the change. To work around this, please - // always call the expiryHeap.Fix method instead. - NotifyCh chan struct{} -} - -// Identical to heap.Fix for this heap instance but will properly handle -// the edge case where idx == 0 and no heap modification is necessary, -// and still notify the NotifyCh. -// -// This is important for cache expiry since the expiry time may have been -// extended and if we don't send a message to the NotifyCh then we'll never -// reset the timer and the entry will be evicted early. -func (h *expiryHeap) Fix(entry *cacheEntryExpiry) { - idx := entry.HeapIndex - heap.Fix(h, idx) - - // This is the edge case we handle: if the prev (idx) and current (HeapIndex) - // is zero, it means the head-of-line didn't change while the value - // changed. Notify to reset our expiry worker. - if idx == 0 && entry.HeapIndex == 0 { - h.notify() - } -} - -func (h *expiryHeap) Len() int { return len(h.Entries) } - -func (h *expiryHeap) Swap(i, j int) { - h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] - h.Entries[i].HeapIndex = i - h.Entries[j].HeapIndex = j - - // If we're moving the 0 index, update the channel since we need - // to re-update the timer we're waiting on for the soonest expiring - // value. - if i == 0 || j == 0 { - h.notify() - } -} - -func (h *expiryHeap) Less(i, j int) bool { - // The usage of Before here is important (despite being obvious): - // this function uses the monotonic time that should be available - // on the time.Time value so the heap is immune to wall clock changes. - return h.Entries[i].Expires.Before(h.Entries[j].Expires) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Push(x interface{}) { - entry := x.(*cacheEntryExpiry) - - // Set initial heap index, if we're going to the end then Swap - // won't be called so we need to initialize - entry.HeapIndex = len(h.Entries) - - // For the first entry, we need to trigger a channel send because - // Swap won't be called; nothing to swap! We can call it right away - // because all heap operations are within a lock. - if len(h.Entries) == 0 { - h.notify() - } - - h.Entries = append(h.Entries, entry) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Pop() interface{} { - old := h.Entries - n := len(old) - x := old[n-1] - h.Entries = old[0 : n-1] - return x -} - -func (h *expiryHeap) notify() { - select { - case h.NotifyCh <- struct{}{}: - // Good - - default: - // If the send would've blocked, we just ignore it. The reason this - // is safe is because NotifyCh should always be a buffered channel. - // If this blocks, it means that there is a pending message anyways - // so the receiver will restart regardless. - } -} diff --git a/agent/cache/entry_test.go b/agent/cache/entry_test.go deleted file mode 100644 index fe40733630..0000000000 --- a/agent/cache/entry_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package cache - -import ( - "container/heap" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestExpiryHeap_impl(t *testing.T) { - var _ heap.Interface = new(expiryHeap) -} - -func TestExpiryHeap(t *testing.T) { - require := require.New(t) - now := time.Now() - ch := make(chan struct{}, 10) // buffered to prevent blocking in tests - h := &expiryHeap{NotifyCh: ch} - - // Init, shouldn't trigger anything - heap.Init(h) - testNoMessage(t, ch) - - // Push an initial value, expect one message - entry := &cacheEntryExpiry{Key: "foo", HeapIndex: -1, Expires: now.Add(100)} - heap.Push(h, entry) - require.Equal(0, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes earlier than entry - entry2 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(50)} - heap.Push(h, entry2) - require.Equal(0, entry2.HeapIndex) - require.Equal(1, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes at the end - entry3 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(1000)} - heap.Push(h, entry3) - require.Equal(2, entry3.HeapIndex) - testNoMessage(t, ch) // no notify cause index 0 stayed the same - - // Remove the first entry (not Pop, since we don't use Pop, but that works too) - remove := h.Entries[0] - heap.Remove(h, remove.HeapIndex) - require.Equal(0, entry.HeapIndex) - require.Equal(1, entry3.HeapIndex) - testMessage(t, ch) - testMessage(t, ch) // we have two because two swaps happen - testNoMessage(t, ch) - - // Let's change entry 3 to be early, and fix it - entry3.Expires = now.Add(10) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) - - // Let's change entry 3 again, this is an edge case where if the 0th - // element changed, we didn't trigger the channel. Our Fix func should. - entry.Expires = now.Add(20) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) // no move - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // one message -} - -func testNoMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - t.Fatal("should not have a message") - default: - } -} - -func testMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - default: - t.Fatal("should have a message") - } -} diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go new file mode 100644 index 0000000000..953929b7eb --- /dev/null +++ b/agent/cache/eviction.go @@ -0,0 +1,123 @@ +package cache + +import ( + "container/heap" + "time" +) + +// cacheEntryExpiry contains the expiration time for a cache entry. +type cacheEntryExpiry struct { + Key string // Key in the cache map + Expires time.Time // Time when entry expires (monotonic clock) + HeapIndex int // Index in the heap +} + +// TODO: use or remove +func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry { + return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} +} + +// expiryHeap is a container/heap.Interface implementation that expires entries +// in the cache when their expiration time is reached. +// +// All operations on the heap and read/write of the heap contents require +// the proper entriesLock to be held on Cache. +type expiryHeap struct { + Entries []*cacheEntryExpiry + + // NotifyCh is sent a value whenever the 0 index value of the heap + // changes. This can be used to detect when the earliest value + // changes. + // + // There is a single edge case where the heap will not automatically + // send a notification: if heap.Fix is called manually and the index + // changed is 0 and the change doesn't result in any moves (stays at index + // 0), then we won't detect the change. To work around this, please + // always call the expiryHeap.Fix method instead. + NotifyCh chan struct{} +} + +func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { + entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} + heap.Push(h, entry) + return entry +} + +// Update the entry that is currently at idx with the new expiry time. The heap +// will be rebalanced after the entry is updated. +// +// Must be synchronized by the caller. +func (h *expiryHeap) Update(idx int, expiry time.Duration) { + entry := h.Entries[idx] + entry.Expires = time.Now().Add(expiry) + heap.Fix(h, idx) + + // If the previous index and current index are both zero then Fix did not + // swap the entry, and notify must be called here. + if idx == 0 && entry.HeapIndex == 0 { + h.notify() + } +} + +func (h *expiryHeap) Len() int { return len(h.Entries) } + +func (h *expiryHeap) Swap(i, j int) { + h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] + h.Entries[i].HeapIndex = i + h.Entries[j].HeapIndex = j + + // If we're moving the 0 index, update the channel since we need + // to re-update the timer we're waiting on for the soonest expiring + // value. + if i == 0 || j == 0 { + h.notify() + } +} + +func (h *expiryHeap) Less(i, j int) bool { + // The usage of Before here is important (despite being obvious): + // this function uses the monotonic time that should be available + // on the time.Time value so the heap is immune to wall clock changes. + return h.Entries[i].Expires.Before(h.Entries[j].Expires) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *expiryHeap) Push(x interface{}) { + entry := x.(*cacheEntryExpiry) + + // Set the initial heap index to the last index. If the entry is swapped it + // will have the correct set, and if it remains at the end the last index will + // be correct. + entry.HeapIndex = len(h.Entries) + + // For the first entry, we need to trigger a channel send because + // Swap won't be called; nothing to swap! We can call it right away + // because all heap operations are within a lock. + if len(h.Entries) == 0 { + h.notify() + } + + h.Entries = append(h.Entries, entry) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *expiryHeap) Pop() interface{} { + n := len(h.Entries) + entries := h.Entries + last := entries[n-1] + h.Entries = entries[0 : n-1] + return last +} + +func (h *expiryHeap) notify() { + select { + case h.NotifyCh <- struct{}{}: + // Good + + default: + // If the send would've blocked, we just ignore it. The reason this + // is safe is because NotifyCh should always be a buffered channel. + // If this blocks, it means that there is a pending message anyways + // so the receiver will restart regardless. + } +} diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go new file mode 100644 index 0000000000..edb4564cf4 --- /dev/null +++ b/agent/cache/eviction_test.go @@ -0,0 +1,96 @@ +package cache + +import ( + "container/heap" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var _ heap.Interface = new(expiryHeap) + +func TestExpiryHeap(t *testing.T) { + require := require.New(t) + ch := make(chan struct{}, 10) // buffered to prevent blocking in tests + h := &expiryHeap{NotifyCh: ch} + var entry, entry2, entry3 *cacheEntryExpiry + + // Init, shouldn't trigger anything + heap.Init(h) + testNoMessage(t, ch) + + runStep(t, "add an entry", func(t *testing.T) { + entry = h.Add("foo", 100*time.Millisecond) + require.Equal(0, entry.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a second entry in front", func(t *testing.T) { + entry2 = h.Add("bar", 50*time.Millisecond) + require.Equal(0, entry2.HeapIndex) + require.Equal(1, entry.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a third entry at the end", func(t *testing.T) { + entry3 = h.Add("baz", 1000*time.Millisecond) + require.Equal(2, entry3.HeapIndex) + testNoMessage(t, ch) // no notify cause index 0 stayed the same + }) + + runStep(t, "remove the first entry", func(t *testing.T) { + remove := h.Entries[0] + heap.Remove(h, remove.HeapIndex) + require.Equal(0, entry.HeapIndex) + require.Equal(1, entry3.HeapIndex) + testMessage(t, ch) + testMessage(t, ch) // we have two because two swaps happen + testNoMessage(t, ch) + }) + + runStep(t, "update entry3 to expire first", func(t *testing.T) { + h.Update(entry3.HeapIndex, 10*time.Millisecond) + assert.Equal(t, 1, entry.HeapIndex) + assert.Equal(t, 0, entry3.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) + }) + + runStep(t, "0th element change triggers a notify", func(t *testing.T) { + h.Update(entry3.HeapIndex, 20) + require.Equal(1, entry.HeapIndex) // no move + require.Equal(0, entry3.HeapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // one message + }) +} + +func testNoMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + t.Fatal("should not have a message") + default: + } +} + +func testMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + default: + t.Fatal("should have a message") + } +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + if !t.Run(name, fn) { + t.FailNow() + } +}