diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index b0de07f2c2..efd1025025 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -23,12 +23,6 @@ type expiryHeap struct { // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value // changes. - // - // There is a single edge case where the heap will not automatically - // send a notification: if heap.Fix is called manually and the index - // changed is 0 and the change doesn't result in any moves (stays at index - // 0), then we won't detect the change. To work around this, please - // always call the expiryHeap.Fix method instead. NotifyCh chan struct{} } @@ -41,10 +35,22 @@ func newExpiryHeap() *expiryHeap { return h } +// Add an entry to the heap. +// // Must be synchronized by the caller. func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { - entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)} + entry := &cacheEntryExpiry{ + Key: key, + Expires: time.Now().Add(expiry), + // Set the initial heap index to the last index. If the entry is swapped it + // will have the correct index set, and if it remains at the end the last + // index will be correct. + HeapIndex: len(h.Entries), + } heap.Push(h, entry) + if entry.HeapIndex == 0 { + h.notify() + } return entry } @@ -59,7 +65,7 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. - if idx == 0 && entry.HeapIndex == 0 { + if idx == 0 || entry.HeapIndex == 0 { h.notify() } } @@ -74,6 +80,10 @@ func (h *expiryHeap) Remove(idx int) { // the entry was expired while it was fetching. Setting HeapIndex to -1 // indicates that the entry is no longer in the heap, and must be re-added. entry.HeapIndex = -1 + + if idx == 0 { + h.notify() + } } func (h *expiryHeap) Len() int { return len(h.Entries) } @@ -82,13 +92,6 @@ func (h *expiryHeap) Swap(i, j int) { h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] h.Entries[i].HeapIndex = i h.Entries[j].HeapIndex = j - - // If we're moving the 0 index, update the channel since we need - // to re-update the timer we're waiting on for the soonest expiring - // value. - if i == 0 || j == 0 { - h.notify() - } } func (h *expiryHeap) Less(i, j int) bool { @@ -100,21 +103,7 @@ func (h *expiryHeap) Less(i, j int) bool { // heap.Interface, this isn't expected to be called directly. func (h *expiryHeap) Push(x interface{}) { - entry := x.(*cacheEntryExpiry) - - // Set the initial heap index to the last index. If the entry is swapped it - // will have the correct set, and if it remains at the end the last index will - // be correct. - entry.HeapIndex = len(h.Entries) - - // For the first entry, we need to trigger a channel send because - // Swap won't be called; nothing to swap! We can call it right away - // because all heap operations are within a lock. - if len(h.Entries) == 0 { - h.notify() - } - - h.Entries = append(h.Entries, entry) + h.Entries = append(h.Entries, x.(*cacheEntryExpiry)) } // heap.Interface, this isn't expected to be called directly. @@ -126,17 +115,14 @@ func (h *expiryHeap) Pop() interface{} { return last } -// TODO: look at calls to notify. +// notify the timer that the head value has changed, so the expiry time has +// also likely changed. func (h *expiryHeap) notify() { + // Send to channel without blocking. Skips sending if there is already + // an item in the buffered channel. select { case h.NotifyCh <- struct{}{}: - // Good - default: - // If the send would've blocked, we just ignore it. The reason this - // is safe is because NotifyCh should always be a buffered channel. - // If this blocks, it means that there is a pending message anyways - // so the receiver will restart regardless. } }