agent/cache: reduce function arguments by removing duplicates

A few of the unexported functions in agent/cache took a large number of
arguments. These arguments were effectively overrides for values that
were provided in RequestInfo.

By using a struct we can not only reduce the number of arguments, but
also simplify the logic by removing the need for overrides.
This commit is contained in:
Daniel Nephin 2020-04-13 14:49:13 -04:00
parent 2bb1efda27
commit 4ef9fc9f27
2 changed files with 60 additions and 44 deletions

84
agent/cache/cache.go vendored
View File

@ -225,7 +225,28 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
// once. But be robust against panics.
return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t)
}
return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex)
return c.getWithIndex(newGetOptions(tEntry, r))
}
// getOptions contains the arguments for a Get request. It is used in place of
// Request so that internal functions can modify Info without having to extract
// it from the Request each time.
type getOptions struct {
// Fetch is a closure over tEntry.Type.Fetch which provides the original
// Request from the caller.
Fetch func(opts FetchOptions) (FetchResult, error)
Info RequestInfo
TypeEntry typeEntry
}
func newGetOptions(tEntry typeEntry, r Request) getOptions {
return getOptions{
Fetch: func(opts FetchOptions) (FetchResult, error) {
return tEntry.Type.Fetch(opts, r)
},
Info: r.CacheInfo(),
TypeEntry: tEntry,
}
}
// getEntryLocked retrieves a cache entry and checks if it is ready to be
@ -234,9 +255,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) {
func (c *Cache) getEntryLocked(
tEntry typeEntry,
key string,
maxAge time.Duration,
revalidate bool,
minIndex uint64,
info RequestInfo,
) (entryExists bool, entryValid bool, entry cacheEntry) {
entry, ok := c.entries[key]
if !entry.Valid {
@ -245,7 +264,7 @@ func (c *Cache) getEntryLocked(
// Check index is not specified or lower than value, or the type doesn't
// support blocking.
if tEntry.Opts.SupportsBlocking && minIndex > 0 && minIndex >= entry.Index {
if tEntry.Opts.SupportsBlocking && info.MinIndex > 0 && info.MinIndex >= entry.Index {
// MinIndex was given and matches or is higher than current value so we
// ignore the cache and fallthrough to blocking on a new value below.
return true, false, entry
@ -253,13 +272,13 @@ func (c *Cache) getEntryLocked(
// Check MaxAge is not exceeded if this is not a background refreshing type
// and MaxAge was specified.
if !tEntry.Opts.Refresh && maxAge > 0 && entryExceedsMaxAge(maxAge, entry) {
if !tEntry.Opts.Refresh && info.MaxAge > 0 && entryExceedsMaxAge(info.MaxAge, entry) {
return true, false, entry
}
// Check if re-validate is requested. If so the first time round the
// loop is not a hit but subsequent ones should be treated normally.
if !tEntry.Opts.Refresh && revalidate {
if !tEntry.Opts.Refresh && info.MustRevalidate {
return true, false, entry
}
@ -273,17 +292,17 @@ func entryExceedsMaxAge(maxAge time.Duration, entry cacheEntry) bool {
// getWithIndex implements the main Get functionality but allows internal
// callers (Watch) to manipulate the blocking index separately from the actual
// request object.
func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
info := r.CacheInfo()
if info.Key == "" {
func (c *Cache) getWithIndex(r getOptions) (interface{}, ResultMeta, error) {
if r.Info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(tEntry, r, minIndex)
result, err := r.Fetch(FetchOptions{MinIndex: r.Info.MinIndex})
return result.Value, ResultMeta{}, err
}
key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key)
key := makeEntryKey(r.TypeEntry.Name, r.Info.Datacenter, r.Info.Token, r.Info.Key)
// First time through
first := true
@ -294,19 +313,19 @@ func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (inte
RETRY_GET:
// Get the current value
c.entriesLock.RLock()
_, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex)
_, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, r.Info)
c.entriesLock.RUnlock()
if entryValid {
meta := ResultMeta{Index: entry.Index}
if first {
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1)
metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, "hit"}, 1)
meta.Hit = true
}
// If refresh is enabled, calculate age based on whether the background
// routine is still connected.
if tEntry.Opts.Refresh {
if r.TypeEntry.Opts.Refresh {
meta.Age = time.Duration(0)
if !entry.RefreshLostContact.IsZero() {
meta.Age = time.Since(entry.RefreshLostContact)
@ -321,7 +340,7 @@ RETRY_GET:
// Touch the expiration and fix the heap.
c.entriesLock.Lock()
entry.Expiry.Update(tEntry.Opts.LastGetTTL)
entry.Expiry.Update(r.TypeEntry.Opts.LastGetTTL)
c.entriesExpiryHeap.Fix(entry.Expiry)
c.entriesLock.Unlock()
@ -356,20 +375,20 @@ RETRY_GET:
// whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index.
missKey := "miss_block"
if minIndex == 0 {
if r.Info.MinIndex == 0 {
missKey = "miss_new"
}
metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1)
metrics.IncrCounter([]string{"consul", "cache", r.TypeEntry.Name, missKey}, 1)
}
// Set our timeout channel if we must
if info.Timeout > 0 && timeoutCh == nil {
timeoutCh = time.After(info.Timeout)
if r.Info.Timeout > 0 && timeoutCh == nil {
timeoutCh = time.After(r.Info.Timeout)
}
// At this point, we know we either don't have a value at all or the
// value we have is too old. We need to wait for new data.
waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first)
waiterCh, err := c.fetch(key, r, true, 0, false)
if err != nil {
return nil, ResultMeta{Index: entry.Index}, err
}
@ -380,6 +399,7 @@ RETRY_GET:
select {
case <-waiterCh:
// Our fetch returned, retry the get from the cache.
r.Info.MustRevalidate = false
goto RETRY_GET
case <-timeoutCh:
@ -400,13 +420,13 @@ func makeEntryKey(t, dc, token, key string) string {
// If allowNew is true then the fetch should create the cache entry
// if it doesn't exist. If this is false, then fetch will do nothing
// if the entry doesn't exist. This latter case is to support refreshing.
func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) {
info := r.CacheInfo()
func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ignoreExisting bool) (<-chan struct{}, error) {
info := r.Info
// We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
ok, entryValid, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && !ignoreRevalidation, minIndex)
ok, entryValid, entry := c.getEntryLocked(r.TypeEntry, key, info)
// This handles the case where a fetch succeeded after checking for its existence in
// getWithIndex. This ensures that we don't miss updates.
@ -444,6 +464,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
tEntry := r.TypeEntry
// The actual Fetch must be performed in a goroutine.
go func() {
// If we have background refresh and currently are in "disconnected" state,
@ -482,7 +503,7 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
}
// Start building the new entry by blocking on the fetch.
result, err := tEntry.Type.Fetch(fOpts, r)
result, err := r.Fetch(fOpts)
if connectedTimer != nil {
connectedTimer.Stop()
}
@ -618,22 +639,15 @@ func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, at
// Trigger. The "allowNew" field is false because in the time we were
// waiting to refresh we may have expired and got evicted. If that
// happened, we don't want to create a new entry.
c.fetch(tEntry, key, r, false, attempt, 0, true, true)
r.Info.MustRevalidate = false
r.Info.MinIndex = 0
c.fetch(key, r, false, attempt, true)
}
}()
return entry.Waiter, nil
}
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) {
// Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{MinIndex: minIndex}, r)
return result.Value, ResultMeta{}, err
}
func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin

20
agent/cache/watch.go vendored
View File

@ -65,7 +65,7 @@ func (c *Cache) Notify(
}
if tEntry.Opts.SupportsBlocking {
go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch)
go c.notifyBlockingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch)
return nil
}
@ -73,11 +73,11 @@ func (c *Cache) Notify(
if info.MaxAge == 0 {
return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge")
}
go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge)
go c.notifyPollingQuery(ctx, newGetOptions(tEntry, r), correlationID, ch)
return nil
}
func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) {
func (c *Cache) notifyBlockingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) {
// Always start at 0 index to deliver the initial (possibly currently cached
// value).
index := uint64(0)
@ -90,7 +90,8 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
}
// Blocking request
res, meta, err := c.getWithIndex(tEntry, r, index)
r.Info.MinIndex = index
res, meta, err := c.getWithIndex(r)
// Check context hasn't been canceled
if ctx.Err() != nil {
@ -136,7 +137,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
}
}
func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) {
func (c *Cache) notifyPollingQuery(ctx context.Context, r getOptions, correlationID string, ch chan<- UpdateEvent) {
index := uint64(0)
failures := uint(0)
@ -149,7 +150,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
}
// Make the request
res, meta, err := c.getWithIndex(tEntry, r, index)
r.Info.MinIndex = index
res, meta, err := c.getWithIndex(r)
// Check context hasn't been canceled
if ctx.Err() != nil {
@ -204,8 +206,8 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll
if meta.Age <= maxAge {
wait = maxAge - meta.Age
if meta.Age <= r.Info.MaxAge {
wait = r.Info.MaxAge - meta.Age
}
// Add a small amount of random jitter to the polling time. One
@ -217,7 +219,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it.
wait += lib.RandomStagger(maxAge / 16)
wait += lib.RandomStagger(r.Info.MaxAge / 16)
}
select {