mirror of https://github.com/status-im/consul.git
cache: Move more of the expiryLoop into the Heap
This commit is contained in:
parent
2cdc90e01b
commit
499f2822cf
|
@ -15,7 +15,6 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
|
@ -166,16 +165,11 @@ func applyDefaultValuesOnOptions(options Options) Options {
|
|||
// Further settings can be tweaked on the returned value.
|
||||
func New(options Options) *Cache {
|
||||
options = applyDefaultValuesOnOptions(options)
|
||||
// Initialize the heap. The buffer of 1 is really important because
|
||||
// its possible for the expiry loop to trigger the heap to update
|
||||
// itself and it'd block forever otherwise.
|
||||
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
|
||||
heap.Init(h)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
c := &Cache{
|
||||
types: make(map[string]typeEntry),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: h,
|
||||
entriesExpiryHeap: newExpiryHeap(),
|
||||
stopCh: make(chan struct{}),
|
||||
options: options,
|
||||
rateLimitContext: ctx,
|
||||
|
@ -745,47 +739,30 @@ func backOffWait(failures uint) time.Duration {
|
|||
// runExpiryLoop is a blocking function that watches the expiration
|
||||
// heap and invalidates entries that have expired.
|
||||
func (c *Cache) runExpiryLoop() {
|
||||
var expiryTimer *time.Timer
|
||||
for {
|
||||
// If we have a previous timer, stop it.
|
||||
if expiryTimer != nil {
|
||||
expiryTimer.Stop()
|
||||
}
|
||||
|
||||
// Get the entry expiring soonest
|
||||
var entry *cacheEntryExpiry
|
||||
var expiryCh <-chan time.Time
|
||||
c.entriesLock.RLock()
|
||||
if len(c.entriesExpiryHeap.Entries) > 0 {
|
||||
entry = c.entriesExpiryHeap.Entries[0]
|
||||
expiryTimer = time.NewTimer(time.Until(entry.Expires))
|
||||
expiryCh = expiryTimer.C
|
||||
}
|
||||
timer := c.entriesExpiryHeap.Next()
|
||||
c.entriesLock.RUnlock()
|
||||
|
||||
select {
|
||||
case <-c.stopCh:
|
||||
timer.Stop()
|
||||
return
|
||||
case <-c.entriesExpiryHeap.NotifyCh:
|
||||
// Entries changed, so the heap may have changed. Restart loop.
|
||||
timer.Stop()
|
||||
continue
|
||||
|
||||
case <-expiryCh:
|
||||
case <-timer.Wait():
|
||||
c.entriesLock.Lock()
|
||||
|
||||
// Perform cleanup operations on the entry's state, if applicable.
|
||||
state := c.entries[entry.Key].State
|
||||
if closer, ok := state.(io.Closer); ok {
|
||||
entry := timer.Entry
|
||||
if closer, ok := c.entries[entry.Key].State.(io.Closer); ok {
|
||||
closer.Close()
|
||||
}
|
||||
|
||||
// Entry expired! Remove it.
|
||||
delete(c.entries, entry.Key)
|
||||
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
|
||||
|
||||
// This is subtle but important: if we race and simultaneously
|
||||
// evict and fetch a new value, then we set this to -1 to
|
||||
// have it treated as a new value so that the TTL is extended.
|
||||
entry.HeapIndex = -1
|
||||
c.entriesExpiryHeap.Remove(entry.HeapIndex)
|
||||
|
||||
// Set some metrics
|
||||
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
|
||||
|
|
|
@ -12,11 +12,6 @@ type cacheEntryExpiry struct {
|
|||
HeapIndex int // Index in the heap
|
||||
}
|
||||
|
||||
// TODO: use or remove
|
||||
func newCacheEntry(key string, expiry time.Duration) *cacheEntryExpiry {
|
||||
return &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
|
||||
}
|
||||
|
||||
// expiryHeap is a container/heap.Interface implementation that expires entries
|
||||
// in the cache when their expiration time is reached.
|
||||
//
|
||||
|
@ -37,6 +32,16 @@ type expiryHeap struct {
|
|||
NotifyCh chan struct{}
|
||||
}
|
||||
|
||||
// Initialize the heap. The buffer of 1 is really important because
|
||||
// its possible for the expiry loop to trigger the heap to update
|
||||
// itself and it'd block forever otherwise.
|
||||
func newExpiryHeap() *expiryHeap {
|
||||
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
|
||||
heap.Init(h)
|
||||
return h
|
||||
}
|
||||
|
||||
// Must be synchronized by the caller.
|
||||
func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry {
|
||||
entry := &cacheEntryExpiry{Key: key, Expires: time.Now().Add(expiry)}
|
||||
heap.Push(h, entry)
|
||||
|
@ -59,6 +64,18 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) {
|
|||
}
|
||||
}
|
||||
|
||||
// Must be synchronized by the caller.
|
||||
func (h *expiryHeap) Remove(idx int) {
|
||||
entry := h.Entries[idx]
|
||||
heap.Remove(h, idx)
|
||||
|
||||
// A goroutine which is fetching a new value will have a reference to this
|
||||
// entry. When it re-acquires the lock it needs to be informed that
|
||||
// the entry was expired while it was fetching. Setting HeapIndex to -1
|
||||
// indicates that the entry is no longer in the heap, and must be re-added.
|
||||
entry.HeapIndex = -1
|
||||
}
|
||||
|
||||
func (h *expiryHeap) Len() int { return len(h.Entries) }
|
||||
|
||||
func (h *expiryHeap) Swap(i, j int) {
|
||||
|
@ -109,6 +126,7 @@ func (h *expiryHeap) Pop() interface{} {
|
|||
return last
|
||||
}
|
||||
|
||||
// TODO: look at calls to notify.
|
||||
func (h *expiryHeap) notify() {
|
||||
select {
|
||||
case h.NotifyCh <- struct{}{}:
|
||||
|
@ -121,3 +139,33 @@ func (h *expiryHeap) notify() {
|
|||
// so the receiver will restart regardless.
|
||||
}
|
||||
}
|
||||
|
||||
// Must be synchronized by the caller.
|
||||
func (h *expiryHeap) Next() timer {
|
||||
if len(h.Entries) == 0 {
|
||||
return timer{}
|
||||
}
|
||||
entry := h.Entries[0]
|
||||
return timer{
|
||||
timer: time.NewTimer(time.Until(entry.Expires)),
|
||||
Entry: entry,
|
||||
}
|
||||
}
|
||||
|
||||
type timer struct {
|
||||
timer *time.Timer
|
||||
Entry *cacheEntryExpiry
|
||||
}
|
||||
|
||||
func (t *timer) Wait() <-chan time.Time {
|
||||
if t.timer == nil {
|
||||
return nil
|
||||
}
|
||||
return t.timer.C
|
||||
}
|
||||
|
||||
func (t *timer) Stop() {
|
||||
if t.timer != nil {
|
||||
t.timer.Stop()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,12 +43,10 @@ func TestExpiryHeap(t *testing.T) {
|
|||
})
|
||||
|
||||
runStep(t, "remove the first entry", func(t *testing.T) {
|
||||
remove := h.Entries[0]
|
||||
heap.Remove(h, remove.HeapIndex)
|
||||
h.Remove(0)
|
||||
require.Equal(0, entry.HeapIndex)
|
||||
require.Equal(1, entry3.HeapIndex)
|
||||
testMessage(t, ch)
|
||||
testMessage(t, ch) // we have two because two swaps happen
|
||||
testNoMessage(t, ch)
|
||||
})
|
||||
|
||||
|
@ -94,3 +92,27 @@ func runStep(t *testing.T, name string, fn func(t *testing.T)) {
|
|||
t.FailNow()
|
||||
}
|
||||
}
|
||||
|
||||
func TestExpiryLoop_ExitsWhenStopped(t *testing.T) {
|
||||
c := &Cache{
|
||||
stopCh: make(chan struct{}),
|
||||
entries: make(map[string]cacheEntry),
|
||||
entriesExpiryHeap: newExpiryHeap(),
|
||||
}
|
||||
chStart := make(chan struct{})
|
||||
chDone := make(chan struct{})
|
||||
go func() {
|
||||
close(chStart)
|
||||
c.runExpiryLoop()
|
||||
close(chDone)
|
||||
}()
|
||||
|
||||
<-chStart
|
||||
close(c.stopCh)
|
||||
|
||||
select {
|
||||
case <-chDone:
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
t.Fatalf("expected loop to exit when stopped")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue