agent/cache: initial TTL work

This commit is contained in:
Mitchell Hashimoto 2018-04-19 17:31:50 -07:00
parent 1df99514ca
commit 595193a781
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
4 changed files with 280 additions and 33 deletions

147
agent/cache/cache.go vendored
View File

@ -15,6 +15,7 @@
package cache
import (
"container/heap"
"fmt"
"sync"
"sync/atomic"
@ -54,7 +55,11 @@ type Cache struct {
typesLock sync.RWMutex
types map[string]typeEntry
// entries contains the actual cache data.
// entries contains the actual cache data. Access to entries and
// entriesExpiryHeap must be protected by entriesLock.
//
// entriesExpiryHeap is a heap of *cacheEntry values ordered by
// expiry, with the soonest to expire being first in the list (index 0).
//
// NOTE(mitchellh): The entry map key is currently a string in the format
// of "<DC>/<ACL token>/<Request key>" in order to properly partition
@ -64,19 +69,7 @@ type Cache struct {
// internal storage format so changing this should be possible safely.
entriesLock sync.RWMutex
entries map[string]cacheEntry
}
// cacheEntry stores a single cache entry.
type cacheEntry struct {
// Fields pertaining to the actual value
Value interface{}
Error error
Index uint64
// Metadata that is used for internal accounting
Valid bool
Fetching bool
Waiter chan struct{}
entriesExpiryHeap *expiryHeap
}
// typeEntry is a single type that is registered with a Cache.
@ -93,16 +86,34 @@ type Options struct {
// New creates a new cache with the given RPC client and reasonable defaults.
// Further settings can be tweaked on the returned value.
func New(*Options) *Cache {
return &Cache{
entries: make(map[string]cacheEntry),
// Initialize the heap. The buffer of 1 is really important because
// its possible for the expiry loop to trigger the heap to update
// itself and it'd block forever otherwise.
h := &expiryHeap{NotifyCh: make(chan struct{}, 1)}
heap.Init(h)
c := &Cache{
types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: h,
}
// Start the expiry watcher
go c.runExpiryLoop()
return c
}
// RegisterOptions are options that can be associated with a type being
// registered for the cache. This changes the behavior of the cache for
// this type.
type RegisterOptions struct {
// LastGetTTL is the time that the values returned by this type remain
// in the cache after the last get operation. If a value isn't accessed
// within this duration, the value is purged from the cache and
// background refreshing will cease.
LastGetTTL time.Duration
// Refresh configures whether the data is actively refreshed or if
// the data is only refreshed on an explicit Get. The default (false)
// is to only request data on explicit Get.
@ -137,6 +148,9 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
if opts == nil {
opts = &RegisterOptions{}
}
if opts.LastGetTTL == 0 {
opts.LastGetTTL = 72 * time.Hour // reasonable default is days
}
c.typesLock.Lock()
defer c.typesLock.Unlock()
@ -193,6 +207,12 @@ RETRY_GET:
atomic.AddUint64(&c.hits, 1)
}
// Touch the expiration and fix the heap
entry.ResetExpires()
c.entriesLock.Lock()
heap.Fix(c.entriesExpiryHeap, *entry.ExpiryHeapIndex)
c.entriesLock.Unlock()
return entry.Value, entry.Error
}
}
@ -230,7 +250,7 @@ RETRY_GET:
// At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(t, key, r)
waiterCh, err := c.fetch(t, key, r, true)
if err != nil {
return nil, err
}
@ -256,7 +276,11 @@ func (c *Cache) entryKey(r *RequestInfo) string {
// background fetch is already running for a matching Request, the waiter
// channel for that request is returned. The effect of this is that there
// is only ever one blocking query for any matching requests.
func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
//
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(t, key string, r Request, allowNew bool) (<-chan struct{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
@ -270,6 +294,15 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
// If we aren't allowing new values and we don't have an existing value,
// return immediately. We return an immediately-closed channel so nothing
// blocks.
if !ok && !allowNew {
ch := make(chan struct{})
close(ch)
return ch, nil
}
// If we already have an entry and it is actively fetching, then return
// the currently active waiter.
if ok && entry.Fetching {
@ -305,14 +338,10 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
}
var newEntry cacheEntry
if result.Value == nil {
// If no value was set, then we do not change the prior entry.
// Instead, we just update the waiter to be new so that another
// Get will wait on the correct value.
newEntry = entry
// Copy the existing entry to start.
newEntry := entry
newEntry.Fetching = false
} else {
if result.Value != nil {
// A new value was given, so we create a brand new entry.
newEntry.Value = result.Value
newEntry.Index = result.Index
@ -331,12 +360,33 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
// Create a new waiter that will be used for the next fetch.
newEntry.Waiter = make(chan struct{})
// Insert
// The key needs to always be set since this is used by the
// expiration loop to know what entry to delete.
newEntry.Key = key
// If this is a new entry (not in the heap yet), then set the
// initial expiration TTL.
if newEntry.ExpiryHeapIndex == nil {
newEntry.ExpiresTTL = tEntry.Opts.LastGetTTL
newEntry.ResetExpires()
}
// Set our entry
c.entriesLock.Lock()
if newEntry.ExpiryHeapIndex != nil {
// If we're already in the heap, just change the value in-place.
// We don't need to call heap.Fix because the expiry doesn't
// change.
c.entriesExpiryHeap.Entries[*newEntry.ExpiryHeapIndex] = &newEntry
} else {
// Add the new value
newEntry.ExpiryHeapIndex = new(int)
heap.Push(c.entriesExpiryHeap, &newEntry)
}
c.entries[key] = newEntry
c.entriesLock.Unlock()
// Trigger the waiter
// Trigger the old waiter
close(entry.Waiter)
// If refresh is enabled, run the refresh in due time. The refresh
@ -386,8 +436,47 @@ func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request)
time.Sleep(opts.RefreshTimer)
}
// Trigger
c.fetch(t, key, r)
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(t, key, r, false)
}
// runExpiryLoop is a blocking function that watches the expiration
// heap and invalidates entries that have expired.
func (c *Cache) runExpiryLoop() {
var expiryTimer *time.Timer
for {
// If we have a previous timer, stop it.
if expiryTimer != nil {
expiryTimer.Stop()
}
// Get the entry expiring soonest
var entry *cacheEntry
var expiryCh <-chan time.Time
c.entriesLock.RLock()
if len(c.entriesExpiryHeap.Entries) > 0 {
entry = c.entriesExpiryHeap.Entries[0]
expiryTimer = time.NewTimer(entry.Expires().Sub(time.Now()))
expiryCh = expiryTimer.C
}
c.entriesLock.RUnlock()
select {
case <-c.entriesExpiryHeap.NotifyCh:
// Entries changed, so the heap may have changed. Restart loop.
case <-expiryCh:
// Entry expired! Remove it.
c.entriesLock.Lock()
delete(c.entries, entry.Key)
heap.Remove(c.entriesExpiryHeap, *entry.ExpiryHeapIndex)
c.entriesLock.Unlock()
metrics.IncrCounter([]string{"consul", "cache", "evict_expired"}, 1)
}
}
}
// Returns the number of cache hits. Safe to call concurrently.

View File

@ -369,6 +369,51 @@ func TestCacheGet_fetchTimeout(t *testing.T) {
require.Equal(timeout, actual)
}
// Test that entries expire
func TestCacheGet_expire(t *testing.T) {
t.Parallel()
require := require.New(t)
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
// Register the type with a timeout
c.RegisterType("t", typ, &RegisterOptions{
LastGetTTL: 400 * time.Millisecond,
})
// Configure the type
typ.Static(FetchResult{Value: 42}, nil).Times(2)
// Get, should fetch
req := TestRequest(t, RequestInfo{Key: "hello"})
result, err := c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Get, should not fetch
req = TestRequest(t, RequestInfo{Key: "hello"})
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep for the expiry
time.Sleep(500 * time.Millisecond)
// Get, should fetch
req = TestRequest(t, RequestInfo{Key: "hello"})
result, err = c.Get("t", req)
require.Nil(err)
require.Equal(42, result)
// Sleep a tiny bit just to let maybe some background calls happen
// then verify that we still only got the one call
time.Sleep(20 * time.Millisecond)
typ.AssertExpectations(t)
}
// Test that Get partitions the caches based on DC so two equivalent requests
// to different datacenters are automatically cached even if their keys are
// the same.

103
agent/cache/entry.go vendored Normal file
View File

@ -0,0 +1,103 @@
package cache
import (
"sync/atomic"
"time"
)
// cacheEntry stores a single cache entry.
//
// Note that this isn't a very optimized structure currently. There are
// a lot of improvements that can be made here in the long term.
type cacheEntry struct {
// Fields pertaining to the actual value
Key string
Value interface{}
Error error
Index uint64
// Metadata that is used for internal accounting
Valid bool // True if the Value is set
Fetching bool // True if a fetch is already active
Waiter chan struct{} // Closed when this entry is invalidated
// ExpiresRaw is the time.Time that this value expires. The time.Time
// is immune to wall clock changes since we only use APIs that
// operate on the monotonic value. The value is in an atomic.Value
// so we have an efficient way to "touch" the value while maybe being
// read without introducing complex locking.
ExpiresRaw atomic.Value
ExpiresTTL time.Duration
ExpiryHeapIndex *int
}
// Expires is the time that this entry expires. The time.Time value returned
// has the monotonic clock preserved and should be used only with
// monotonic-safe operations to prevent wall clock changes affecting
// cache behavior.
func (e *cacheEntry) Expires() time.Time {
return e.ExpiresRaw.Load().(time.Time)
}
// ResetExpires resets the expiration to be the ttl duration from now.
func (e *cacheEntry) ResetExpires() {
e.ExpiresRaw.Store(time.Now().Add(e.ExpiresTTL))
}
// expiryHeap is a heap implementation that stores information about
// when entires expire. Implements container/heap.Interface.
//
// All operations on the heap and read/write of the heap contents require
// the proper entriesLock to be held on Cache.
type expiryHeap struct {
Entries []*cacheEntry
// NotifyCh is sent a value whenever the 0 index value of the heap
// changes. This can be used to detect when the earliest value
// changes.
NotifyCh chan struct{}
}
func (h *expiryHeap) Len() int { return len(h.Entries) }
func (h *expiryHeap) Swap(i, j int) {
h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i]
*h.Entries[i].ExpiryHeapIndex = i
*h.Entries[j].ExpiryHeapIndex = j
// If we're moving the 0 index, update the channel since we need
// to re-update the timer we're waiting on for the soonest expiring
// value.
if i == 0 || j == 0 {
h.NotifyCh <- struct{}{}
}
}
func (h *expiryHeap) Less(i, j int) bool {
// The usage of Before here is important (despite being obvious):
// this function uses the monotonic time that should be available
// on the time.Time value so the heap is immune to wall clock changes.
return h.Entries[i].Expires().Before(h.Entries[j].Expires())
}
func (h *expiryHeap) Push(x interface{}) {
entry := x.(*cacheEntry)
// For the first entry, we need to trigger a channel send because
// Swap won't be called; nothing to swap! We can call it right away
// because all heap operations are within a lock.
if len(h.Entries) == 0 {
*entry.ExpiryHeapIndex = 0 // Set correct initial index
h.NotifyCh <- struct{}{}
}
h.Entries = append(h.Entries, entry)
}
func (h *expiryHeap) Pop() interface{} {
old := h.Entries
n := len(old)
x := old[n-1]
h.Entries = old[0 : n-1]
return x
}

10
agent/cache/entry_test.go vendored Normal file
View File

@ -0,0 +1,10 @@
package cache
import (
"container/heap"
"testing"
)
func TestExpiryHeap_impl(t *testing.T) {
var _ heap.Interface = new(expiryHeap)
}