cache: Refactor heap.notify to make it more explicit.

And remove duplicate notifications.

Instead of performing the check in the heap implementation, check the
index in the higher level interface (Add,Remove,Update) and notify if one
of the relevant indexes is 0.
This commit is contained in:
Daniel Nephin 2020-10-20 14:57:22 -04:00
parent 499f2822cf
commit 343d133183
1 changed files with 23 additions and 37 deletions

View File

@ -23,12 +23,6 @@ type expiryHeap struct {
// NotifyCh is sent a value whenever the 0 index value of the heap
// changes. This can be used to detect when the earliest value
// changes.
//
// There is a single edge case where the heap will not automatically
// send a notification: if heap.Fix is called manually and the index
// changed is 0 and the change doesn't result in any moves (stays at index
// 0), then we won't detect the change. To work around this, please
// always call the expiryHeap.Fix method instead.
NotifyCh chan struct{}
}
@ -41,10 +35,22 @@ func newExpiryHeap() *expiryHeap {
return h
}
// Add an entry to the heap.
//
// Must be synchronized by the caller.
func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry {
entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
entry := &cacheEntryExpiry{
Key: key,
Expires: time.Now().Add(expiry),
// Set the initial heap index to the last index. If the entry is swapped it
// will have the correct index set, and if it remains at the end the last
// index will be correct.
HeapIndex: len(h.Entries),
}
heap.Push(h, entry)
if entry.HeapIndex == 0 {
h.notify()
}
return entry
}
@ -59,7 +65,7 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) {
// If the previous index and current index are both zero then Fix did not
// swap the entry, and notify must be called here.
if idx == 0 && entry.HeapIndex == 0 {
if idx == 0 || entry.HeapIndex == 0 {
h.notify()
}
}
@ -74,6 +80,10 @@ func (h *expiryHeap) Remove(idx int) {
// the entry was expired while it was fetching. Setting HeapIndex to -1
// indicates that the entry is no longer in the heap, and must be re-added.
entry.HeapIndex = -1
if idx == 0 {
h.notify()
}
}
func (h *expiryHeap) Len() int { return len(h.Entries) }
@ -82,13 +92,6 @@ func (h *expiryHeap) Swap(i, j int) {
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
h.Entries[i].HeapIndex = i
h.Entries[j].HeapIndex = j
// If we're moving the 0 index, update the channel since we need
// to re-update the timer we're waiting on for the soonest expiring
// value.
if i == 0 || j == 0 {
h.notify()
}
}
func (h *expiryHeap) Less(i, j int) bool {
@ -100,21 +103,7 @@ func (h *expiryHeap) Less(i, j int) bool {
// heap.Interface, this isn't expected to be called directly.
func (h *expiryHeap) Push(x interface{}) {
entry := x.(*cacheEntryExpiry)
// Set the initial heap index to the last index. If the entry is swapped it
// will have the correct set, and if it remains at the end the last index will
// be correct.
entry.HeapIndex = len(h.Entries)
// For the first entry, we need to trigger a channel send because
// Swap won't be called; nothing to swap! We can call it right away
// because all heap operations are within a lock.
if len(h.Entries) == 0 {
h.notify()
}
h.Entries = append(h.Entries, entry)
h.Entries = append(h.Entries, x.(*cacheEntryExpiry))
}
// heap.Interface, this isn't expected to be called directly.
@ -126,17 +115,14 @@ func (h *expiryHeap) Pop() interface{} {
return last
}
// TODO: look at calls to notify.
// notify the timer that the head value has changed, so the expiry time has
// also likely changed.
func (h *expiryHeap) notify() {
// Send to channel without blocking. Skips sending if there is already
// an item in the buffered channel.
select {
case h.NotifyCh <- struct{}{}:
// Good
default:
// If the send would've blocked, we just ignore it. The reason this
// is safe is because NotifyCh should always be a buffered channel.
// If this blocks, it means that there is a pending message anyways
// so the receiver will restart regardless.
}
}