diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 9296a5fb18..0d332a21e4 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -15,6 +15,7 @@ package cache import ( + "container/heap" "fmt" "sync" "sync/atomic" @@ -54,7 +55,11 @@ type Cache struct { typesLock sync.RWMutex types map[string]typeEntry - // entries contains the actual cache data. + // entries contains the actual cache data. Access to entries and + // entriesExpiryHeap must be protected by entriesLock. + // + // entriesExpiryHeap is a heap of *cacheEntry values ordered by + // expiry, with the soonest to expire being first in the list (index 0). // // NOTE(mitchellh): The entry map key is currently a string in the format // of "//" in order to properly partition @@ -62,21 +67,9 @@ type Cache struct { // big drawbacks: we can't evict by datacenter, ACL token, etc. For an // initial implementaiton this works and the tests are agnostic to the // internal storage format so changing this should be possible safely. - entriesLock sync.RWMutex - entries map[string]cacheEntry -} - -// cacheEntry stores a single cache entry. -type cacheEntry struct { - // Fields pertaining to the actual value - Value interface{} - Error error - Index uint64 - - // Metadata that is used for internal accounting - Valid bool - Fetching bool - Waiter chan struct{} + entriesLock sync.RWMutex + entries map[string]cacheEntry + entriesExpiryHeap *expiryHeap } // typeEntry is a single type that is registered with a Cache. @@ -93,16 +86,34 @@ type Options struct { // New creates a new cache with the given RPC client and reasonable defaults. // Further settings can be tweaked on the returned value. func New(*Options) *Cache { - return &Cache{ - entries: make(map[string]cacheEntry), - types: make(map[string]typeEntry), + // Initialize the heap. The buffer of 1 is really important because + // its possible for the expiry loop to trigger the heap to update + // itself and it'd block forever otherwise. + h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init(h) + + c := &Cache{ + types: make(map[string]typeEntry), + entries: make(map[string]cacheEntry), + entriesExpiryHeap: h, } + + // Start the expiry watcher + go c.runExpiryLoop() + + return c } // RegisterOptions are options that can be associated with a type being // registered for the cache. This changes the behavior of the cache for // this type. type RegisterOptions struct { + // LastGetTTL is the time that the values returned by this type remain + // in the cache after the last get operation. If a value isn't accessed + // within this duration, the value is purged from the cache and + // background refreshing will cease. + LastGetTTL time.Duration + // Refresh configures whether the data is actively refreshed or if // the data is only refreshed on an explicit Get. The default (false) // is to only request data on explicit Get. @@ -137,6 +148,9 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { if opts == nil { opts = &RegisterOptions{} } + if opts.LastGetTTL == 0 { + opts.LastGetTTL = 72 * time.Hour // reasonable default is days + } c.typesLock.Lock() defer c.typesLock.Unlock() @@ -193,6 +207,12 @@ RETRY_GET: atomic.AddUint64(&c.hits, 1) } + // Touch the expiration and fix the heap + entry.ResetExpires() + c.entriesLock.Lock() + heap.Fix(c.entriesExpiryHeap, *entry.ExpiryHeapIndex) + c.entriesLock.Unlock() + return entry.Value, entry.Error } } @@ -230,7 +250,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(t, key, r) + waiterCh, err := c.fetch(t, key, r, true) if err != nil { return nil, err } @@ -256,7 +276,11 @@ func (c *Cache) entryKey(r *RequestInfo) string { // background fetch is already running for a matching Request, the waiter // channel for that request is returned. The effect of this is that there // is only ever one blocking query for any matching requests. -func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { +// +// If allowNew is true then the fetch should create the cache entry +// if it doesn't exist. If this is false, then fetch will do nothing +// if the entry doesn't exist. This latter case is to support refreshing. +func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, error) { // Get the type that we're fetching c.typesLock.RLock() tEntry, ok := c.types[t] @@ -270,6 +294,15 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { defer c.entriesLock.Unlock() entry, ok := c.entries[key] + // If we aren't allowing new values and we don't have an existing value, + // return immediately. We return an immediately-closed channel so nothing + // blocks. + if !ok && !allowNew { + ch := make(chan struct{}) + close(ch) + return ch, nil + } + // If we already have an entry and it is actively fetching, then return // the currently active waiter. if ok && entry.Fetching { @@ -305,14 +338,10 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) } - var newEntry cacheEntry - if result.Value == nil { - // If no value was set, then we do not change the prior entry. - // Instead, we just update the waiter to be new so that another - // Get will wait on the correct value. - newEntry = entry - newEntry.Fetching = false - } else { + // Copy the existing entry to start. + newEntry := entry + newEntry.Fetching = false + if result.Value != nil { // A new value was given, so we create a brand new entry. newEntry.Value = result.Value newEntry.Index = result.Index @@ -331,12 +360,33 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { // Create a new waiter that will be used for the next fetch. newEntry.Waiter = make(chan struct{}) - // Insert + // The key needs to always be set since this is used by the + // expiration loop to know what entry to delete. + newEntry.Key = key + + // If this is a new entry (not in the heap yet), then set the + // initial expiration TTL. + if newEntry.ExpiryHeapIndex == nil { + newEntry.ExpiresTTL = tEntry.Opts.LastGetTTL + newEntry.ResetExpires() + } + + // Set our entry c.entriesLock.Lock() + if newEntry.ExpiryHeapIndex != nil { + // If we're already in the heap, just change the value in-place. + // We don't need to call heap.Fix because the expiry doesn't + // change. + c.entriesExpiryHeap.Entries[*newEntry.ExpiryHeapIndex] = &newEntry + } else { + // Add the new value + newEntry.ExpiryHeapIndex = new(int) + heap.Push(c.entriesExpiryHeap, &newEntry) + } c.entries[key] = newEntry c.entriesLock.Unlock() - // Trigger the waiter + // Trigger the old waiter close(entry.Waiter) // If refresh is enabled, run the refresh in due time. The refresh @@ -386,8 +436,47 @@ func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) time.Sleep(opts.RefreshTimer) } - // Trigger - c.fetch(t, key, r) + // Trigger. The "allowNew" field is false because in the time we were + // waiting to refresh we may have expired and got evicted. If that + // happened, we don't want to create a new entry. + c.fetch(t, key, r, false) +} + +// runExpiryLoop is a blocking function that watches the expiration +// heap and invalidates entries that have expired. +func (c *Cache) runExpiryLoop() { + var expiryTimer *time.Timer + for { + // If we have a previous timer, stop it. + if expiryTimer != nil { + expiryTimer.Stop() + } + + // Get the entry expiring soonest + var entry *cacheEntry + var expiryCh <-chan time.Time + c.entriesLock.RLock() + if len(c.entriesExpiryHeap.Entries) > 0 { + entry = c.entriesExpiryHeap.Entries[0] + expiryTimer = time.NewTimer(entry.Expires().Sub(time.Now())) + expiryCh = expiryTimer.C + } + c.entriesLock.RUnlock() + + select { + case <-c.entriesExpiryHeap.NotifyCh: + // Entries changed, so the heap may have changed. Restart loop. + + case <-expiryCh: + // Entry expired! Remove it. + c.entriesLock.Lock() + delete(c.entries, entry.Key) + heap.Remove(c.entriesExpiryHeap, *entry.ExpiryHeapIndex) + c.entriesLock.Unlock() + + metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1) + } + } } // Returns the number of cache hits. Safe to call concurrently. diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 49edc6e28d..7ac8213f37 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -369,6 +369,51 @@ func TestCacheGet_fetchTimeout(t *testing.T) { require.Equal(timeout, actual) } +// Test that entries expire +func TestCacheGet_expire(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + + // Register the type with a timeout + c.RegisterType("t", typ, &RegisterOptions{ + LastGetTTL: 400 * time.Millisecond, + }) + + // Configure the type + typ.Static(FetchResult{Value: 42}, nil).Times(2) + + // Get, should fetch + req := TestRequest(t, RequestInfo{Key: "hello"}) + result, err := c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Get, should not fetch + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, err = c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Sleep for the expiry + time.Sleep(500 * time.Millisecond) + + // Get, should fetch + req = TestRequest(t, RequestInfo{Key: "hello"}) + result, err = c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + // Test that Get partitions the caches based on DC so two equivalent requests // to different datacenters are automatically cached even if their keys are // the same. diff --git a/agent/cache/entry.go b/agent/cache/entry.go new file mode 100644 index 0000000000..99636be6f9 --- /dev/null +++ b/agent/cache/entry.go @@ -0,0 +1,103 @@ +package cache + +import ( + "sync/atomic" + "time" +) + +// cacheEntry stores a single cache entry. +// +// Note that this isn't a very optimized structure currently. There are +// a lot of improvements that can be made here in the long term. +type cacheEntry struct { + // Fields pertaining to the actual value + Key string + Value interface{} + Error error + Index uint64 + + // Metadata that is used for internal accounting + Valid bool // True if the Value is set + Fetching bool // True if a fetch is already active + Waiter chan struct{} // Closed when this entry is invalidated + + // ExpiresRaw is the time.Time that this value expires. The time.Time + // is immune to wall clock changes since we only use APIs that + // operate on the monotonic value. The value is in an atomic.Value + // so we have an efficient way to "touch" the value while maybe being + // read without introducing complex locking. + ExpiresRaw atomic.Value + ExpiresTTL time.Duration + ExpiryHeapIndex *int +} + +// Expires is the time that this entry expires. The time.Time value returned +// has the monotonic clock preserved and should be used only with +// monotonic-safe operations to prevent wall clock changes affecting +// cache behavior. +func (e *cacheEntry) Expires() time.Time { + return e.ExpiresRaw.Load().(time.Time) +} + +// ResetExpires resets the expiration to be the ttl duration from now. +func (e *cacheEntry) ResetExpires() { + e.ExpiresRaw.Store(time.Now().Add(e.ExpiresTTL)) +} + +// expiryHeap is a heap implementation that stores information about +// when entires expire. Implements container/heap.Interface. +// +// All operations on the heap and read/write of the heap contents require +// the proper entriesLock to be held on Cache. +type expiryHeap struct { + Entries []*cacheEntry + + // NotifyCh is sent a value whenever the 0 index value of the heap + // changes. This can be used to detect when the earliest value + // changes. + NotifyCh chan struct{} +} + +func (h *expiryHeap) Len() int { return len(h.Entries) } + +func (h *expiryHeap) Swap(i, j int) { + h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] + *h.Entries[i].ExpiryHeapIndex = i + *h.Entries[j].ExpiryHeapIndex = j + + // If we're moving the 0 index, update the channel since we need + // to re-update the timer we're waiting on for the soonest expiring + // value. + if i == 0 || j == 0 { + h.NotifyCh <- struct{}{} + } +} + +func (h *expiryHeap) Less(i, j int) bool { + // The usage of Before here is important (despite being obvious): + // this function uses the monotonic time that should be available + // on the time.Time value so the heap is immune to wall clock changes. + return h.Entries[i].Expires().Before(h.Entries[j].Expires()) +} + +func (h *expiryHeap) Push(x interface{}) { + entry := x.(*cacheEntry) + + // For the first entry, we need to trigger a channel send because + // Swap won't be called; nothing to swap! We can call it right away + // because all heap operations are within a lock. + if len(h.Entries) == 0 { + *entry.ExpiryHeapIndex = 0 // Set correct initial index + h.NotifyCh <- struct{}{} + } + + h.Entries = append(h.Entries, entry) +} + +func (h *expiryHeap) Pop() interface{} { + old := h.Entries + n := len(old) + x := old[n-1] + h.Entries = old[0 : n-1] + return x +} diff --git a/agent/cache/entry_test.go b/agent/cache/entry_test.go new file mode 100644 index 0000000000..0ebf0682d6 --- /dev/null +++ b/agent/cache/entry_test.go @@ -0,0 +1,10 @@ +package cache + +import ( + "container/heap" + "testing" +) + +func TestExpiryHeap_impl(t *testing.T) { + var _ heap.Interface = new(expiryHeap) +}