diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 361f29f4d3..03b9df8b9b 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -87,7 +87,7 @@ type Cache struct { // internal storage format so changing this should be possible safely. entriesLock sync.RWMutex entries map[string]cacheEntry - entriesExpiryHeap *expiryHeap + entriesExpiryHeap *ExpiryHeap // stopped is used as an atomic flag to signal that the Cache has been // discarded so background fetches and expiry processing should stop. @@ -169,7 +169,7 @@ func New(options Options) *Cache { c := &Cache{ types: make(map[string]typeEntry), entries: make(map[string]cacheEntry), - entriesExpiryHeap: newExpiryHeap(), + entriesExpiryHeap: NewExpiryHeap(), stopCh: make(chan struct{}), options: options, rateLimitContext: ctx, @@ -803,7 +803,7 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key}, + Expiry: &CacheEntryExpiry{Key: key}, FetchRateLimiter: rate.NewLimiter( c.options.EntryFetchRate, c.options.EntryFetchMaxBurst, diff --git a/agent/cache/entry.go b/agent/cache/entry.go index 9d7c343d70..fca1f20d1b 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -30,8 +30,8 @@ type cacheEntry struct { // Expiry contains information about the expiration of this // entry. This is a pointer as its shared as a value in the - // expiryHeap as well. - Expiry *cacheEntryExpiry + // ExpiryHeap as well. + Expiry *CacheEntryExpiry // FetchedAt stores the time the cache entry was retrieved for determining // it's age later. diff --git a/agent/cache/eviction.go b/agent/cache/eviction.go index efd1025025..579f13fefe 100644 --- a/agent/cache/eviction.go +++ b/agent/cache/eviction.go @@ -5,20 +5,20 @@ import ( "time" ) -// cacheEntryExpiry contains the expiration time for a cache entry. -type cacheEntryExpiry struct { +// CacheEntryExpiry contains the expiration time for a cache entry. +type CacheEntryExpiry struct { Key string // Key in the cache map Expires time.Time // Time when entry expires (monotonic clock) HeapIndex int // Index in the heap } -// expiryHeap is a container/heap.Interface implementation that expires entries +// ExpiryHeap is a container/heap.Interface implementation that expires entries // in the cache when their expiration time is reached. // // All operations on the heap and read/write of the heap contents require // the proper entriesLock to be held on Cache. -type expiryHeap struct { - Entries []*cacheEntryExpiry +type ExpiryHeap struct { + entries []*CacheEntryExpiry // NotifyCh is sent a value whenever the 0 index value of the heap // changes. This can be used to detect when the earliest value @@ -29,25 +29,25 @@ type expiryHeap struct { // Initialize the heap. The buffer of 1 is really important because // its possible for the expiry loop to trigger the heap to update // itself and it'd block forever otherwise. -func newExpiryHeap() *expiryHeap { - h := &expiryHeap{NotifyCh: make(chan struct{}, 1)} - heap.Init(h) +func NewExpiryHeap() *ExpiryHeap { + h := &ExpiryHeap{NotifyCh: make(chan struct{}, 1)} + heap.Init((*entryHeap)(h)) return h } // Add an entry to the heap. // // Must be synchronized by the caller. -func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { - entry := &cacheEntryExpiry{ +func (h *ExpiryHeap) Add(key string, expiry time.Duration) *CacheEntryExpiry { + entry := &CacheEntryExpiry{ Key: key, Expires: time.Now().Add(expiry), // Set the initial heap index to the last index. If the entry is swapped it // will have the correct index set, and if it remains at the end the last // index will be correct. - HeapIndex: len(h.Entries), + HeapIndex: len(h.entries), } - heap.Push(h, entry) + heap.Push((*entryHeap)(h), entry) if entry.HeapIndex == 0 { h.notify() } @@ -58,10 +58,10 @@ func (h *expiryHeap) Add(key string, expiry time.Duration) *cacheEntryExpiry { // will be rebalanced after the entry is updated. // // Must be synchronized by the caller. -func (h *expiryHeap) Update(idx int, expiry time.Duration) { - entry := h.Entries[idx] +func (h *ExpiryHeap) Update(idx int, expiry time.Duration) { + entry := h.entries[idx] entry.Expires = time.Now().Add(expiry) - heap.Fix(h, idx) + heap.Fix((*entryHeap)(h), idx) // If the previous index and current index are both zero then Fix did not // swap the entry, and notify must be called here. @@ -71,9 +71,9 @@ func (h *expiryHeap) Update(idx int, expiry time.Duration) { } // Must be synchronized by the caller. -func (h *expiryHeap) Remove(idx int) { - entry := h.Entries[idx] - heap.Remove(h, idx) +func (h *ExpiryHeap) Remove(idx int) { + entry := h.entries[idx] + heap.Remove((*entryHeap)(h), idx) // A goroutine which is fetching a new value will have a reference to this // entry. When it re-acquires the lock it needs to be informed that @@ -86,38 +86,40 @@ func (h *expiryHeap) Remove(idx int) { } } -func (h *expiryHeap) Len() int { return len(h.Entries) } +type entryHeap ExpiryHeap -func (h *expiryHeap) Swap(i, j int) { - h.Entries[i], h.Entries[j] = h.Entries[j], h.Entries[i] - h.Entries[i].HeapIndex = i - h.Entries[j].HeapIndex = j +func (h *entryHeap) Len() int { return len(h.entries) } + +func (h *entryHeap) Swap(i, j int) { + h.entries[i], h.entries[j] = h.entries[j], h.entries[i] + h.entries[i].HeapIndex = i + h.entries[j].HeapIndex = j } -func (h *expiryHeap) Less(i, j int) bool { +func (h *entryHeap) Less(i, j int) bool { // The usage of Before here is important (despite being obvious): // this function uses the monotonic time that should be available // on the time.Time value so the heap is immune to wall clock changes. - return h.Entries[i].Expires.Before(h.Entries[j].Expires) + return h.entries[i].Expires.Before(h.entries[j].Expires) } // heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Push(x interface{}) { - h.Entries = append(h.Entries, x.(*cacheEntryExpiry)) +func (h *entryHeap) Push(x interface{}) { + h.entries = append(h.entries, x.(*CacheEntryExpiry)) } // heap.Interface, this isn't expected to be called directly. -func (h *expiryHeap) Pop() interface{} { - n := len(h.Entries) - entries := h.Entries +func (h *entryHeap) Pop() interface{} { + n := len(h.entries) + entries := h.entries last := entries[n-1] - h.Entries = entries[0 : n-1] + h.entries = entries[0 : n-1] return last } // notify the timer that the head value has changed, so the expiry time has // also likely changed. -func (h *expiryHeap) notify() { +func (h *ExpiryHeap) notify() { // Send to channel without blocking. Skips sending if there is already // an item in the buffered channel. select { @@ -127,30 +129,30 @@ func (h *expiryHeap) notify() { } // Must be synchronized by the caller. -func (h *expiryHeap) Next() timer { - if len(h.Entries) == 0 { - return timer{} +func (h *ExpiryHeap) Next() Timer { + if len(h.entries) == 0 { + return Timer{} } - entry := h.Entries[0] - return timer{ + entry := h.entries[0] + return Timer{ timer: time.NewTimer(time.Until(entry.Expires)), Entry: entry, } } -type timer struct { +type Timer struct { timer *time.Timer - Entry *cacheEntryExpiry + Entry *CacheEntryExpiry } -func (t *timer) Wait() <-chan time.Time { +func (t *Timer) Wait() <-chan time.Time { if t.timer == nil { return nil } return t.timer.C } -func (t *timer) Stop() { +func (t *Timer) Stop() { if t.timer != nil { t.timer.Stop() } diff --git a/agent/cache/eviction_test.go b/agent/cache/eviction_test.go index 0b200c2dd6..1612b29472 100644 --- a/agent/cache/eviction_test.go +++ b/agent/cache/eviction_test.go @@ -6,46 +6,43 @@ import ( "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) -var _ heap.Interface = new(expiryHeap) +var _ heap.Interface = (*entryHeap)(nil) func TestExpiryHeap(t *testing.T) { - require := require.New(t) - ch := make(chan struct{}, 10) // buffered to prevent blocking in tests - h := &expiryHeap{NotifyCh: ch} - var entry, entry2, entry3 *cacheEntryExpiry + h := NewExpiryHeap() + ch := h.NotifyCh + var entry, entry2, entry3 *CacheEntryExpiry // Init, shouldn't trigger anything - heap.Init(h) testNoMessage(t, ch) runStep(t, "add an entry", func(t *testing.T) { entry = h.Add("foo", 100*time.Millisecond) - require.Equal(0, entry.HeapIndex) + assert.Equal(t, 0, entry.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a second entry in front", func(t *testing.T) { entry2 = h.Add("bar", 50*time.Millisecond) - require.Equal(0, entry2.HeapIndex) - require.Equal(1, entry.HeapIndex) + assert.Equal(t, 0, entry2.HeapIndex) + assert.Equal(t, 1, entry.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // exactly one asserted above }) runStep(t, "add a third entry at the end", func(t *testing.T) { entry3 = h.Add("baz", 1000*time.Millisecond) - require.Equal(2, entry3.HeapIndex) + assert.Equal(t, 2, entry3.HeapIndex) testNoMessage(t, ch) // no notify cause index 0 stayed the same }) runStep(t, "remove the first entry", func(t *testing.T) { h.Remove(0) - require.Equal(0, entry.HeapIndex) - require.Equal(1, entry3.HeapIndex) + assert.Equal(t, 0, entry.HeapIndex) + assert.Equal(t, 1, entry3.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) }) @@ -60,8 +57,8 @@ func TestExpiryHeap(t *testing.T) { runStep(t, "0th element change triggers a notify", func(t *testing.T) { h.Update(entry3.HeapIndex, 20) - require.Equal(1, entry.HeapIndex) // no move - require.Equal(0, entry3.HeapIndex) + assert.Equal(t, 1, entry.HeapIndex) // no move + assert.Equal(t, 0, entry3.HeapIndex) testMessage(t, ch) testNoMessage(t, ch) // one message }) @@ -97,7 +94,7 @@ func TestExpiryLoop_ExitsWhenStopped(t *testing.T) { c := &Cache{ stopCh: make(chan struct{}), entries: make(map[string]cacheEntry), - entriesExpiryHeap: newExpiryHeap(), + entriesExpiryHeap: NewExpiryHeap(), } chStart := make(chan struct{}) chDone := make(chan struct{})