diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 881023e3d6..9b91916eb5 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -107,6 +107,12 @@ type ResultMeta struct { // For simple cache types, Age is the time since the result being returned was // fetched from the servers. Age time.Duration + + // Index is the internal ModifyIndex for the cache entry. Not all types + // support blocking and all that do will likely have this in their result type + // already but this allows generic code to reason about whether cache values + // have changed. + Index uint64 } // Options are options for the Cache. @@ -204,13 +210,20 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // error is returned on timeout. This matches the behavior of Consul blocking // queries. func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { + return c.getWithIndex(t, r, r.CacheInfo().MinIndex) +} + +// getWithIndex implements the main Get functionality but allows internal +// callers (Watch) to manipulate the blocking index separately from the actual +// request object. +func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { info := r.CacheInfo() if info.Key == "" { metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) // If no key is specified, then we do not cache this request. // Pass directly through to the backend. - return c.fetchDirect(t, r) + return c.fetchDirect(t, r, minIndex) } // Get the actual key for our entry @@ -223,11 +236,6 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { var timeoutCh <-chan time.Time RETRY_GET: - // Get the current value - c.entriesLock.RLock() - entry, ok := c.entries[key] - c.entriesLock.RUnlock() - // Get the type that we're fetching c.typesLock.RLock() tEntry, ok := c.types[t] @@ -238,6 +246,11 @@ RETRY_GET: return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) } + // Get the current value + c.entriesLock.RLock() + entry, ok := c.entries[key] + c.entriesLock.RUnlock() + // Check if we have a hit cacheHit := ok && entry.Valid @@ -246,7 +259,7 @@ RETRY_GET: // Check index is not specified or lower than value, or the type doesn't // support blocking. if cacheHit && supportsBlocking && - info.MinIndex > 0 && info.MinIndex >= entry.Index { + minIndex > 0 && minIndex >= entry.Index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value below. cacheHit = false @@ -266,7 +279,7 @@ RETRY_GET: } if cacheHit { - meta := ResultMeta{} + meta := ResultMeta{Index: entry.Index} if first { metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) meta.Hit = true @@ -306,14 +319,14 @@ RETRY_GET: // timeout. Instead, we make one effort to fetch a new value, and if // there was an error, we return. if !first && entry.Error != nil { - return entry.Value, ResultMeta{}, entry.Error + return entry.Value, ResultMeta{Index: entry.Index}, entry.Error } if first { // We increment two different counters for cache misses depending on // whether we're missing because we didn't have the data at all, // or if we're missing because we're blocking on a set index. - if info.MinIndex == 0 { + if minIndex == 0 { metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1) } else { metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1) @@ -332,17 +345,17 @@ RETRY_GET: // value we have is too old. We need to wait for new data. waiterCh, err := c.fetch(t, key, r, true, 0) if err != nil { - return nil, ResultMeta{}, err + return nil, ResultMeta{Index: entry.Index}, err } select { case <-waiterCh: - // Our fetch returned, retry the get from the cache + // Our fetch returned, retry the get from the cache. goto RETRY_GET case <-timeoutCh: // Timeout on the cache read, just return whatever we have. - return entry.Value, ResultMeta{}, nil + return entry.Value, ResultMeta{Index: entry.Index}, nil } } @@ -552,7 +565,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint) (<- // fetchDirect fetches the given request with no caching. Because this // bypasses the caching entirely, multiple matching requests will result // in multiple actual RPC calls (unlike fetch). -func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error) { +func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { // Get the type that we're fetching c.typesLock.RLock() tEntry, ok := c.types[t] @@ -563,7 +576,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error // Fetch it with the min index specified directly by the request. result, err := tEntry.Type.Fetch(FetchOptions{ - MinIndex: r.CacheInfo().MinIndex, + MinIndex: minIndex, }, r) if err != nil { return nil, ResultMeta{}, err @@ -573,6 +586,21 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, ResultMeta, error return result.Value, ResultMeta{}, nil } +func backOffWait(failures uint) time.Duration { + if failures > CacheRefreshBackoffMin { + shift := failures - CacheRefreshBackoffMin + waitTime := CacheRefreshMaxWait + if shift < 31 { + waitTime = (1 << shift) * time.Second + } + if waitTime > CacheRefreshMaxWait { + waitTime = CacheRefreshMaxWait + } + return waitTime + } + return 0 +} + // refresh triggers a fetch for a specific Request according to the // registration options. func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) { @@ -586,17 +614,8 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin } // If we're over the attempt minimum, start an exponential backoff. - if attempt > CacheRefreshBackoffMin { - shift := attempt - CacheRefreshBackoffMin - waitTime := CacheRefreshMaxWait - if shift < 31 { - waitTime = (1 << shift) * time.Second - } - if waitTime > CacheRefreshMaxWait { - waitTime = CacheRefreshMaxWait - } - - time.Sleep(waitTime) + if wait := backOffWait(attempt); wait > 0 { + time.Sleep(wait) } // If we have a timer, wait for it diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 8b3bb8df58..10acc14dfd 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -6,6 +6,7 @@ import ( "github.com/mitchellh/go-testing-interface" "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" ) // TestCache returns a Cache instance configuring for testing. @@ -50,6 +51,43 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface } } +// TestCacheNotifyChResult tests that the expected updated was delivered on a +// Notify() chan within a reasonable period of time (it expects it to be +// "immediate" but waits some milliseconds). Expected may be given multiple +// times and if so these are all waited for and asserted to match but IN ANY +// ORDER to ensure we aren't timing dependent. +func TestCacheNotifyChResult(t testing.T, ch <-chan UpdateEvent, expected ...UpdateEvent) { + t.Helper() + + expectLen := len(expected) + if expectLen < 1 { + panic("asserting nothing") + } + + got := make([]UpdateEvent, 0, expectLen) + timeoutCh := time.After(50 * time.Millisecond) + +OUT: + for { + select { + case result := <-ch: + // Ignore age as it's non-deterministic + result.Meta.Age = 0 + got = append(got, result) + if len(got) == expectLen { + break OUT + } + + case <-timeoutCh: + t.Fatalf("got %d results on chan in 50ms, want %d", len(got), expectLen) + } + } + + // We already asserted len since you can only get here if we appended enough. + // Just check all the results we got are in the expected slice + require.ElementsMatch(t, expected, got) +} + // TestRequest returns a Request that returns the given cache key and index. // The Reset method can be called to reset it for custom usage. func TestRequest(t testing.T, info RequestInfo) *MockRequest { diff --git a/agent/cache/watch.go b/agent/cache/watch.go new file mode 100644 index 0000000000..f23d5a69ae --- /dev/null +++ b/agent/cache/watch.go @@ -0,0 +1,122 @@ +package cache + +import ( + "context" + "fmt" + "time" +) + +// UpdateEvent is a struct summarising an update to a cache entry +type UpdateEvent struct { + // CorrelationID is used by the Notify API to allow correlation of updates + // with specific requests. We could return the full request object and + // cachetype for consumers to match against the calls they made but in + // practice it's cleaner for them to choose the minimal necessary unique + // identifier given the set of things they are watching. They might even + // choose to assign random IDs for example. + CorrelationID string + Result interface{} + Meta ResultMeta + Err error +} + +// Notify registers a desire to be updated about changes to a cache result. +// +// It is a helper that abstracts code from perfroming their own "blocking" query +// logic against a cache key to watch for changes and to maintain the key in +// cache actively. It will continue to perform blocking Get requests until the +// context is canceled. +// +// The passed context must be cancelled or timeout in order to free resources +// and stop maintaining the value in cache. Typically request-scoped resources +// do this but if a long-lived context like context.Background is used, then the +// caller must arrange for it to be cancelled when the watch is no longer +// needed. +// +// The passed chan may be buffered or unbuffered, if the caller doesn't consume +// fast enough it will block the notification loop. When the chan is later +// drained, watching resumes correctly. If the pause is longer than the +// cachetype's TTL, the result might be removed from the local cache. Even in +// this case though when the chan is drained again, the new Get will re-fetch +// the entry from servers and resume notification behaviour transparently. +// +// The chan is passed in to allow multiple cached results to be watched by a +// single consumer without juggling extra goroutines per watch. The +// correlationID is opaque and will be returned in all UpdateEvents generated by +// result of watching the specified request so the caller can set this to any +// value that allows them to dissambiguate between events in the returned chan +// when sharing a chan between multiple cache entries. If the chan is closed, +// the notify loop will terminate. +func (c *Cache) Notify(ctx context.Context, t string, r Request, + correlationID string, ch chan<- UpdateEvent) error { + + // Get the type that we're fetching + c.typesLock.RLock() + tEntry, ok := c.types[t] + c.typesLock.RUnlock() + if !ok { + return fmt.Errorf("unknown type in cache: %s", t) + } + if !tEntry.Type.SupportsBlocking() { + return fmt.Errorf("watch requires the type to support blocking") + } + + // Always start at 0 index to deliver the inital (possibly currently cached + // value). + index := uint64(0) + + go func() { + var failures uint + + for { + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Blocking request + res, meta, err := c.getWithIndex(t, r, index) + + // Check context hasn't been cancelled + if ctx.Err() != nil { + return + } + + // Check the index of the value returned in the cache entry to be sure it + // changed + if index < meta.Index { + u := UpdateEvent{correlationID, res, meta, err} + select { + case ch <- u: + case <-ctx.Done(): + return + } + + // Update index for next request + index = meta.Index + } + + // Handle errors with backoff. Badly behaved blocking calls that returned + // a zero index are considered as failures since we need to not get stuck + // in a busy loop. + if err == nil && meta.Index > 0 { + failures = 0 + } else { + failures++ + } + if wait := backOffWait(failures); wait > 0 { + select { + case <-time.After(wait): + case <-ctx.Done(): + return + } + } + // Sanity check we always request blocking on second pass + if index < 1 { + index = 1 + } + } + }() + + return nil +} diff --git a/agent/cache/watch_test.go b/agent/cache/watch_test.go new file mode 100644 index 0000000000..7bd6de9435 --- /dev/null +++ b/agent/cache/watch_test.go @@ -0,0 +1,208 @@ +package cache + +import ( + "context" + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// Test that a type registered with a periodic refresh can be watched. +func TestCacheNotify(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: false, + }) + + // Setup triggers to control when "updates" should be delivered + trigger := make([]chan time.Time, 4) + for i := range trigger { + trigger[i] = make(chan time.Time) + } + + // Configure the type + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once().Run(func(args mock.Arguments) { + // Assert the right request type - all real Fetch implementations do this so + // it keeps us honest that Watch doesn't require type mangling which will + // break in real life (hint: it did on the first attempt) + _, ok := args.Get(1).(*MockRequest) + require.True(t, ok) + }) + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[0]) + typ.Static(FetchResult{Value: 12, Index: 5}, nil).Once().WaitUntil(trigger[1]) + typ.Static(FetchResult{Value: 42, Index: 7}, nil).Once().WaitUntil(trigger[2]) + // It's timing dependent whether the blocking loop manages to make another + // call before we cancel so don't require it. We need to have a higher index + // here because if the index is the same then the cache Get will not return + // until the full 10 min timeout expires. This causes the last fetch to return + // after cancellation as if it had timed out. + typ.Static(FetchResult{Value: 42, Index: 8}, nil).WaitUntil(trigger[3]) + + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan UpdateEvent) + + err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch) + require.NoError(err) + + // Should receive the first result pretty soon + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 4}, + Err: nil, + }) + + // There should be no more updates delivered yet + require.Len(ch, 0) + + // Trigger blocking query to return a "change" + close(trigger[0]) + + // Should receive the next result pretty soon + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 12, + // Note these are never cache "hits" because blocking will wait until there + // is a new value at which point it's not considered a hit. + Meta: ResultMeta{Hit: false, Index: 5}, + Err: nil, + }) + + // Registere a second observer using same chan and request. Note that this is + // testing a few things implicitly: + // - that multiple watchers on the same cache entity are de-duped in their + // requests to the "backend" + // - that multiple watchers can distinguish their results using correlationID + err = c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test2", ch) + require.NoError(err) + + // Should get test2 notify immediately, and it should be a cache hit + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test2", + Result: 12, + Meta: ResultMeta{Hit: true, Index: 5}, + Err: nil, + }) + + // We could wait for a full timeout but we can't directly observe it so + // simulate the behaviour by triggering a response with the same value and + // index as the last one. + close(trigger[1]) + + // We should NOT be notified about that. Note this is timing dependent but + // it's only a sanity check, if we somehow _do_ get the change delivered later + // than 10ms the next value assertion will fail anyway. + time.Sleep(10 * time.Millisecond) + require.Len(ch, 0) + + // Trigger final update + close(trigger[2]) + + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 42, + Meta: ResultMeta{Hit: false, Index: 7}, + Err: nil, + }, UpdateEvent{ + CorrelationID: "test2", + Result: 42, + Meta: ResultMeta{Hit: false, Index: 7}, + Err: nil, + }) + + // Sanity check closing chan before context is cancelled doesn't panic + //close(ch) + + // Close context + cancel() + + // It's likely but not certain that at least one of the watchers was blocked + // on the next cache Get so trigger that to timeout so we can observe the + // watch goroutines being cleaned up. This is necessary since currently we + // have no way to interrupt a blocking query. In practice it's fine to know + // that after 10 mins max the blocking query will return and the resources + // will be cleaned. + close(trigger[3]) + + // I want to test that cancelling the context cleans up goroutines (which it + // does from manual verification with debugger etc). I had a check based on a + // similar approach to https://golang.org/src/net/http/main_test.go#L60 but it + // was just too flaky because it relies on the timing of the error backoff + // timer goroutines and similar so I've given up for now as I have more + // important things to get working. +} + +// Test that a refresh performs a backoff. +func TestCacheWatch_ErrorBackoff(t *testing.T) { + t.Parallel() + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, &RegisterOptions{ + Refresh: false, + }) + + // Configure the type + var retries uint32 + fetchErr := fmt.Errorf("test fetch error") + typ.Static(FetchResult{Value: 1, Index: 4}, nil).Once() + typ.Static(FetchResult{Value: nil, Index: 5}, fetchErr).Run(func(args mock.Arguments) { + atomic.AddUint32(&retries, 1) + }) + + require := require.New(t) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := make(chan UpdateEvent) + + err := c.Notify(ctx, "t", TestRequest(t, RequestInfo{Key: "hello"}), "test", ch) + require.NoError(err) + + // Should receive the first result pretty soon + TestCacheNotifyChResult(t, ch, UpdateEvent{ + CorrelationID: "test", + Result: 1, + Meta: ResultMeta{Hit: false, Index: 4}, + Err: nil, + }) + + numErrors := 0 + // Loop for a little while and count how many errors we see reported. If this + // was running as fast as it could go we'd expect this to be huge. We have to + // be a little careful here because the watch chan ch doesn't have a large + // buffer so we could be artificially slowing down the loop without the + // backoff actualy taking affect. We can validate that by ensuring this test + // fails without the backoff code reliably. + timeoutC := time.After(500 * time.Millisecond) +OUT: + for { + select { + case <-timeoutC: + break OUT + case u := <-ch: + numErrors++ + require.Error(u.Err) + } + } + // Must be fewer than 10 failures in that time + require.True(numErrors < 10, fmt.Sprintf("numErrors: %d", numErrors)) + + // Check the number of RPCs as a sanity check too + actual := atomic.LoadUint32(&retries) + require.True(actual < 10, fmt.Sprintf("actual: %d", actual)) +}