From c9a87be6ee7c8236f5995660c64e3a1fd35a9203 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Fri, 3 Apr 2020 16:01:56 -0400 Subject: [PATCH] agent/cache: move typeEntry lookup to the edge This change moves all the typeEntry lookups to the first step in the exported methods, and makes unexporter internals accept the typeEntry struct. This change is primarily intended to make it easier to extract the container of caches from the Cache type. It may incidentally reduce locking in fetch, but that was not a goal. --- agent/cache/cache.go | 77 ++++++++++++++++---------------------------- agent/cache/watch.go | 12 +++---- 2 files changed, 33 insertions(+), 56 deletions(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 7892272451..1bf14b341e 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -83,6 +83,8 @@ type Cache struct { // typeEntry is a single type that is registered with a Cache. type typeEntry struct { + // Name that was used to register the Type + Name string Type Type Opts *RegisterOptions } @@ -193,7 +195,7 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { c.typesLock.Lock() defer c.typesLock.Unlock() - c.types[n] = typeEntry{Type: typ, Opts: opts} + c.types[n] = typeEntry{Name: n, Type: typ, Opts: opts} } // Get loads the data for the given type and request. If data satisfying the @@ -211,7 +213,15 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // error is returned on timeout. This matches the behavior of Consul blocking // queries. func (c *Cache) Get(t string, r Request) (interface{}, ResultMeta, error) { - return c.getWithIndex(t, r, r.CacheInfo().MinIndex) + c.typesLock.RLock() + tEntry, ok := c.types[t] + c.typesLock.RUnlock() + if !ok { + // Shouldn't happen given that we successfully fetched this at least + // once. But be robust against panics. + return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) + } + return c.getWithIndex(tEntry, r, r.CacheInfo().MinIndex) } // getEntryLocked retrieves a cache entry and checks if it is ready to be @@ -258,18 +268,17 @@ func (c *Cache) getEntryLocked(tEntry typeEntry, key string, maxAge time.Duratio // getWithIndex implements the main Get functionality but allows internal // callers (Watch) to manipulate the blocking index separately from the actual // request object. -func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { +func (c *Cache) getWithIndex(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { info := r.CacheInfo() if info.Key == "" { metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) // If no key is specified, then we do not cache this request. // Pass directly through to the backend. - return c.fetchDirect(t, r, minIndex) + return c.fetchDirect(tEntry, r, minIndex) } - // Get the actual key for our entry - key := c.entryKey(t, &info) + key := makeEntryKey(tEntry.Name, info.Datacenter, info.Token, info.Key) // First time through first := true @@ -278,16 +287,6 @@ func (c *Cache) getWithIndex(t string, r Request, minIndex uint64) (interface{}, var timeoutCh <-chan time.Time RETRY_GET: - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - // Shouldn't happen given that we successfully fetched this at least - // once. But be robust against panics. - return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) - } - // Get the current value c.entriesLock.RLock() _, cacheHit, entry := c.getEntryLocked(tEntry, key, info.MaxAge, info.MustRevalidate && first, minIndex) @@ -296,7 +295,7 @@ RETRY_GET: if cacheHit { meta := ResultMeta{Index: entry.Index} if first { - metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "hit"}, 1) meta.Hit = true } @@ -351,11 +350,11 @@ RETRY_GET: // We increment two different counters for cache misses depending on // whether we're missing because we didn't have the data at all, // or if we're missing because we're blocking on a set index. + missKey := "miss_block" if minIndex == 0 { - metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1) - } else { - metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1) + missKey = "miss_new" } + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, missKey}, 1) } // Set our timeout channel if we must @@ -365,7 +364,7 @@ RETRY_GET: // At this point, we know we either don't have a value at all or the // value we have is too old. We need to wait for new data. - waiterCh, err := c.fetch(t, key, r, true, 0, minIndex, false, !first) + waiterCh, err := c.fetch(tEntry, key, r, true, 0, minIndex, false, !first) if err != nil { return nil, ResultMeta{Index: entry.Index}, err } @@ -384,12 +383,6 @@ RETRY_GET: } } -// entryKey returns the key for the entry in the cache. See the note -// about the entry key format in the structure docs for Cache. -func (c *Cache) entryKey(t string, r *RequestInfo) string { - return makeEntryKey(t, r.Datacenter, r.Token, r.Key) -} - func makeEntryKey(t, dc, token, key string) string { return fmt.Sprintf("%s/%s/%s/%s", t, dc, token, key) } @@ -402,15 +395,7 @@ func makeEntryKey(t, dc, token, key string) string { // If allowNew is true then the fetch should create the cache entry // if it doesn't exist. If this is false, then fetch will do nothing // if the entry doesn't exist. This latter case is to support refreshing. -func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return nil, fmt.Errorf("unknown type in cache: %s", t) - } - +func (c *Cache) fetch(tEntry typeEntry, key string, r Request, allowNew bool, attempt uint, minIndex uint64, ignoreExisting bool, ignoreRevalidation bool) (<-chan struct{}, error) { info := r.CacheInfo() // We acquire a write lock because we may have to set Fetching to true. @@ -543,7 +528,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // Error handling if err == nil { metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_success"}, 1) if result.Index > 0 { // Reset the attempts counter so we don't have any backoff @@ -572,7 +557,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min } } else { metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) - metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) + metrics.IncrCounter([]string{"consul", "cache", tEntry.Name, "fetch_error"}, 1) // Increment attempt counter attempt++ @@ -613,7 +598,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // If refresh is enabled, run the refresh in due time. The refresh // below might block, but saves us from spawning another goroutine. if tEntry.Opts.Refresh { - c.refresh(tEntry.Opts, attempt, t, key, r) + c.refresh(tEntry.Opts, attempt, tEntry, key, r) } }() @@ -623,15 +608,7 @@ func (c *Cache) fetch(t, key string, r Request, allowNew bool, attempt uint, min // fetchDirect fetches the given request with no caching. Because this // bypasses the caching entirely, multiple matching requests will result // in multiple actual RPC calls (unlike fetch). -func (c *Cache) fetchDirect(t string, r Request, minIndex uint64) (interface{}, ResultMeta, error) { - // Get the type that we're fetching - c.typesLock.RLock() - tEntry, ok := c.types[t] - c.typesLock.RUnlock() - if !ok { - return nil, ResultMeta{}, fmt.Errorf("unknown type in cache: %s", t) - } - +func (c *Cache) fetchDirect(tEntry typeEntry, r Request, minIndex uint64) (interface{}, ResultMeta, error) { // Fetch it with the min index specified directly by the request. result, err := tEntry.Type.Fetch(FetchOptions{ MinIndex: minIndex, @@ -661,7 +638,7 @@ func backOffWait(failures uint) time.Duration { // refresh triggers a fetch for a specific Request according to the // registration options. -func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key string, r Request) { +func (c *Cache) refresh(opts *RegisterOptions, attempt uint, tEntry typeEntry, key string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh { return @@ -684,7 +661,7 @@ func (c *Cache) refresh(opts *RegisterOptions, attempt uint, t string, key strin // Trigger. The "allowNew" field is false because in the time we were // waiting to refresh we may have expired and got evicted. If that // happened, we don't want to create a new entry. - c.fetch(t, key, r, false, attempt, 0, true, true) + c.fetch(tEntry, key, r, false, attempt, 0, true, true) } // runExpiryLoop is a blocking function that watches the expiration diff --git a/agent/cache/watch.go b/agent/cache/watch.go index fca176fe08..af3b097c5b 100644 --- a/agent/cache/watch.go +++ b/agent/cache/watch.go @@ -61,19 +61,19 @@ func (c *Cache) Notify(ctx context.Context, t string, r Request, return fmt.Errorf("unknown type in cache: %s", t) } if tEntry.Type.SupportsBlocking() { - go c.notifyBlockingQuery(ctx, t, r, correlationID, ch) + go c.notifyBlockingQuery(ctx, tEntry, r, correlationID, ch) } else { info := r.CacheInfo() if info.MaxAge == 0 { return fmt.Errorf("Cannot use Notify for polling cache types without specifying the MaxAge") } - go c.notifyPollingQuery(ctx, t, r, correlationID, ch, info.MaxAge) + go c.notifyPollingQuery(ctx, tEntry, r, correlationID, ch, info.MaxAge) } return nil } -func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent) { +func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent) { // Always start at 0 index to deliver the initial (possibly currently cached // value). index := uint64(0) @@ -86,7 +86,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co } // Blocking request - res, meta, err := c.getWithIndex(t, r, index) + res, meta, err := c.getWithIndex(tEntry, r, index) // Check context hasn't been canceled if ctx.Err() != nil { @@ -132,7 +132,7 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, t string, r Request, co } } -func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { +func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Request, correlationID string, ch chan<- UpdateEvent, maxAge time.Duration) { index := uint64(0) failures := uint(0) @@ -145,7 +145,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, t string, r Request, cor } // Make the request - res, meta, err := c.getWithIndex(t, r, index) + res, meta, err := c.getWithIndex(tEntry, r, index) // Check context hasn't been canceled if ctx.Err() != nil {