agent/cache: integrate go-metrics so the cache is debuggable

This commit is contained in:
Mitchell Hashimoto 2018-04-17 18:03:13 -05:00
parent 9f3dbf7b2a
commit 00e7ab3cd5
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
1 changed files with 43 additions and 1 deletions

44
agent/cache/cache.go vendored
View File

@ -17,6 +17,8 @@ import (
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
)
//go:generate mockery -all -inpkg
@ -109,6 +111,9 @@ type RegisterOptions struct {
}
// RegisterType registers a cacheable type.
//
// This makes the type available for Get but does not automatically perform
// any prefetching. In order to populate the cache, Get must be called.
func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
c.typesLock.Lock()
defer c.typesLock.Unlock()
@ -122,9 +127,18 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
//
// Multiple Get calls for the same Request (matching CacheKey value) will
// block on a single network request.
//
// The timeout specified by the Request will be the timeout on the cache
// Get, and does not correspond to the timeout of any background data
// fetching. If the timeout is reached before data satisfying the minimum
// index is retrieved, the last known value (maybe nil) is returned. No
// error is returned on timeout. This matches the behavior of Consul blocking
// queries.
func (c *Cache) Get(t string, r Request) (interface{}, error) {
info := r.CacheInfo()
if info.Key == "" {
metrics.IncrCounter([]string{"consul", "cache", "bypass"}, 1)
// If no key is specified, then we do not cache this request.
// Pass directly through to the backend.
return c.fetchDirect(t, r)
@ -152,6 +166,7 @@ RETRY_GET:
if ok && entry.Valid {
if info.MinIndex == 0 || info.MinIndex < entry.Index {
if first {
metrics.IncrCounter([]string{"consul", "cache", t, "hit"}, 1)
atomic.AddUint64(&c.hits, 1)
}
@ -162,6 +177,15 @@ RETRY_GET:
if first {
// Record the miss if its our first time through
atomic.AddUint64(&c.misses, 1)
// We increment two different counters for cache misses depending on
// whether we're missing because we didn't have the data at all,
// or if we're missing because we're blocking on a set index.
if info.MinIndex == 0 {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_new"}, 1)
} else {
metrics.IncrCounter([]string{"consul", "cache", t, "miss_block"}, 1)
}
}
// No longer our first time through
@ -196,6 +220,10 @@ func (c *Cache) entryKey(r *RequestInfo) string {
return fmt.Sprintf("%s/%s/%s", r.Datacenter, r.Token, r.Key)
}
// fetch triggers a new background fetch for the given Request. If a
// background fetch is already running for a matching Request, the waiter
// channel for that request is returned. The effect of this is that there
// is only ever one blocking query for any matching requests.
func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
@ -205,6 +233,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
return nil, fmt.Errorf("unknown type in cache: %s", t)
}
// We acquire a write lock because we may have to set Fetching to true.
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
@ -226,6 +255,7 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
// perform multiple fetches.
entry.Fetching = true
c.entries[key] = entry
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))
// The actual Fetch must be performed in a goroutine.
go func() {
@ -234,6 +264,14 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
MinIndex: entry.Index,
}, r)
if err == nil {
metrics.IncrCounter([]string{"consul", "cache", "fetch_success"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_success"}, 1)
} else {
metrics.IncrCounter([]string{"consul", "cache", "fetch_error"}, 1)
metrics.IncrCounter([]string{"consul", "cache", t, "fetch_error"}, 1)
}
var newEntry cacheEntry
if result.Value == nil {
// If no value was set, then we do not change the prior entry.
@ -272,7 +310,9 @@ func (c *Cache) fetch(t, key string, r Request) (<-chan struct{}, error) {
return entry.Waiter, nil
}
// fetchDirect fetches the given request with no caching.
// fetchDirect fetches the given request with no caching. Because this
// bypasses the caching entirely, multiple matching requests will result
// in multiple actual RPC calls (unlike fetch).
func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
// Get the type that we're fetching
c.typesLock.RLock()
@ -294,6 +334,8 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
return result.Value, nil
}
// refresh triggers a fetch for a specific Request according to the
// registration options.
func (c *Cache) refresh(opts *RegisterOptions, t string, key string, r Request) {
// Sanity-check, we should not schedule anything that has refresh disabled
if !opts.Refresh {