From 975be337a9de4e813ee05cb80567f1dea97d0574 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 8 Apr 2018 14:30:14 +0100 Subject: [PATCH] agent/cache: blank cache key means to always fetch --- agent/cache/cache.go | 70 ++++++++++++++++++++-------------- agent/cache/cache_test.go | 32 ++++++++++++++++ agent/cache/rpc.go | 8 ++++ agent/cache/testing.go | 2 +- agent/cache/type.go | 27 ------------- agent/cache/type_connect_ca.go | 39 +++++++++++++++++++ 6 files changed, 122 insertions(+), 56 deletions(-) create mode 100644 agent/cache/rpc.go create mode 100644 agent/cache/type_connect_ca.go diff --git a/agent/cache/cache.go b/agent/cache/cache.go index d0172cdc0a..1c2f316dd9 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -20,29 +20,10 @@ import ( //go:generate mockery -all -inpkg -// Pre-written options for type registration. These should not be modified. -var ( - // RegisterOptsPeriodic performs a periodic refresh of data fetched - // by the registered type. - RegisterOptsPeriodic = &RegisterOptions{ - Refresh: true, - RefreshTimer: 30 * time.Second, - RefreshTimeout: 5 * time.Minute, - } -) - -// TODO: DC-aware - -// RPC is an interface that an RPC client must implement. -type RPC interface { - RPC(method string, args interface{}, reply interface{}) error -} +// TODO: DC-aware, ACL-aware // Cache is a agent-local cache of Consul data. type Cache struct { - // rpcClient is the RPC-client. - rpcClient RPC - entriesLock sync.RWMutex entries map[string]cacheEntry @@ -50,6 +31,7 @@ type Cache struct { types map[string]typeEntry } +// cacheEntry stores a single cache entry. type cacheEntry struct { // Fields pertaining to the actual value Value interface{} @@ -68,13 +50,17 @@ type typeEntry struct { Opts *RegisterOptions } +// Options are options for the Cache. +type Options struct { + // Nothing currently, reserved. +} + // New creates a new cache with the given RPC client and reasonable defaults. // Further settings can be tweaked on the returned value. -func New(rpc RPC) *Cache { +func New(*Options) *Cache { return &Cache{ - rpcClient: rpc, - entries: make(map[string]cacheEntry), - types: make(map[string]typeEntry), + entries: make(map[string]cacheEntry), + types: make(map[string]typeEntry), } } @@ -124,7 +110,11 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // block on a single network request. func (c *Cache) Get(t string, r Request) (interface{}, error) { key := r.CacheKey() - idx := r.CacheMinIndex() + if key == "" { + // If no key is specified, then we do not cache this request. + // Pass directly through to the backend. + return c.fetchDirect(t, r) + } RETRY_GET: // Get the current value @@ -136,8 +126,11 @@ RETRY_GET: // currently stored index then we return that right away. If the // index is zero and we have something in the cache we accept whatever // we have. - if ok && entry.Valid && (idx == 0 || idx < entry.Index) { - return entry.Value, nil + if ok && entry.Valid { + idx := r.CacheMinIndex() + if idx == 0 || idx < entry.Index { + return entry.Value, nil + } } // At this point, we know we either don't have a value at all or the @@ -192,7 +185,6 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // Start building the new entry by blocking on the fetch. var newEntry cacheEntry result, err := tEntry.Type.Fetch(FetchOptions{ - RPC: c.rpcClient, MinIndex: entry.Index, }, r) newEntry.Value = result.Value @@ -223,6 +215,28 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { return entry.Waiter, nil } +// fetchDirect fetches the given request with no caching. +func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { + // Get the type that we're fetching + c.typesLock.RLock() + tEntry, ok := c.types[t] + c.typesLock.RUnlock() + if !ok { + return nil, fmt.Errorf("unknown type in cache: %s", t) + } + + // Fetch it with the min index specified directly by the request. + result, err := tEntry.Type.Fetch(FetchOptions{ + MinIndex: r.CacheMinIndex(), + }, r) + if err != nil { + return nil, err + } + + // Return the result and ignore the rest + return result.Value, nil +} + func (c *Cache) refresh(opts *RegisterOptions, t string, r Request) { // Sanity-check, we should not schedule anything that has refresh disabled if !opts.Refresh { diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index d82ded1954..69f99a6283 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -41,6 +41,38 @@ func TestCacheGet_noIndex(t *testing.T) { typ.AssertExpectations(t) } +// Test a Get with a request that returns a blank cache key. This should +// force a backend request and skip the cache entirely. +func TestCacheGet_blankCacheKey(t *testing.T) { + t.Parallel() + + require := require.New(t) + + typ := TestType(t) + defer typ.AssertExpectations(t) + c := TestCache(t) + c.RegisterType("t", typ, nil) + + // Configure the type + typ.Static(FetchResult{Value: 42}, nil).Times(2) + + // Get, should fetch + req := TestRequest(t, "", 0) + result, err := c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Get, should not fetch since we already have a satisfying value + result, err = c.Get("t", req) + require.Nil(err) + require.Equal(42, result) + + // Sleep a tiny bit just to let maybe some background calls happen + // then verify that we still only got the one call + time.Sleep(20 * time.Millisecond) + typ.AssertExpectations(t) +} + // Test that Get blocks on the initial value func TestCacheGet_blockingInitSameKey(t *testing.T) { t.Parallel() diff --git a/agent/cache/rpc.go b/agent/cache/rpc.go new file mode 100644 index 0000000000..98976284a0 --- /dev/null +++ b/agent/cache/rpc.go @@ -0,0 +1,8 @@ +package cache + +// RPC is an interface that an RPC client must implement. This is a helper +// interface that is implemented by the agent delegate so that Type +// implementations can request RPC access. +type RPC interface { + RPC(method string, args interface{}, reply interface{}) error +} diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 7bf2bf8911..6a094c1174 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -11,7 +11,7 @@ import ( // TestCache returns a Cache instance configuring for testing. func TestCache(t testing.T) *Cache { // Simple but lets us do some fine-tuning later if we want to. - return New(TestRPC(t)) + return New(nil) } // TestCacheGetCh returns a channel that returns the result of the Get call. diff --git a/agent/cache/type.go b/agent/cache/type.go index fbb65761fc..6e8edeb5fa 100644 --- a/agent/cache/type.go +++ b/agent/cache/type.go @@ -20,9 +20,6 @@ type Type interface { // FetchOptions are various settable options when a Fetch is called. type FetchOptions struct { - // RPC is the RPC client to communicate to a Consul server. - RPC RPC - // MinIndex is the minimum index to be used for blocking queries. // If blocking queries aren't supported for data being returned, // this value can be ignored. @@ -42,27 +39,3 @@ type FetchResult struct { // Index is the corresponding index value for this data. Index uint64 } - -/* -type TypeCARoot struct{} - -func (c *TypeCARoot) Fetch(delegate RPC, idx uint64, req Request) (interface{}, uint64, error) { - // The request should be a DCSpecificRequest. - reqReal, ok := req.(*structs.DCSpecificRequest) - if !ok { - return nil, 0, fmt.Errorf( - "Internal cache failure: request wrong type: %T", req) - } - - // Set the minimum query index to our current index so we block - reqReal.QueryOptions.MinQueryIndex = idx - - // Fetch - var reply structs.IndexedCARoots - if err := delegate.RPC("ConnectCA.Roots", reqReal, &reply); err != nil { - return nil, 0, err - } - - return &reply, reply.QueryMeta.Index, nil -} -*/ diff --git a/agent/cache/type_connect_ca.go b/agent/cache/type_connect_ca.go new file mode 100644 index 0000000000..40bda72df2 --- /dev/null +++ b/agent/cache/type_connect_ca.go @@ -0,0 +1,39 @@ +package cache + +/* +import ( + "fmt" + + "github.com/hashicorp/consul/agent/structs" +) + +// TypeCARoot supports fetching the Connect CA roots. +type TypeCARoot struct { + RPC RPC +} + +func (c *TypeCARoot) Fetch(opts FetchOptions, req Request) (FetchResult, error) { + var result FetchResult + + // The request should be a DCSpecificRequest. + reqReal, ok := req.(*structs.DCSpecificRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Fetch + var reply structs.IndexedCARoots + if err := c.RPC.RPC("ConnectCA.Roots", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} +*/