From 00e7ab3cd5cb5f4e8f2106ab70151be39aa3c6e2 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Tue, 17 Apr 2018 18:03:13 -0500 Subject: [PATCH] agent/cache: integrate go-metrics so the cache is debuggable --- agent/cache/cache.go | 44 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/agent/cache/cache.go b/agent/cache/cache.go index a1b4570b07..db9f11a0e9 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -17,6 +17,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/armon/go-metrics" ) //go:generate mockery -all -inpkg @@ -109,6 +111,9 @@ type RegisterOptions struct { } // RegisterType registers a cacheable type. +// +// This makes the type available for Get but does not automatically perform +// any prefetching. In order to populate the cache, Get must be called. func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { c.typesLock.Lock() defer c.typesLock.Unlock() @@ -122,9 +127,18 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // // Multiple Get calls for the same Request (matching CacheKey value) will // block on a single network request. +// +// The timeout specified by the Request will be the timeout on the cache +// Get, and does not correspond to the timeout of any background data +// fetching. If the timeout is reached before data satisfying the minimum +// index is retrieved, the last known value (maybe nil) is returned. No +// error is returned on timeout. This matches the behavior of Consul blocking +// queries. func (c *Cache) Get(t string, r Request) (interface{}, error) { info := r.CacheInfo() if info.Key == "" { + metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1) + // If no key is specified, then we do not cache this request. // Pass directly through to the backend. return c.fetchDirect(t, r) @@ -152,6 +166,7 @@ RETRY_GET: if ok && entry.Valid { if info.MinIndex == 0 || info.MinIndex < entry.Index { if first { + metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1) atomic.AddUint64(&c.hits, 1) } @@ -162,6 +177,15 @@ RETRY_GET: if first { // Record the miss if its our first time through atomic.AddUint64(&c.misses, 1) + + // We increment two different counters for cache misses depending on + // whether we're missing because we didn't have the data at all, + // or if we're missing because we're blocking on a set index. + if info.MinIndex == 0 { + metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1) + } else { + metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1) + } } // No longer our first time through @@ -196,6 +220,10 @@ func (c *Cache) entryKey(r *RequestInfo) string { return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key) } +// fetch triggers a new background fetch for the given Request. If a +// background fetch is already running for a matching Request, the waiter +// channel for that request is returned. The effect of this is that there +// is only ever one blocking query for any matching requests. func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { // Get the type that we're fetching c.typesLock.RLock() @@ -205,6 +233,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { return nil, fmt.Errorf("unknown type in cache: %s", t) } + // We acquire a write lock because we may have to set Fetching to true. c.entriesLock.Lock() defer c.entriesLock.Unlock() entry, ok := c.entries[key] @@ -226,6 +255,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { // perform multiple fetches. entry.Fetching = true c.entries[key] = entry + metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries))) // The actual Fetch must be performed in a goroutine. go func() { @@ -234,6 +264,14 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { MinIndex: entry.Index, }, r) + if err == nil { + metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1) + metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1) + } else { + metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1) + metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1) + } + var newEntry cacheEntry if result.Value == nil { // If no value was set, then we do not change the prior entry. @@ -272,7 +310,9 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) { return entry.Waiter, nil } -// fetchDirect fetches the given request with no caching. +// fetchDirect fetches the given request with no caching. Because this +// bypasses the caching entirely, multiple matching requests will result +// in multiple actual RPC calls (unlike fetch). func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { // Get the type that we're fetching c.typesLock.RLock() @@ -294,6 +334,8 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { return result.Value, nil } +// refresh triggers a fetch for a specific Request according to the +// registration options. func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh {