2
0
mirror of https://github.com/status-im/consul.git synced 2025-01-12 23:05:28 +00:00

agent/cache: rework how expiry data is stored to be more efficient

This commit is contained in:
Mitchell Hashimoto 2018-04-19 18:28:01 -07:00
parent 595193a781
commit 3b550d2b72
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
3 changed files with 108 additions and 52 deletions

59
agent/cache/cache.go vendored

@ -207,10 +207,16 @@ RETRY_GET:
atomic.AddUint64(&c.hits, 1)
}
// Touch the expiration and fix the heap
entry.ResetExpires()
// Touch the expiration and fix the heap.
c.entriesLock.Lock()
heap.Fix(c.entriesExpiryHeap, *entry.ExpiryHeapIndex)
entry.Expiry.Reset()
idx := entry.Expiry.HeapIndex
heap.Fix(c.entriesExpiryHeap, entry.Expiry.HeapIndex)
if idx == 0 && entry.Expiry.HeapIndex == 0 {
// We didn't move and we were at the head of the heap.
// We need to let the loop know that the value changed.
c.entriesExpiryHeap.Notify()
}
c.entriesLock.Unlock()
return entry.Value, entry.Error
@ -360,29 +366,21 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{},
// Create a new waiter that will be used for the next fetch.
newEntry.Waiter = make(chan struct{})
// The key needs to always be set since this is used by the
// expiration loop to know what entry to delete.
newEntry.Key = key
// If this is a new entry (not in the heap yet), then set the
// initial expiration TTL.
if newEntry.ExpiryHeapIndex == nil {
newEntry.ExpiresTTL = tEntry.Opts.LastGetTTL
newEntry.ResetExpires()
}
// Set our entry
c.entriesLock.Lock()
if newEntry.ExpiryHeapIndex != nil {
// If we're already in the heap, just change the value in-place.
// We don't need to call heap.Fix because the expiry doesn't
// change.
c.entriesExpiryHeap.Entries[*newEntry.ExpiryHeapIndex] = &newEntry
} else {
// Add the new value
newEntry.ExpiryHeapIndex = new(int)
heap.Push(c.entriesExpiryHeap, &newEntry)
// If this is a new entry (not in the heap yet), then setup the
// initial expiry information and insert. If we're already in
// the heap we do nothing since we're reusing the same entry.
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
newEntry.Expiry = &cacheEntryExpiry{
Key: key,
TTL: tEntry.Opts.LastGetTTL,
}
newEntry.Expiry.Reset()
heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
}
c.entries[key] = newEntry
c.entriesLock.Unlock()
@ -453,12 +451,12 @@ func (c *Cache) runExpiryLoop() {
}
// Get the entry expiring soonest
var entry *cacheEntry
var entry *cacheEntryExpiry
var expiryCh <-chan time.Time
c.entriesLock.RLock()
if len(c.entriesExpiryHeap.Entries) > 0 {
entry = c.entriesExpiryHeap.Entries[0]
expiryTimer = time.NewTimer(entry.Expires().Sub(time.Now()))
expiryTimer = time.NewTimer(entry.Expires.Sub(time.Now()))
expiryCh = expiryTimer.C
}
c.entriesLock.RUnlock()
@ -468,10 +466,17 @@ func (c *Cache) runExpiryLoop() {
// Entries changed, so the heap may have changed. Restart loop.
case <-expiryCh:
// Entry expired! Remove it.
c.entriesLock.Lock()
// Entry expired! Remove it.
delete(c.entries, entry.Key)
heap.Remove(c.entriesExpiryHeap, *entry.ExpiryHeapIndex)
heap.Remove(c.entriesExpiryHeap, entry.HeapIndex)
// This is subtle but important: if we race and simultaneously
// evict and fetch a new value, then we set this to -1 to
// have it treated as a new value so that the TTL is extended.
entry.HeapIndex = -1
c.entriesLock.Unlock()
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)

@ -414,6 +414,57 @@ func TestCacheGet_expire(t *testing.T) {
typ.AssertExpectations(t)
}
// Test that entries reset their TTL on Get
func TestCacheGet_expireResetGet(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
// Register the type with a timeout
c.RegisterType("t", typ, &RegisterOptions{
LastGetTTL: 150 * time.Millisecond,
})
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
result, err := c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Fetch multiple times, where the total time is well beyond
// the TTL. We should not trigger any fetches during this time.
for i := 0; i < 5; i++ {
// Sleep a bit
time.Sleep(50 * time.Millisecond)
// Get, should not fetch
req = TestRequest(t, RequestInfo{Key: "hello"})
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
}
time.Sleep(200 * time.Millisecond)
// Get, should fetch
req = TestRequest(t, RequestInfo{Key: "hello"})
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that Get partitions the caches based on DC so two equivalent requests
// to different datacenters are automatically cached even if their keys are
// the same.

50
agent/cache/entry.go vendored

@ -1,7 +1,6 @@
package cache
import (
"sync/atomic"
"time"
)
@ -11,7 +10,6 @@ import (
// a lot of improvements that can be made here in the long term.
type cacheEntry struct {
// Fields pertaining to the actual value
Key string
Value interface{}
Error error
Index uint64
@ -21,27 +19,25 @@ type cacheEntry struct {
Fetching bool // True if a fetch is already active
Waiter chan struct{} // Closed when this entry is invalidated
// ExpiresRaw is the time.Time that this value expires. The time.Time
// is immune to wall clock changes since we only use APIs that
// operate on the monotonic value. The value is in an atomic.Value
// so we have an efficient way to "touch" the value while maybe being
// read without introducing complex locking.
ExpiresRaw atomic.Value
ExpiresTTL time.Duration
ExpiryHeapIndex *int
// Expiry contains information about the expiration of this
// entry. This is a pointer as its shared as a value in the
// expiryHeap as well.
Expiry *cacheEntryExpiry
}
// Expires is the time that this entry expires. The time.Time value returned
// has the monotonic clock preserved and should be used only with
// monotonic-safe operations to prevent wall clock changes affecting
// cache behavior.
func (e *cacheEntry) Expires() time.Time {
return e.ExpiresRaw.Load().(time.Time)
// cacheEntryExpiry contains the expiration information for a cache
// entry. Any modifications to this struct should be done only while
// the Cache entriesLock is held.
type cacheEntryExpiry struct {
Key string // Key in the cache map
Expires time.Time // Time when entry expires (monotonic clock)
TTL time.Duration // TTL for this entry to extend when resetting
HeapIndex int // Index in the heap
}
// ResetExpires resets the expiration to be the ttl duration from now.
func (e *cacheEntry) ResetExpires() {
e.ExpiresRaw.Store(time.Now().Add(e.ExpiresTTL))
// Reset resets the expiration to be the ttl duration from now.
func (e *cacheEntryExpiry) Reset() {
e.Expires = time.Now().Add(e.TTL)
}
// expiryHeap is a heap implementation that stores information about
@ -50,7 +46,7 @@ func (e *cacheEntry) ResetExpires() {
// All operations on the heap and read/write of the heap contents require
// the proper entriesLock to be held on Cache.
type expiryHeap struct {
Entries []*cacheEntry
Entries []*cacheEntryExpiry
// NotifyCh is sent a value whenever the 0 index value of the heap
// changes. This can be used to detect when the earliest value
@ -62,8 +58,8 @@ func (h *expiryHeap) Len() int { return len(h.Entries) }
func (h *expiryHeap) Swap(i, j int) {
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
*h.Entries[i].ExpiryHeapIndex = i
*h.Entries[j].ExpiryHeapIndex = j
h.Entries[i].HeapIndex = i
h.Entries[j].HeapIndex = j
// If we're moving the 0 index, update the channel since we need
// to re-update the timer we're waiting on for the soonest expiring
@ -77,17 +73,17 @@ func (h *expiryHeap) Less(i, j int) bool {
// The usage of Before here is important (despite being obvious):
// this function uses the monotonic time that should be available
// on the time.Time value so the heap is immune to wall clock changes.
return h.Entries[i].Expires().Before(h.Entries[j].Expires())
return h.Entries[i].Expires.Before(h.Entries[j].Expires)
}
func (h *expiryHeap) Push(x interface{}) {
entry := x.(*cacheEntry)
entry := x.(*cacheEntryExpiry)
// For the first entry, we need to trigger a channel send because
// Swap won't be called; nothing to swap! We can call it right away
// because all heap operations are within a lock.
if len(h.Entries) == 0 {
*entry.ExpiryHeapIndex = 0 // Set correct initial index
entry.HeapIndex = 0 // Set correct initial index
h.NotifyCh <- struct{}{}
}
@ -101,3 +97,7 @@ func (h *expiryHeap) Pop() interface{} {
h.Entries = old[0 : n-1]
return x
}
func (h *expiryHeap) Notify() {
h.NotifyCh <- struct{}{}
}