Merge pull request #7585 from hashicorp/dnephin/agent-cache

agent/cache: Small changes to hopefully improve readability
This commit is contained in:
Daniel Nephin 2020-04-13 15:47:52 -04:00 committed by GitHub
commit 74828a15a2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 88 additions and 125 deletions

View File

@ -1715,15 +1715,6 @@ func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
return a.delegate.RPC(method, args, reply)
}
// SnapshotRPC performs the requested snapshot RPC against the Consul server in
// a streaming manner. The contents of in will be read and passed along as the
// payload, and the response message will determine the error status, and any
// return payload will be written to out.
func (a *Agent) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer,
replyFn structs.SnapshotReplyFn) error {
return a.delegate.SnapshotRPC(args, in, out, replyFn)
}
// Leave is used to prepare the agent for a graceful shutdown
func (a *Agent) Leave() error {
return a.delegate.Leave()

134
agent/cache/cache.go vendored
View File

@ -227,42 +227,43 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
// getEntryLocked retrieves a cache entry and checks if it is ready to be
// returned given the other parameters. It reads from entries and the caller
// has to issue a read lock if necessary.
func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duration, revalidate bool, minIndex uint64) (bool, bool, cacheEntry) {
func (c *Cache) getEntryLocked(
tEntry typeEntry,
key string,
maxAge time.Duration,
revalidate bool,
minIndex uint64,
) (entryExists bool, entryValid bool, entry cacheEntry) {
entry, ok := c.entries[key]
cacheHit := false
if !ok {
return ok, cacheHit, entry
if !entry.Valid {
return ok, false, entry
}
// Check if we have a hit
cacheHit = ok && entry.Valid
supportsBlocking := tEntry.Type.SupportsBlocking()
// Check index is not specified or lower than value, or the type doesn't
// support blocking.
if cacheHit && supportsBlocking &&
minIndex > 0 && minIndex >= entry.Index {
if tEntry.Type.SupportsBlocking() && minIndex > 0 && minIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below.
cacheHit = false
return true, false, entry
}
// Check MaxAge is not exceeded if this is not a background refreshing type
// and MaxAge was specified.
if cacheHit && !tEntry.Opts.Refresh && maxAge > 0 &&
!entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt) {
cacheHit = false
if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) {
return true, false, entry
}
// Check if we are requested to revalidate. If so the first time round the
// Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally.
if cacheHit && !tEntry.Opts.Refresh && revalidate {
cacheHit = false
if !tEntry.Opts.Refresh && revalidate {
return true, false, entry
}
return ok, cacheHit, entry
return true, true, entry
}
func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
return !entry.FetchedAt.IsZero() && maxAge < time.Since(entry.FetchedAt)
}
// getWithIndex implements the main Get functionality but allows internal
@ -289,10 +290,10 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte
RETRY_GET:
// Get the current value
c.entriesLock.RLock()
_, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
_, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
c.entriesLock.RUnlock()
if cacheHit {
if entryValid {
meta := ResultMeta{Index: entry.Index}
if first {
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1)
@ -316,7 +317,7 @@ RETRY_GET:
// Touch the expiration and fix the heap.
c.entriesLock.Lock()
entry.Expiry.Reset()
entry.Expiry.Update(tEntry.Opts.LastGetTTL)
c.entriesExpiryHeap.Fix(entry.Expiry)
c.entriesLock.Unlock()
@ -401,11 +402,11 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
ok, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
// This handles the case where a fetch succeeded after checking for its existence in
// getWithIndex. This ensures that we don't miss updates.
if ok && cacheHit && !ignoreExisting {
if ok && entryValid && !ignoreExisting {
ch := make(chan struct{})
close(ch)
return ch, nil
@ -581,11 +582,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// initial expiry information and insert. If we're already in
// the heap we do nothing since we're reusing the same entry.
if newEntry.Expiry == nil || newEntry.Expiry.HeapIndex == -1 {
newEntry.Expiry = &cacheEntryExpiry{
Key: key,
TTL: tEntry.Opts.LastGetTTL,
}
newEntry.Expiry.Reset()
newEntry.Expiry = &cacheEntryExpiry{Key: key}
newEntry.Expiry.Update(tEntry.Opts.LastGetTTL)
heap.Push(c.entriesExpiryHeap, newEntry.Expiry)
}
@ -598,7 +596,25 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// If refresh is enabled, run the refresh in due time. The refresh
// below might block, but saves us from spawning another goroutine.
if tEntry.Opts.Refresh {
c.refresh(tEntry.Opts, attempt, tEntry, key, r)
// Check if cache was stopped
if atomic.LoadUint32(&c.stopped) == 1 {
return
}
// If we're over the attempt minimum, start an exponential backoff.
if wait := backOffWait(attempt); wait > 0 {
time.Sleep(wait)
}
// If we have a timer, wait for it
if tEntry.Opts.RefreshTimer > 0 {
time.Sleep(tEntry.Opts.RefreshTimer)
}
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
}
}()
@ -610,15 +626,8 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: minIndex,
}, r)
if err != nil {
return nil, ResultMeta{}, err
}
// Return the result and ignore the rest
return result.Value, ResultMeta{}, nil
result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r)
return result.Value, ResultMeta{}, err
}
func backOffWait(failures uint) time.Duration {
@ -636,34 +645,6 @@ func backOffWait(failures uint) time.Duration {
return 0
}
// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh {
return
}
// Check if cache was stopped
if atomic.LoadUint32(&c.stopped) == 1 {
return
}
// If we're over the attempt minimum, start an exponential backoff.
if wait := backOffWait(attempt); wait > 0 {
time.Sleep(wait)
}
// If we have a timer, wait for it
if opts.RefreshTimer > 0 {
time.Sleep(opts.RefreshTimer)
}
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
}
// runExpiryLoop is a blocking function that watches the expiration
// heap and invalidates entries that have expired.
func (c *Cache) runExpiryLoop() {
@ -733,18 +714,15 @@ func (c *Cache) Close() error {
// AutoEncrypt.TLS is turned on. The cache itself cannot fetch that the first
// time because it requires a special RPCType. Subsequent runs are fine though.
func (c *Cache) Prepopulate(t string, res FetchResult, dc, token, k string) error {
// Check the type that we're prepolulating
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
key := makeEntryKey(t, dc, token, k)
newEntry := cacheEntry{
Valid: true, Value: res.Value, State: res.State, Index: res.Index,
FetchedAt: time.Now(), Waiter: make(chan struct{}),
Expiry: &cacheEntryExpiry{Key: key, TTL: tEntry.Opts.LastGetTTL},
Valid: true,
Value: res.Value,
State: res.State,
Index: res.Index,
FetchedAt: time.Now(),
Waiter: make(chan struct{}),
Expiry: &cacheEntryExpiry{Key: key},
}
c.entriesLock.Lock()
c.entries[key] = newEntry

13
agent/cache/entry.go vendored
View File

@ -47,15 +47,14 @@ type cacheEntry struct {
// entry. Any modifications to this struct should be done only while
// the Cache entriesLock is held.
type cacheEntryExpiry struct {
Key string // Key in the cache map
Expires time.Time // Time when entry expires (monotonic clock)
TTL time.Duration // TTL for this entry to extend when resetting
HeapIndex int // Index in the heap
Key string // Key in the cache map
Expires time.Time // Time when entry expires (monotonic clock)
HeapIndex int // Index in the heap
}
// Reset resets the expiration to be the ttl duration from now.
func (e *cacheEntryExpiry) Reset() {
e.Expires = time.Now().Add(e.TTL)
// Update the expiry to d time from now.
func (e *cacheEntryExpiry) Update(d time.Duration) {
e.Expires = time.Now().Add(d)
}
// expiryHeap is a heap implementation that stores information about

53
agent/cache/watch.go vendored
View File

@ -50,26 +50,30 @@ type UpdateEvent struct {
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func (c *Cache) Notify(ctx context.Context, t string, r Request,
correlationID string, ch chan<- UpdateEvent) error {
// Get the type that we're fetching
func (c *Cache) Notify(
ctx context.Context,
t string,
r Request,
correlationID string,
ch chan<- UpdateEvent,
) error {
c.typesLock.RLock()
tEntry, ok := c.types[t]
c.typesLock.RUnlock()
if !ok {
return fmt.Errorf("unknown type in cache: %s", t)
}
if tEntry.Type.SupportsBlocking() {
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
} else {
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
return nil
}
info := r.CacheInfo()
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
return nil
}
@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
index = meta.Index
}
var wait time.Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
wait := 0 * time.Second
if err == nil && meta.Index > 0 {
failures = 0
} else {
@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
failures++
}
var wait time.Duration
// Determining how long to wait before the next poll is complicated.
// First off the happy path and the error path waits are handled distinctly
//
@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// as this would eliminate the single-flighting of these requests in the cache and
// the efficiencies gained by it.
if failures > 0 {
errWait := backOffWait(failures)
select {
case <-time.After(errWait):
case <-ctx.Done():
return
}
wait = backOffWait(failures)
} else {
// Default to immediately re-poll. This only will happen if the data
// we just got out of the cache is already too stale
pollWait := 0 * time.Second
// Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll
if meta.Age <= maxAge {
pollWait = maxAge - meta.Age
wait = maxAge - meta.Age
}
// Add a small amount of random jitter to the polling time. One
@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it.
pollWait += lib.RandomStagger(maxAge / 16)
wait += lib.RandomStagger(maxAge / 16)
}
select {
case <-time.After(pollWait):
case <-ctx.Done():
return
}
select {
case <-time.After(wait):
case <-ctx.Done():
return
}
}
}

View File

@ -31,14 +31,14 @@ func (s *HTTPServer) Snapshot(resp http.ResponseWriter, req *http.Request) (inte
// Don't bother sending any request body through since it will
// be ignored.
var null bytes.Buffer
if err := s.agent.SnapshotRPC(&args, &null, resp, replyFn); err != nil {
if err := s.agent.delegate.SnapshotRPC(&args, &null, resp, replyFn); err != nil {
return nil, err
}
return nil, nil
case "PUT":
args.Op = structs.SnapshotRestore
if err := s.agent.SnapshotRPC(&args, req.Body, resp, nil); err != nil {
if err := s.agent.delegate.SnapshotRPC(&args, req.Body, resp, nil); err != nil {
return nil, err
}
return nil, nil