From 329d76fd0e144e095e340e5ff9eadc1650a8637b Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Wed, 25 Mar 2020 16:42:43 -0400 Subject: [PATCH 1/6] Remove SnapshotRPC passthrough The caller has access to the delegate, so we do not gain anything by wrapping the call in Agent. --- agent/agent.go | 9 --------- agent/snapshot_endpoint.go | 4 ++-- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/agent/agent.go b/agent/agent.go index 2567857744..581199f229 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -1715,15 +1715,6 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error { return a.delegate.RPC(method, args, reply) } -// SnapshotRPC performs the requested snapshot RPC against the Consul server in -// a streaming manner. The contents of in will be read and passed along as the -// payload, and the response message will determine the error status, and any -// return payload will be written to out. -func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, - replyFn structs.SnapshotReplyFn) error { - return a.delegate.SnapshotRPC(args, in, out, replyFn) -} - // Leave is used to prepare the agent for a graceful shutdown func (a *Agent) Leave() error { return a.delegate.Leave() diff --git a/agent/snapshot_endpoint.go b/agent/snapshot_endpoint.go index 7c1bed366d..483eae18d7 100644 --- a/agent/snapshot_endpoint.go +++ b/agent/snapshot_endpoint.go @@ -31,14 +31,14 @@ func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (inte // Don't bother sending any request body through since it will // be ignored. var null bytes.Buffer - if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil { + if err := s.agent.delegate.SnapshotRPC(&args, &null, resp, replyFn); err != nil { return nil, err } return nil, nil case "PUT": args.Op = structs.SnapshotRestore - if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil { + if err := s.agent.delegate.SnapshotRPC(&args, req.Body, resp, nil); err != nil { return nil, err } return nil, nil From e9e45545dde1fd6b5337b5b38ce22e9fe2d3cc1e Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 2 Apr 2020 14:35:58 -0400 Subject: [PATCH 2/6] agent/cache: Small formatting improvements to improve readability Remove Cache.entryKey which called a single function. Format multiline struct creation one field per line. --- agent/cache/cache.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 1bf14b341e..71213d4016 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -610,15 +610,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // in multiple actual RPC calls (unlike fetch). func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { // Fetch it with the min index specified directly by the request. - result, err := tEntry.Type.Fetch(FetchOptions{ - MinIndex: minIndex, - }, r) - if err != nil { - return nil, ResultMeta{}, err - } - - // Return the result and ignore the rest - return result.Value, ResultMeta{}, nil + result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r) + return result.Value, ResultMeta{}, err } func backOffWait(failures uint) time.Duration { @@ -742,9 +735,13 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro } key := makeEntryKey(t, dc, token, k) newEntry := cacheEntry{ - Valid: true, Value: res.Value, State: res.State, Index: res.Index, - FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL}, + Valid: true, + Value: res.Value, + State: res.State, + Index: res.Index, + FetchedAt: time.Now(), + Waiter: make(chan struct{}), + Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL}, } c.entriesLock.Lock() c.entries[key] = newEntry From faeaed5d0c4de5cda3dc3c8efb20ef7190cf3e65 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 2 Apr 2020 16:28:47 -0400 Subject: [PATCH 3/6] agent/cache: Make the return values of getEntryLocked more obvious Use named returned so that the caller has a better idea of what these bools mean. Return early to reduce the scope, and make it more obvious what values are returned in which cases. Also reduces the number of conditional expressions in each case. --- agent/cache/cache.go | 49 ++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 71213d4016..5162f0fa9e 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -227,42 +227,43 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { // getEntryLocked retrieves a cache entry and checks if it is ready to be // returned given the other parameters. It reads from entries and the caller // has to issue a read lock if necessary. -func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) { +func (c *Cache) getEntryLocked( + tEntry typeEntry, + key string, + maxAge time.Duration, + revalidate bool, + minIndex uint64, +) (entryExists bool, entryValid bool, entry cacheEntry) { entry, ok := c.entries[key] - cacheHit := false - - if !ok { - return ok, cacheHit, entry + if !entry.Valid { + return ok, false, entry } - // Check if we have a hit - cacheHit = ok && entry.Valid - - supportsBlocking := tEntry.Type.SupportsBlocking() - // Check index is not specified or lower than value, or the type doesn't // support blocking. - if cacheHit && supportsBlocking && - minIndex > 0 && minIndex >= entry.Index { + if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index { // MinIndex was given and matches or is higher than current value so we // ignore the cache and fallthrough to blocking on a new value below. - cacheHit = false + return true, false, entry } // Check MaxAge is not exceeded if this is not a background refreshing type // and MaxAge was specified. - if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 && - !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) { - cacheHit = false + if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) { + return true, false, entry } - // Check if we are requested to revalidate. If so the first time round the + // Check if re-validate is requested. If so the first time round the // loop is not a hit but subsequent ones should be treated normally. - if cacheHit && !tEntry.Opts.Refresh && revalidate { - cacheHit = false + if !tEntry.Opts.Refresh && revalidate { + return true, false, entry } - return ok, cacheHit, entry + return true, true, entry +} + +func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool { + return !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) } // getWithIndex implements the main Get functionality but allows internal @@ -289,10 +290,10 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte RETRY_GET: // Get the current value c.entriesLock.RLock() - _, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) + _, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) c.entriesLock.RUnlock() - if cacheHit { + if entryValid { meta := ResultMeta{Index: entry.Index} if first { metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) @@ -401,11 +402,11 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() - ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) + ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex) // This handles the case where a fetch succeeded after checking for its existence in // getWithIndex. This ensures that we don't miss updates. - if ok && cacheHit && !ignoreExisting { + if ok && entryValid && !ignoreExisting { ch := make(chan struct{}) close(ch) return ch, nil From 66fbb13976a130c62eaf94303e4534a8ada2a892 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 2 Apr 2020 17:08:51 -0400 Subject: [PATCH 4/6] agent/cache: Inline the refresh function to make recursion more obvious fetch is already an exceptionally long function, but hiding the recrusion in a function call likely does not help. --- agent/cache/cache.go | 48 ++++++++++++++++++-------------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 5162f0fa9e..54294eb07f 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -599,7 +599,25 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. if tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, attempt, tEntry, key, r) + // Check if cache was stopped + if atomic.LoadUint32(&c.stopped) == 1 { + return + } + + // If we're over the attempt minimum, start an exponential backoff. + if wait := backOffWait(attempt); wait > 0 { + time.Sleep(wait) + } + + // If we have a timer, wait for it + if tEntry.Opts.RefreshTimer > 0 { + time.Sleep(tEntry.Opts.RefreshTimer) + } + + // Trigger. The "allowNew" field is false because in the time we were + // waiting to refresh we may have expired and got evicted. If that + // happened, we don't want to create a new entry. + c.fetch(tEntry, key, r, false, attempt, 0, true, true) } }() @@ -630,34 +648,6 @@ func backOffWait(failures uint) time.Duration { return 0 } -// refresh triggers a fetch for a specific Request according to the -// registration options. -func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) { - // Sanity-check, we should not schedule anything that has refresh disabled - if !opts.Refresh { - return - } - // Check if cache was stopped - if atomic.LoadUint32(&c.stopped) == 1 { - return - } - - // If we're over the attempt minimum, start an exponential backoff. - if wait := backOffWait(attempt); wait > 0 { - time.Sleep(wait) - } - - // If we have a timer, wait for it - if opts.RefreshTimer > 0 { - time.Sleep(opts.RefreshTimer) - } - - // Trigger. The "allowNew" field is false because in the time we were - // waiting to refresh we may have expired and got evicted. If that - // happened, we don't want to create a new entry. - c.fetch(tEntry, key, r, false, attempt, 0, true, true) -} - // runExpiryLoop is a blocking function that watches the expiration // heap and invalidates entries that have expired. func (c *Cache) runExpiryLoop() { From 7246d8b6cb64c21fd646703ef4e30a2409978da9 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 2 Apr 2020 18:05:26 -0400 Subject: [PATCH 5/6] agent/cache: Reduce differences between notify implementations These two notify functions are very similar. There appear to be just enough differences that trying to parameterize the differences may not improve things. For now, reduce some of the cosmetic differences so that the material differences are more obvious. --- agent/cache/watch.go | 53 ++++++++++++++++++++------------------------ 1 file changed, 24 insertions(+), 29 deletions(-) diff --git a/agent/cache/watch.go b/agent/cache/watch.go index af3b097c5b..ed22a70d39 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -50,26 +50,30 @@ type UpdateEvent struct { // value that allows them to disambiguate between events in the returned chan // when sharing a chan between multiple cache entries. If the chan is closed, // the notify loop will terminate. -func (c *Cache) Notify(ctx context.Context, t string, r Request, - correlationID string, ch chan<- UpdateEvent) error { - - // Get the type that we're fetching +func (c *Cache) Notify( + ctx context.Context, + t string, + r Request, + correlationID string, + ch chan<- UpdateEvent, +) error { c.typesLock.RLock() tEntry, ok := c.types[t] c.typesLock.RUnlock() if !ok { return fmt.Errorf("unknown type in cache: %s", t) } + if tEntry.Type.SupportsBlocking() { go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) - } else { - info := r.CacheInfo() - if info.MaxAge == 0 { - return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") - } - go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) + return nil } + info := r.CacheInfo() + if info.MaxAge == 0 { + return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") + } + go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) return nil } @@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req index = meta.Index } + var wait time.Duration // Handle errors with backoff. Badly behaved blocking calls that returned // a zero index are considered as failures since we need to not get stuck // in a busy loop. - wait := 0 * time.Second if err == nil && meta.Index > 0 { failures = 0 } else { @@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ failures++ } + var wait time.Duration // Determining how long to wait before the next poll is complicated. // First off the happy path and the error path waits are handled distinctly // @@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // as this would eliminate the single-flighting of these requests in the cache and // the efficiencies gained by it. if failures > 0 { - - errWait := backOffWait(failures) - select { - case <-time.After(errWait): - case <-ctx.Done(): - return - } + wait = backOffWait(failures) } else { - // Default to immediately re-poll. This only will happen if the data - // we just got out of the cache is already too stale - pollWait := 0 * time.Second - // Calculate when the cached data's Age will get too stale and // need to be re-queried. When the data's Age already exceeds the // maxAge the pollWait value is left at 0 to immediately re-poll if meta.Age <= maxAge { - pollWait = maxAge - meta.Age + wait = maxAge - meta.Age } // Add a small amount of random jitter to the polling time. One @@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ // and then immediately have to re-fetch again. That wouldn't // be terrible but it would expend a bunch more cpu cycles when // we can definitely avoid it. - pollWait += lib.RandomStagger(maxAge / 16) + wait += lib.RandomStagger(maxAge / 16) + } - select { - case <-time.After(pollWait): - case <-ctx.Done(): - return - } + select { + case <-time.After(wait): + case <-ctx.Done(): + return } } } From 89f41bddfe320d1474dcd92ff8deeb6e7e9fe825 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 2 Apr 2020 18:42:29 -0400 Subject: [PATCH 6/6] Remove TTL from cacheEntryExpiry This should very slightly reduce the amount of memory required to store each item in the cache. It will also enable setting different TTLs based on the type of result. For example we may want to use a shorter TTL when the result indicates the resource does not exist, as storing these types of records could easily lead to a DOS caused by OOM. --- agent/cache/cache.go | 18 ++++-------------- agent/cache/entry.go | 13 ++++++------- 2 files changed, 10 insertions(+), 21 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 54294eb07f..66146610f2 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -317,7 +317,7 @@ RETRY_GET: // Touch the expiration and fix the heap. c.entriesLock.Lock() - entry.Expiry.Reset() + entry.Expiry.Update(tEntry.Opts.LastGetTTL) c.entriesExpiryHeap.Fix(entry.Expiry) c.entriesLock.Unlock() @@ -582,11 +582,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at // initial expiry information and insert. If we're already in // the heap we do nothing since we're reusing the same entry. if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 { - newEntry.Expiry = &cacheEntryExpiry{ - Key: key, - TTL: tEntry.Opts.LastGetTTL, - } - newEntry.Expiry.Reset() + newEntry.Expiry = &cacheEntryExpiry{Key: key} + newEntry.Expiry.Update(tEntry.Opts.LastGetTTL) heap.Push(c.entriesExpiryHeap, newEntry.Expiry) } @@ -717,13 +714,6 @@ func (c *Cache) Close() error { // AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first // time because it requires a special RPCType. Subsequent runs are fine though. func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error { - // Check the type that we're prepolulating - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return fmt.Errorf("unknown type in cache: %s", t) - } key := makeEntryKey(t, dc, token, k) newEntry := cacheEntry{ Valid: true, @@ -732,7 +722,7 @@ func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) erro Index: res.Index, FetchedAt: time.Now(), Waiter: make(chan struct{}), - Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL}, + Expiry: &cacheEntryExpiry{Key: key}, } c.entriesLock.Lock() c.entries[key] = newEntry diff --git a/agent/cache/entry.go b/agent/cache/entry.go index dfeb982e00..9d7678a056 100644 --- a/agent/cache/entry.go +++ b/agent/cache/entry.go @@ -47,15 +47,14 @@ type cacheEntry struct { // entry. Any modifications to this struct should be done only while // the Cache entriesLock is held. type cacheEntryExpiry struct { - Key string // Key in the cache map - Expires time.Time // Time when entry expires (monotonic clock) - TTL time.Duration // TTL for this entry to extend when resetting - HeapIndex int // Index in the heap + Key string // Key in the cache map + Expires time.Time // Time when entry expires (monotonic clock) + HeapIndex int // Index in the heap } -// Reset resets the expiration to be the ttl duration from now. -func (e *cacheEntryExpiry) Reset() { - e.Expires = time.Now().Add(e.TTL) +// Update the expiry to d time from now. +func (e *cacheEntryExpiry) Update(d time.Duration) { + e.Expires = time.Now().Add(d) } // expiryHeap is a heap implementation that stores information about