Add cache.Notify to abstract watching for cache updates for types that support blocking semantics. (#4695)

This commit is contained in:
Paul Banks 2018-10-02 11:27:10 +01:00
parent e812f5516a
commit 96b9b95a19
4 changed files with 413 additions and 26 deletions

71
agent/cache/cache.go vendored
View File

@ -107,6 +107,12 @@ type ResultMeta struct {
// For simple cache types, Age is the time since the result being returned was
// fetched from the servers.
Age time.Duration
// Index is the internal ModifyIndex for the cache entry. Not all types
// support blocking and all that do will likely have this in their result type
// already but this allows generic code to reason about whether cache values
// have changed.
Index uint64
}
// Options are options for the Cache.
@ -204,13 +210,20 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
return c.getWithIndex(t, r, r.CacheInfo().MinIndex)
}
// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
info := r.CacheInfo()
if info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(t, r)
return c.fetchDirect(t, r, minIndex)
}
// Get the actual key for our entry
@ -223,11 +236,6 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
var timeoutCh <-chan time.Time
RETRY_GET:
// Get the current value
c.entriesLock.RLock()
entry, ok := c.entries[key]
c.entriesLock.RUnlock()
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
@ -238,6 +246,11 @@ RETRY_GET:
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
// Get the current value
c.entriesLock.RLock()
entry, ok := c.entries[key]
c.entriesLock.RUnlock()
// Check if we have a hit
cacheHit := ok && entry.Valid
@ -246,7 +259,7 @@ RETRY_GET:
// Check index is not specified or lower than value, or the type doesn't
// support blocking.
if cacheHit && supportsBlocking &&
info.MinIndex > 0 && info.MinIndex >= entry.Index {
minIndex > 0 && minIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below.
cacheHit = false
@ -266,7 +279,7 @@ RETRY_GET:
}
if cacheHit {
meta := ResultMeta{}
meta := ResultMeta{Index: entry.Index}
if first {
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
meta.Hit = true
@ -306,14 +319,14 @@ RETRY_GET:
// timeout. Instead, we make one effort to fetch a new value, and if
// there was an error, we return.
if !first && entry.Error != nil {
return entry.Value, ResultMeta{}, entry.Error
return entry.Value, ResultMeta{Index: entry.Index}, entry.Error
}
if first {
// We increment two different counters for cache misses depending on
// whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index.
if info.MinIndex == 0 {
if minIndex == 0 {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
} else {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
@ -332,17 +345,17 @@ RETRY_GET:
// value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(t, key, r, true, 0)
if err != nil {
return nil, ResultMeta{}, err
return nil, ResultMeta{Index: entry.Index}, err
}
select {
case <-waiterCh:
// Our fetch returned, retry the get from the cache
// Our fetch returned, retry the get from the cache.
goto RETRY_GET
case <-timeoutCh:
// Timeout on the cache read, just return whatever we have.
return entry.Value, ResultMeta{}, nil
return entry.Value, ResultMeta{Index: entry.Index}, nil
}
}
@ -552,7 +565,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<-
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error) {
func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
@ -563,7 +576,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: r.CacheInfo().MinIndex,
MinIndex: minIndex,
}, r)
if err != nil {
return nil, ResultMeta{}, err
@ -573,6 +586,21 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error
return result.Value, ResultMeta{}, nil
}
func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin
waitTime := CacheRefreshMaxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait
}
return waitTime
}
return 0
}
// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) {
@ -586,17 +614,8 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin
}
// If we're over the attempt minimum, start an exponential backoff.
if attempt > CacheRefreshBackoffMin {
shift := attempt - CacheRefreshBackoffMin
waitTime := CacheRefreshMaxWait
if shift < 31 {
waitTime = (1 << shift) * time.Second
}
if waitTime > CacheRefreshMaxWait {
waitTime = CacheRefreshMaxWait
}
time.Sleep(waitTime)
if wait := backOffWait(attempt); wait > 0 {
time.Sleep(wait)
}
// If we have a timer, wait for it

View File

@ -6,6 +6,7 @@ import (
"github.com/mitchellh/go-testing-interface"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// TestCache returns a Cache instance configuring for testing.
@ -50,6 +51,43 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface
}
}
// TestCacheNotifyChResult tests that the expected updated was delivered on a
// Notify() chan within a reasonable period of time (it expects it to be
// "immediate" but waits some milliseconds). Expected may be given multiple
// times and if so these are all waited for and asserted to match but IN ANY
// ORDER to ensure we aren't timing dependent.
func TestCacheNotifyChResult(t testing.T, ch <-chan UpdateEvent, expected ...UpdateEvent) {
t.Helper()
expectLen := len(expected)
if expectLen < 1 {
panic("asserting nothing")
}
got := make([]UpdateEvent, 0, expectLen)
timeoutCh := time.After(50 * time.Millisecond)
OUT:
for {
select {
case result := <-ch:
// Ignore age as it's non-deterministic
result.Meta.Age = 0
got = append(got, result)
if len(got) == expectLen {
break OUT
}
case <-timeoutCh:
t.Fatalf("got %d results on chan in 50ms, want %d", len(got), expectLen)
}
}
// We already asserted len since you can only get here if we appended enough.
// Just check all the results we got are in the expected slice
require.ElementsMatch(t, expected, got)
}
// TestRequest returns a Request that returns the given cache key and index.
// The Reset method can be called to reset it for custom usage.
func TestRequest(t testing.T, info RequestInfo) *MockRequest {

122
agent/cache/watch.go vendored Normal file
View File

@ -0,0 +1,122 @@
package cache
import (
"context"
"fmt"
"time"
)
// UpdateEvent is a struct summarising an update to a cache entry
type UpdateEvent struct {
// CorrelationID is used by the Notify API to allow correlation of updates
// with specific requests. We could return the full request object and
// cachetype for consumers to match against the calls they made but in
// practice it's cleaner for them to choose the minimal necessary unique
// identifier given the set of things they are watching. They might even
// choose to assign random IDs for example.
CorrelationID string
Result interface{}
Meta ResultMeta
Err error
}
// Notify registers a desire to be updated about changes to a cache result.
//
// It is a helper that abstracts code from perfroming their own "blocking" query
// logic against a cache key to watch for changes and to maintain the key in
// cache actively. It will continue to perform blocking Get requests until the
// context is canceled.
//
// The passed context must be cancelled or timeout in order to free resources
// and stop maintaining the value in cache. Typically request-scoped resources
// do this but if a long-lived context like context.Background is used, then the
// caller must arrange for it to be cancelled when the watch is no longer
// needed.
//
// The passed chan may be buffered or unbuffered, if the caller doesn't consume
// fast enough it will block the notification loop. When the chan is later
// drained, watching resumes correctly. If the pause is longer than the
// cachetype's TTL, the result might be removed from the local cache. Even in
// this case though when the chan is drained again, the new Get will re-fetch
// the entry from servers and resume notification behaviour transparently.
//
// The chan is passed in to allow multiple cached results to be watched by a
// single consumer without juggling extra goroutines per watch. The
// correlationID is opaque and will be returned in all UpdateEvents generated by
// result of watching the specified request so the caller can set this to any
// value that allows them to dissambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (c *Cache) Notify(ctx context.Context, t string, r Request,
correlationID string, ch chan<- UpdateEvent) error {
// Get the type that we're fetching
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
if !tEntry.Type.SupportsBlocking() {
return fmt.Errorf("watch requires the type to support blocking")
}
// Always start at 0 index to deliver the inital (possibly currently cached
// value).
index := uint64(0)
go func() {
var failures uint
for {
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}
// Blocking request
res, meta, err := c.getWithIndex(t, r, index)
// Check context hasn't been cancelled
if ctx.Err() != nil {
return
}
// Check the index of the value returned in the cache entry to be sure it
// changed
if index < meta.Index {
u := UpdateEvent{correlationID, res, meta, err}
select {
case ch <- u:
case <-ctx.Done():
return
}
// Update index for next request
index = meta.Index
}
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
if err == nil && meta.Index > 0 {
failures = 0
} else {
failures++
}
if wait := backOffWait(failures); wait > 0 {
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
// Sanity check we always request blocking on second pass
if index < 1 {
index = 1
}
}
}()
return nil
}

208
agent/cache/watch_test.go vendored Normal file
View File

@ -0,0 +1,208 @@
package cache
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)
// Test that a type registered with a periodic refresh can be watched.
func TestCacheNotify(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
// Setup triggers to control when "updates" should be delivered
trigger := make([]chan time.Time, 4)
for i := range trigger {
trigger[i] = make(chan time.Time)
}
// Configure the type
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) {
// Assert the right request type - all real Fetch implementations do this so
// it keeps us honest that Watch doesn't require type mangling which will
// break in real life (hint: it did on the first attempt)
_, ok := args.Get(1).(*MockRequest)
require.True(t, ok)
})
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0])
typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1])
typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2])
// It's timing dependent whether the blocking loop manages to make another
// call before we cancel so don't require it. We need to have a higher index
// here because if the index is the same then the cache Get will not return
// until the full 10 min timeout expires. This causes the last fetch to return
// after cancellation as if it had timed out.
typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3])
require := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan UpdateEvent)
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
require.NoError(err)
// Should receive the first result pretty soon
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 1,
Meta: ResultMeta{Hit: false, Index: 4},
Err: nil,
})
// There should be no more updates delivered yet
require.Len(ch, 0)
// Trigger blocking query to return a "change"
close(trigger[0])
// Should receive the next result pretty soon
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 12,
// Note these are never cache "hits" because blocking will wait until there
// is a new value at which point it's not considered a hit.
Meta: ResultMeta{Hit: false, Index: 5},
Err: nil,
})
// Registere a second observer using same chan and request. Note that this is
// testing a few things implicitly:
// - that multiple watchers on the same cache entity are de-duped in their
// requests to the "backend"
// - that multiple watchers can distinguish their results using correlationID
err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test2", ch)
require.NoError(err)
// Should get test2 notify immediately, and it should be a cache hit
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test2",
Result: 12,
Meta: ResultMeta{Hit: true, Index: 5},
Err: nil,
})
// We could wait for a full timeout but we can't directly observe it so
// simulate the behaviour by triggering a response with the same value and
// index as the last one.
close(trigger[1])
// We should NOT be notified about that. Note this is timing dependent but
// it's only a sanity check, if we somehow _do_ get the change delivered later
// than 10ms the next value assertion will fail anyway.
time.Sleep(10 * time.Millisecond)
require.Len(ch, 0)
// Trigger final update
close(trigger[2])
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 42,
Meta: ResultMeta{Hit: false, Index: 7},
Err: nil,
}, UpdateEvent{
CorrelationID: "test2",
Result: 42,
Meta: ResultMeta{Hit: false, Index: 7},
Err: nil,
})
// Sanity check closing chan before context is cancelled doesn't panic
//close(ch)
// Close context
cancel()
// It's likely but not certain that at least one of the watchers was blocked
// on the next cache Get so trigger that to timeout so we can observe the
// watch goroutines being cleaned up. This is necessary since currently we
// have no way to interrupt a blocking query. In practice it's fine to know
// that after 10 mins max the blocking query will return and the resources
// will be cleaned.
close(trigger[3])
// I want to test that cancelling the context cleans up goroutines (which it
// does from manual verification with debugger etc). I had a check based on a
// similar approach to https://golang.org/src/net/http/main_test.go#L60 but it
// was just too flaky because it relies on the timing of the error backoff
// timer goroutines and similar so I've given up for now as I have more
// important things to get working.
}
// Test that a refresh performs a backoff.
func TestCacheWatch_ErrorBackoff(t *testing.T) {
t.Parallel()
typ := TestType(t)
defer typ.AssertExpectations(t)
c := TestCache(t)
c.RegisterType("t", typ, &RegisterOptions{
Refresh: false,
})
// Configure the type
var retries uint32
fetchErr := fmt.Errorf("test fetch error")
typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once()
typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) {
atomic.AddUint32(&retries, 1)
})
require := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan UpdateEvent)
err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch)
require.NoError(err)
// Should receive the first result pretty soon
TestCacheNotifyChResult(t, ch, UpdateEvent{
CorrelationID: "test",
Result: 1,
Meta: ResultMeta{Hit: false, Index: 4},
Err: nil,
})
numErrors := 0
// Loop for a little while and count how many errors we see reported. If this
// was running as fast as it could go we'd expect this to be huge. We have to
// be a little careful here because the watch chan ch doesn't have a large
// buffer so we could be artificially slowing down the loop without the
// backoff actualy taking affect. We can validate that by ensuring this test
// fails without the backoff code reliably.
timeoutC := time.After(500 * time.Millisecond)
OUT:
for {
select {
case <-timeoutC:
break OUT
case u := <-ch:
numErrors++
require.Error(u.Err)
}
}
// Must be fewer than 10 failures in that time
require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors))
// Check the number of RPCs as a sanity check too
actual := atomic.LoadUint32(&retries)
require.True(actual < 10, fmt.Sprintf("actual: %d", actual))
}