diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 0852393af4..1a5193792b 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -15,7 +15,6 @@ package cache import ( - "container/heap" "context" "fmt" "io" @@ -28,6 +27,7 @@ import ( "golang.org/x/time/rate" "github.com/hashicorp/consul/lib" + "github.com/hashicorp/consul/lib/ttlcache" ) //go:generate mockery -all -inpkg @@ -88,7 +88,7 @@ type Cache struct { // internal storage format so changing this should be possible safely. entriesLock sync.RWMutex entries map[string]cacheEntry - entriesExpiryHeap *expiryHeap + entriesExpiryHeap *ttlcache.ExpiryHeap // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. @@ -166,16 +166,11 @@ func applyDefaultValuesOnOptions(options Options) Options { // Further settings can be tweaked on the returned value. func New(options Options) *Cache { options = applyDefaultValuesOnOptions(options) - // Initialize the heap. The buffer of 1 is really important because - // its possible for the expiry loop to trigger the heap to update - // itself and it'd block forever otherwise. - h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} - heap.Init(h) ctx, cancel := context.WithCancel(context.Background()) c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: h, + entriesExpiryHeap: ttlcache.NewExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -406,8 +401,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL) - c.entriesExpiryHeap.Fix(entry.Expiry) + c.entriesExpiryHeap.Update(entry.Expiry.Index(), r.TypeEntry.Opts.LastGetTTL) c.entriesLock.Unlock() // We purposely do not return an error here since the cache only works with @@ -688,10 +682,8 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign // If this is a new entry (not in the heap yet), then setup the // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. - if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { - newEntry.Expiry = &cacheEntryExpiry{Key: key} - newEntry.Expiry.Update(tEntry.Opts.LastGetTTL) - heap.Push(c.entriesExpiryHeap, newEntry.Expiry) + if newEntry.Expiry == nil || newEntry.Expiry.Index() == ttlcache.NotIndexed { + newEntry.Expiry = c.entriesExpiryHeap.Add(key, tEntry.Opts.LastGetTTL) } c.entries[key] = newEntry @@ -748,47 +740,30 @@ func backOffWait(failures uint) time.Duration { // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates entries that have expired. func (c *Cache) runExpiryLoop() { - var expiryTimer *time.Timer for { - // If we have a previous timer, stop it. - if expiryTimer != nil { - expiryTimer.Stop() - } - - // Get the entry expiring soonest - var entry *cacheEntryExpiry - var expiryCh <-chan time.Time c.entriesLock.RLock() - if len(c.entriesExpiryHeap.Entries) > 0 { - entry = c.entriesExpiryHeap.Entries[0] - expiryTimer = time.NewTimer(time.Until(entry.Expires)) - expiryCh = expiryTimer.C - } + timer := c.entriesExpiryHeap.Next() c.entriesLock.RUnlock() select { case <-c.stopCh: + timer.Stop() return case <-c.entriesExpiryHeap.NotifyCh: - // Entries changed, so the heap may have changed. Restart loop. + timer.Stop() + continue - case <-expiryCh: + case <-timer.Wait(): c.entriesLock.Lock() - // Perform cleanup operations on the entry's state, if applicable. - state := c.entries[entry.Key].State - if closer, ok := state.(io.Closer); ok { + entry := timer.Entry + if closer, ok := c.entries[entry.Key()].State.(io.Closer); ok { closer.Close() } // Entry expired! Remove it. - delete(c.entries, entry.Key) - heap.Remove(c.entriesExpiryHeap, entry.HeapIndex) - - // This is subtle but important: if we race and simultaneously - // evict and fetch a new value, then we set this to -1 to - // have it treated as a new value so that the TTL is extended. - entry.HeapIndex = -1 + delete(c.entries, entry.Key()) + c.entriesExpiryHeap.Remove(entry.Index()) // Set some metrics metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) @@ -829,7 +804,6 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key}, FetchRateLimiter: rate.NewLimiter( c.options.EntryFetchRate, c.options.EntryFetchMaxBurst, diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 8832572c31..8b77773f9e 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -15,6 +15,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/time/rate" + "github.com/hashicorp/consul/lib/ttlcache" "github.com/hashicorp/consul/sdk/testutil" ) @@ -1000,6 +1001,9 @@ func (t *testPartitionType) RegisterOptions() RegisterOptions { // Test that background refreshing reports correct Age in failure and happy // states. func TestCacheGet_refreshAge(t *testing.T) { + if testing.Short() { + t.Skip("too slow for -short run") + } t.Parallel() require := require.New(t) @@ -1402,3 +1406,73 @@ OUT: } } } + +func TestCache_ExpiryLoop_ExitsWhenStopped(t *testing.T) { + c := &Cache{ + stopCh: make(chan struct{}), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: ttlcache.NewExpiryHeap(), + } + chStart := make(chan struct{}) + chDone := make(chan struct{}) + go func() { + close(chStart) + c.runExpiryLoop() + close(chDone) + }() + + <-chStart + close(c.stopCh) + + select { + case <-chDone: + case <-time.After(50 * time.Millisecond): + t.Fatalf("expected loop to exit when stopped") + } +} + +func TestCache_Prepopulate(t *testing.T) { + typ := &fakeType{index: 5} + c := New(Options{}) + c.RegisterType("t", typ) + + c.Prepopulate("t", FetchResult{Value: 17, Index: 1}, "dc1", "token", "v1") + + ctx := context.Background() + req := fakeRequest{ + info: RequestInfo{ + Key: "v1", + Token: "token", + Datacenter: "dc1", + MinIndex: 1, + }, + } + result, _, err := c.Get(ctx, "t", req) + require.NoError(t, err) + require.Equal(t, 17, result) +} + +type fakeType struct { + index uint64 +} + +func (f fakeType) Fetch(_ FetchOptions, _ Request) (FetchResult, error) { + idx := atomic.LoadUint64(&f.index) + return FetchResult{Value: int(idx * 2), Index: idx}, nil +} + +func (f fakeType) RegisterOptions() RegisterOptions { + return RegisterOptions{Refresh: true} +} + +var _ Type = (*fakeType)(nil) + +type fakeRequest struct { + info RequestInfo +} + +func (f fakeRequest) CacheInfo() RequestInfo { + return f.info +} + +var _ Request = (*fakeRequest)(nil) diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 440c654ba8..0c71e94437 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -1,10 +1,11 @@ package cache import ( - "container/heap" "time" "golang.org/x/time/rate" + + "github.com/hashicorp/consul/lib/ttlcache" ) // cacheEntry stores a single cache entry. @@ -31,8 +32,8 @@ type cacheEntry struct { // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the - // expiryHeap as well. - Expiry *cacheEntryExpiry + // ExpiryHeap as well. + Expiry *ttlcache.Entry // FetchedAt stores the time the cache entry was retrieved for determining // it's age later. @@ -46,118 +47,3 @@ type cacheEntry struct { // FetchRateLimiter limits the rate at which fetch is called for this entry. FetchRateLimiter *rate.Limiter } - -// cacheEntryExpiry contains the expiration information for a cache -// entry. Any modifications to this struct should be done only while -// the Cache entriesLock is held. -type cacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - HeapIndex int // Index in the heap -} - -// Update the expiry to d time from now. -func (e *cacheEntryExpiry) Update(d time.Duration) { - e.Expires = time.Now().Add(d) -} - -// expiryHeap is a heap implementation that stores information about -// when entries expire. Implements container/heap.Interface. -// -// All operations on the heap and read/write of the heap contents require -// the proper entriesLock to be held on Cache. -type expiryHeap struct { - Entries []*cacheEntryExpiry - - // NotifyCh is sent a value whenever the 0 index value of the heap - // changes. This can be used to detect when the earliest value - // changes. - // - // There is a single edge case where the heap will not automatically - // send a notification: if heap.Fix is called manually and the index - // changed is 0 and the change doesn't result in any moves (stays at index - // 0), then we won't detect the change. To work around this, please - // always call the expiryHeap.Fix method instead. - NotifyCh chan struct{} -} - -// Identical to heap.Fix for this heap instance but will properly handle -// the edge case where idx == 0 and no heap modification is necessary, -// and still notify the NotifyCh. -// -// This is important for cache expiry since the expiry time may have been -// extended and if we don't send a message to the NotifyCh then we'll never -// reset the timer and the entry will be evicted early. -func (h *expiryHeap) Fix(entry *cacheEntryExpiry) { - idx := entry.HeapIndex - heap.Fix(h, idx) - - // This is the edge case we handle: if the prev (idx) and current (HeapIndex) - // is zero, it means the head-of-line didn't change while the value - // changed. Notify to reset our expiry worker. - if idx == 0 && entry.HeapIndex == 0 { - h.notify() - } -} - -func (h *expiryHeap) Len() int { return len(h.Entries) } - -func (h *expiryHeap) Swap(i, j int) { - h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] - h.Entries[i].HeapIndex = i - h.Entries[j].HeapIndex = j - - // If we're moving the 0 index, update the channel since we need - // to re-update the timer we're waiting on for the soonest expiring - // value. - if i == 0 || j == 0 { - h.notify() - } -} - -func (h *expiryHeap) Less(i, j int) bool { - // The usage of Before here is important (despite being obvious): - // this function uses the monotonic time that should be available - // on the time.Time value so the heap is immune to wall clock changes. - return h.Entries[i].Expires.Before(h.Entries[j].Expires) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Push(x interface{}) { - entry := x.(*cacheEntryExpiry) - - // Set initial heap index, if we're going to the end then Swap - // won't be called so we need to initialize - entry.HeapIndex = len(h.Entries) - - // For the first entry, we need to trigger a channel send because - // Swap won't be called; nothing to swap! We can call it right away - // because all heap operations are within a lock. - if len(h.Entries) == 0 { - h.notify() - } - - h.Entries = append(h.Entries, entry) -} - -// heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Pop() interface{} { - old := h.Entries - n := len(old) - x := old[n-1] - h.Entries = old[0 : n-1] - return x -} - -func (h *expiryHeap) notify() { - select { - case h.NotifyCh <- struct{}{}: - // Good - - default: - // If the send would've blocked, we just ignore it. The reason this - // is safe is because NotifyCh should always be a buffered channel. - // If this blocks, it means that there is a pending message anyways - // so the receiver will restart regardless. - } -} diff --git a/agent/cache/entry_test.go b/agent/cache/entry_test.go deleted file mode 100644 index fe40733630..0000000000 --- a/agent/cache/entry_test.go +++ /dev/null @@ -1,91 +0,0 @@ -package cache - -import ( - "container/heap" - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func TestExpiryHeap_impl(t *testing.T) { - var _ heap.Interface = new(expiryHeap) -} - -func TestExpiryHeap(t *testing.T) { - require := require.New(t) - now := time.Now() - ch := make(chan struct{}, 10) // buffered to prevent blocking in tests - h := &expiryHeap{NotifyCh: ch} - - // Init, shouldn't trigger anything - heap.Init(h) - testNoMessage(t, ch) - - // Push an initial value, expect one message - entry := &cacheEntryExpiry{Key: "foo", HeapIndex: -1, Expires: now.Add(100)} - heap.Push(h, entry) - require.Equal(0, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes earlier than entry - entry2 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(50)} - heap.Push(h, entry2) - require.Equal(0, entry2.HeapIndex) - require.Equal(1, entry.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // exactly one asserted above - - // Push another that goes at the end - entry3 := &cacheEntryExpiry{Key: "bar", HeapIndex: -1, Expires: now.Add(1000)} - heap.Push(h, entry3) - require.Equal(2, entry3.HeapIndex) - testNoMessage(t, ch) // no notify cause index 0 stayed the same - - // Remove the first entry (not Pop, since we don't use Pop, but that works too) - remove := h.Entries[0] - heap.Remove(h, remove.HeapIndex) - require.Equal(0, entry.HeapIndex) - require.Equal(1, entry3.HeapIndex) - testMessage(t, ch) - testMessage(t, ch) // we have two because two swaps happen - testNoMessage(t, ch) - - // Let's change entry 3 to be early, and fix it - entry3.Expires = now.Add(10) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) - - // Let's change entry 3 again, this is an edge case where if the 0th - // element changed, we didn't trigger the channel. Our Fix func should. - entry.Expires = now.Add(20) - h.Fix(entry3) - require.Equal(1, entry.HeapIndex) // no move - require.Equal(0, entry3.HeapIndex) - testMessage(t, ch) - testNoMessage(t, ch) // one message -} - -func testNoMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - t.Fatal("should not have a message") - default: - } -} - -func testMessage(t *testing.T, ch <-chan struct{}) { - t.Helper() - - select { - case <-ch: - default: - t.Fatal("should have a message") - } -} diff --git a/lib/ttlcache/eviction.go b/lib/ttlcache/eviction.go new file mode 100644 index 0000000000..d07b2bf465 --- /dev/null +++ b/lib/ttlcache/eviction.go @@ -0,0 +1,195 @@ +/* +Package ttlcache provides an ExpiryHeap that can be used by a cache to track the +expiration time of its entries. When an expiry is reached the Timer will fire +and the entry can be removed. +*/ +package ttlcache + +import ( + "container/heap" + "time" +) + +// Entry in the ExpiryHeap, tracks the index and expiry time of an item in a +// ttl cache. +type Entry struct { + key string + expiry time.Time + heapIndex int +} + +// NotIndexed indicates that the entry does not exist in the heap. Either because +// it is nil, or because it was removed. +const NotIndexed = -1 + +// Index returns the index of this entry within the heap. +func (e *Entry) Index() int { + if e == nil { + return NotIndexed + } + return e.heapIndex +} + +// Key returns the key for the entry in the heap. +func (e *Entry) Key() string { + return e.key +} + +// ExpiryHeap is a heap that is ordered by the expiry time of entries. It may +// be used by a cache or storage to expiry items after a TTL. +// +// ExpiryHeap expects the caller to synchronize calls to most of its methods. This +// is necessary because the cache needs to ensure that updates to both its +// storage and the ExpiryHeap are synchronized. +type ExpiryHeap struct { + entries []*Entry + + // NotifyCh is sent a value whenever the 0 index value of the heap + // changes. This can be used to detect when the earliest value + // changes. + NotifyCh chan struct{} +} + +// NewExpiryHeap creates and returns a new ExpiryHeap. +func NewExpiryHeap() *ExpiryHeap { + h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init((*entryHeap)(h)) + return h +} + +// Add an entry to the heap. +// +// Must be synchronized by the caller. +func (h *ExpiryHeap) Add(key string, expiry time.Duration) *Entry { + entry := &Entry{ + key: key, + expiry: time.Now().Add(expiry), + // Set the initial heap index to the last index. If the entry is swapped it + // will have the correct index set, and if it remains at the end the last + // index will be correct. + heapIndex: len(h.entries), + } + heap.Push((*entryHeap)(h), entry) + if entry.heapIndex == 0 { + h.notify() + } + return entry +} + +// Update the entry that is currently at idx with the new expiry time. The heap +// will be rebalanced after the entry is updated. +// +// Must be synchronized by the caller. +func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { + if idx == NotIndexed { + // the previous entry did not have a valid index, its not in the heap + return + } + entry := h.entries[idx] + entry.expiry = time.Now().Add(expiry) + heap.Fix((*entryHeap)(h), idx) + + // If the previous index and current index are both zero then Fix did not + // swap the entry, and notify must be called here. + if idx == 0 || entry.heapIndex == 0 { + h.notify() + } +} + +// Remove the entry at idx from the heap. +// +// Must be synchronized by the caller. +func (h *ExpiryHeap) Remove(idx int) { + entry := h.entries[idx] + heap.Remove((*entryHeap)(h), idx) + + // A goroutine which is fetching a new value will have a reference to this + // entry. When it re-acquires the lock it needs to be informed that + // the entry was expired while it was fetching. Setting heapIndex to -1 + // indicates that the entry is no longer in the heap, and must be re-added. + entry.heapIndex = NotIndexed + + if idx == 0 { + h.notify() + } +} + +type entryHeap ExpiryHeap + +func (h *entryHeap) Len() int { return len(h.entries) } + +func (h *entryHeap) Swap(i, j int) { + h.entries[i], h.entries[j] = h.entries[j], h.entries[i] + h.entries[i].heapIndex = i + h.entries[j].heapIndex = j +} + +func (h *entryHeap) Less(i, j int) bool { + // The usage of Before here is important (despite being obvious): + // this function uses the monotonic time that should be available + // on the time.Time value so the heap is immune to wall clock changes. + return h.entries[i].expiry.Before(h.entries[j].expiry) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *entryHeap) Push(x interface{}) { + h.entries = append(h.entries, x.(*Entry)) +} + +// heap.Interface, this isn't expected to be called directly. +func (h *entryHeap) Pop() interface{} { + n := len(h.entries) + entries := h.entries + last := entries[n-1] + h.entries = entries[0 : n-1] + return last +} + +// notify the timer that the head value has changed, so the expiry time has +// also likely changed. +func (h *ExpiryHeap) notify() { + // Send to channel without blocking. Skips sending if there is already + // an item in the buffered channel. + select { + case h.NotifyCh <- struct{}{}: + default: + } +} + +// Next returns a Timer that waits until the first entry in the heap expires. +// +// Must be synchronized by the caller. +func (h *ExpiryHeap) Next() Timer { + if len(h.entries) == 0 { + return Timer{} + } + entry := h.entries[0] + return Timer{ + timer: time.NewTimer(time.Until(entry.expiry)), + Entry: entry, + } +} + +// Timer provides a channel to block on. When the Wait channel receives an +// item the Timer.Entry has expired. The caller is expected to call +// ExpiryHeap.Remove with the Entry.Index(). +// +// The caller is responsible for calling Stop to stop the timer if the timer has +// not fired. +type Timer struct { + timer *time.Timer + Entry *Entry +} + +func (t *Timer) Wait() <-chan time.Time { + if t.timer == nil { + return nil + } + return t.timer.C +} + +func (t *Timer) Stop() { + if t.timer != nil { + t.timer.Stop() + } +} diff --git a/lib/ttlcache/eviction_test.go b/lib/ttlcache/eviction_test.go new file mode 100644 index 0000000000..1fdd8e485c --- /dev/null +++ b/lib/ttlcache/eviction_test.go @@ -0,0 +1,91 @@ +package ttlcache + +import ( + "container/heap" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +var _ heap.Interface = (*entryHeap)(nil) + +func TestExpiryHeap(t *testing.T) { + h := NewExpiryHeap() + ch := h.NotifyCh + var entry, entry2, entry3 *Entry + + // Init, shouldn't trigger anything + testNoMessage(t, ch) + + runStep(t, "add an entry", func(t *testing.T) { + entry = h.Add("foo", 100*time.Millisecond) + assert.Equal(t, 0, entry.heapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a second entry in front", func(t *testing.T) { + entry2 = h.Add("bar", 50*time.Millisecond) + assert.Equal(t, 0, entry2.heapIndex) + assert.Equal(t, 1, entry.heapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // exactly one asserted above + }) + + runStep(t, "add a third entry at the end", func(t *testing.T) { + entry3 = h.Add("baz", 1000*time.Millisecond) + assert.Equal(t, 2, entry3.heapIndex) + testNoMessage(t, ch) // no notify cause index 0 stayed the same + }) + + runStep(t, "remove the first entry", func(t *testing.T) { + h.Remove(0) + assert.Equal(t, 0, entry.heapIndex) + assert.Equal(t, 1, entry3.heapIndex) + testMessage(t, ch) + testNoMessage(t, ch) + }) + + runStep(t, "update entry3 to expire first", func(t *testing.T) { + h.Update(entry3.heapIndex, 10*time.Millisecond) + assert.Equal(t, 1, entry.heapIndex) + assert.Equal(t, 0, entry3.heapIndex) + testMessage(t, ch) + testNoMessage(t, ch) + }) + + runStep(t, "0th element change triggers a notify", func(t *testing.T) { + h.Update(entry3.heapIndex, 20) + assert.Equal(t, 1, entry.heapIndex) // no move + assert.Equal(t, 0, entry3.heapIndex) + testMessage(t, ch) + testNoMessage(t, ch) // one message + }) +} + +func testNoMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + t.Fatal("should not have a message") + default: + } +} + +func testMessage(t *testing.T, ch <-chan struct{}) { + t.Helper() + + select { + case <-ch: + default: + t.Fatal("should have a message") + } +} + +func runStep(t *testing.T, name string, fn func(t *testing.T)) { + if !t.Run(name, fn) { + t.FailNow() + } +}