From e3c11628810615d90aba895496a6c2f545072c53 Mon Sep 17 00:00:00 2001 From: Mitchell Hashimoto Date: Sun, 8 Apr 2018 15:08:34 +0100 Subject: [PATCH] agent/cache: Reorganize some files, RequestInfo struct, prepare for partitioning --- .../connect_ca.go} | 7 ++-- .../connect_ca_test.go} | 10 +++--- agent/{cache => cache-types}/mock_RPC.go | 2 +- agent/{cache => cache-types}/rpc.go | 4 ++- agent/cache-types/testing.go | 12 +++++++ agent/cache/cache.go | 22 ++++++------- agent/cache/cache_test.go | 23 ++++++------- agent/cache/mock_Request.go | 24 +++----------- agent/cache/request.go | 32 ++++++++++++++++--- agent/cache/testing.go | 12 ++----- agent/structs/structs.go | 18 +++++++---- 11 files changed, 94 insertions(+), 72 deletions(-) rename agent/{cache/type_connect_ca.go => cache-types/connect_ca.go} (79%) rename agent/{cache/type_connect_ca_test.go => cache-types/connect_ca_test.go} (83%) rename agent/{cache => cache-types}/mock_RPC.go (96%) rename agent/{cache => cache-types}/rpc.go (83%) create mode 100644 agent/cache-types/testing.go diff --git a/agent/cache/type_connect_ca.go b/agent/cache-types/connect_ca.go similarity index 79% rename from agent/cache/type_connect_ca.go rename to agent/cache-types/connect_ca.go index 6a0a6699cb..85962b1fbb 100644 --- a/agent/cache/type_connect_ca.go +++ b/agent/cache-types/connect_ca.go @@ -1,8 +1,9 @@ -package cache +package cachetype import ( "fmt" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" ) @@ -11,8 +12,8 @@ type TypeCARoot struct { RPC RPC } -func (c *TypeCARoot) Fetch(opts FetchOptions, req Request) (FetchResult, error) { - var result FetchResult +func (c *TypeCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult // The request should be a DCSpecificRequest. reqReal, ok := req.(*structs.DCSpecificRequest) diff --git a/agent/cache/type_connect_ca_test.go b/agent/cache-types/connect_ca_test.go similarity index 83% rename from agent/cache/type_connect_ca_test.go rename to agent/cache-types/connect_ca_test.go index 359449d21c..faf8317bd5 100644 --- a/agent/cache/type_connect_ca_test.go +++ b/agent/cache-types/connect_ca_test.go @@ -1,9 +1,10 @@ -package cache +package cachetype import ( "testing" "time" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/structs" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -30,12 +31,12 @@ func TestTypeCARoot(t *testing.T) { }) // Fetch - result, err := typ.Fetch(FetchOptions{ + result, err := typ.Fetch(cache.FetchOptions{ MinIndex: 24, Timeout: 1 * time.Second, }, &structs.DCSpecificRequest{Datacenter: "dc1"}) require.Nil(err) - require.Equal(FetchResult{ + require.Equal(cache.FetchResult{ Value: resp, Index: 48, }, result) @@ -48,7 +49,8 @@ func TestTypeCARoot_badReqType(t *testing.T) { typ := &TypeCARoot{RPC: rpc} // Fetch - _, err := typ.Fetch(FetchOptions{}, TestRequest(t, "foo", 64)) + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) require.NotNil(err) require.Contains(err.Error(), "wrong type") diff --git a/agent/cache/mock_RPC.go b/agent/cache-types/mock_RPC.go similarity index 96% rename from agent/cache/mock_RPC.go rename to agent/cache-types/mock_RPC.go index a1100d2a7e..6f642c66b8 100644 --- a/agent/cache/mock_RPC.go +++ b/agent/cache-types/mock_RPC.go @@ -1,5 +1,5 @@ // Code generated by mockery v1.0.0 -package cache +package cachetype import mock "github.com/stretchr/testify/mock" diff --git a/agent/cache/rpc.go b/agent/cache-types/rpc.go similarity index 83% rename from agent/cache/rpc.go rename to agent/cache-types/rpc.go index 98976284a0..0aaf040f3d 100644 --- a/agent/cache/rpc.go +++ b/agent/cache-types/rpc.go @@ -1,4 +1,6 @@ -package cache +package cachetype + +//go:generate mockery -all -inpkg // RPC is an interface that an RPC client must implement. This is a helper // interface that is implemented by the agent delegate so that Type diff --git a/agent/cache-types/testing.go b/agent/cache-types/testing.go new file mode 100644 index 0000000000..bf68ec4787 --- /dev/null +++ b/agent/cache-types/testing.go @@ -0,0 +1,12 @@ +package cachetype + +import ( + "github.com/mitchellh/go-testing-interface" +) + +// TestRPC returns a mock implementation of the RPC interface. +func TestRPC(t testing.T) *MockRPC { + // This function is relatively useless but this allows us to perhaps + // perform some initialization later. + return &MockRPC{} +} diff --git a/agent/cache/cache.go b/agent/cache/cache.go index 1c2f316dd9..04323a4c58 100644 --- a/agent/cache/cache.go +++ b/agent/cache/cache.go @@ -109,8 +109,8 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) { // Multiple Get calls for the same Request (matching CacheKey value) will // block on a single network request. func (c *Cache) Get(t string, r Request) (interface{}, error) { - key := r.CacheKey() - if key == "" { + info := r.CacheInfo() + if info.Key == "" { // If no key is specified, then we do not cache this request. // Pass directly through to the backend. return c.fetchDirect(t, r) @@ -119,7 +119,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) { RETRY_GET: // Get the current value c.entriesLock.RLock() - entry, ok := c.entries[key] + entry, ok := c.entries[info.Key] c.entriesLock.RUnlock() // If we have a current value and the index is greater than the @@ -127,8 +127,7 @@ RETRY_GET: // index is zero and we have something in the cache we accept whatever // we have. if ok && entry.Valid { - idx := r.CacheMinIndex() - if idx == 0 || idx < entry.Index { + if info.MinIndex == 0 || info.MinIndex < entry.Index { return entry.Value, nil } } @@ -154,13 +153,12 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { return nil, fmt.Errorf("unknown type in cache: %s", t) } - // The cache key is used multiple times and might be dynamically - // constructed so let's just store it once here. - key := r.CacheKey() + // Grab the cache information while we're outside the lock. + info := r.CacheInfo() c.entriesLock.Lock() defer c.entriesLock.Unlock() - entry, ok := c.entries[key] + entry, ok := c.entries[info.Key] // If we already have an entry and it is actively fetching, then return // the currently active waiter. @@ -178,7 +176,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // identical calls to fetch will return the same waiter rather than // perform multiple fetches. entry.Fetching = true - c.entries[key] = entry + c.entries[info.Key] = entry // The actual Fetch must be performed in a goroutine. go func() { @@ -199,7 +197,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) { // Insert c.entriesLock.Lock() - c.entries[key] = newEntry + c.entries[info.Key] = newEntry c.entriesLock.Unlock() // Trigger the waiter @@ -227,7 +225,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) { // Fetch it with the min index specified directly by the request. result, err := tEntry.Type.Fetch(FetchOptions{ - MinIndex: r.CacheMinIndex(), + MinIndex: r.CacheInfo().MinIndex, }, r) if err != nil { return nil, err diff --git a/agent/cache/cache_test.go b/agent/cache/cache_test.go index 69f99a6283..1bfed590c1 100644 --- a/agent/cache/cache_test.go +++ b/agent/cache/cache_test.go @@ -25,7 +25,7 @@ func TestCacheGet_noIndex(t *testing.T) { typ.Static(FetchResult{Value: 42}, nil).Times(1) // Get, should fetch - req := TestRequest(t, "hello", 0) + req := TestRequest(t, RequestInfo{Key: "hello"}) result, err := c.Get("t", req) require.Nil(err) require.Equal(42, result) @@ -57,7 +57,7 @@ func TestCacheGet_blankCacheKey(t *testing.T) { typ.Static(FetchResult{Value: 42}, nil).Times(2) // Get, should fetch - req := TestRequest(t, "", 0) + req := TestRequest(t, RequestInfo{Key: ""}) result, err := c.Get("t", req) require.Nil(err) require.Equal(42, result) @@ -87,8 +87,8 @@ func TestCacheGet_blockingInitSameKey(t *testing.T) { typ.Static(FetchResult{Value: 42}, nil).WaitUntil(triggerCh).Times(1) // Perform multiple gets - getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) - getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) // They should block select { @@ -131,12 +131,12 @@ func TestCacheGet_blockingInitDiffKeys(t *testing.T) { Run(func(args mock.Arguments) { keysLock.Lock() defer keysLock.Unlock() - keys = append(keys, args.Get(1).(Request).CacheKey()) + keys = append(keys, args.Get(1).(Request).CacheInfo().Key) }) // Perform multiple gets - getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) - getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "goodbye", 0)) + getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) + getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "goodbye"})) // They should block select { @@ -176,7 +176,8 @@ func TestCacheGet_blockingIndex(t *testing.T) { typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh) // Fetch should block - resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 5)) + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{ + Key: "hello", MinIndex: 5})) // Should block select { @@ -217,16 +218,16 @@ func TestCacheGet_periodicRefresh(t *testing.T) { typ.Static(FetchResult{Value: 12, Index: 5}, nil).WaitUntil(triggerCh) // Fetch should block - resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) TestCacheGetChResult(t, resultCh, 1) // Fetch again almost immediately should return old result time.Sleep(5 * time.Millisecond) - resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) TestCacheGetChResult(t, resultCh, 1) // Wait for the timer time.Sleep(200 * time.Millisecond) - resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) + resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"})) TestCacheGetChResult(t, resultCh, 12) } diff --git a/agent/cache/mock_Request.go b/agent/cache/mock_Request.go index 1579121825..e3abd15159 100644 --- a/agent/cache/mock_Request.go +++ b/agent/cache/mock_Request.go @@ -8,29 +8,15 @@ type MockRequest struct { mock.Mock } -// CacheKey provides a mock function with given fields: -func (_m *MockRequest) CacheKey() string { +// CacheInfo provides a mock function with given fields: +func (_m *MockRequest) CacheInfo() RequestInfo { ret := _m.Called() - var r0 string - if rf, ok := ret.Get(0).(func() string); ok { + var r0 RequestInfo + if rf, ok := ret.Get(0).(func() RequestInfo); ok { r0 = rf() } else { - r0 = ret.Get(0).(string) - } - - return r0 -} - -// CacheMinIndex provides a mock function with given fields: -func (_m *MockRequest) CacheMinIndex() uint64 { - ret := _m.Called() - - var r0 uint64 - if rf, ok := ret.Get(0).(func() uint64); ok { - r0 = rf() - } else { - r0 = ret.Get(0).(uint64) + r0 = ret.Get(0).(RequestInfo) } return r0 diff --git a/agent/cache/request.go b/agent/cache/request.go index c75c8ad847..b4a1b75d0c 100644 --- a/agent/cache/request.go +++ b/agent/cache/request.go @@ -5,13 +5,35 @@ package cache // This interface is typically implemented by request structures in // the agent/structs package. type Request interface { - // CacheKey is a unique cache key for this request. This key should + // CacheInfo returns information used for caching this request. + CacheInfo() RequestInfo +} + +// RequestInfo represents cache information for a request. The caching +// framework uses this to control the behavior of caching and to determine +// cacheability. +type RequestInfo struct { + // Key is a unique cache key for this request. This key should // absolutely uniquely identify this request, since any conflicting // cache keys could result in invalid data being returned from the cache. - CacheKey() string + Key string - // CacheMinIndex is the minimum index being queried. This is used to + // Token is the ACL token associated with this request. + // + // Datacenter is the datacenter that the request is targeting. + // + // Both of these values are used to partition the cache. The cache framework + // today partitions data on these values to simplify behavior: by + // partitioning ACL tokens, the cache doesn't need to be smart about + // filtering results. By filtering datacenter results, the cache can + // service the multi-DC nature of Consul. This comes at the expense of + // working set size, but in general the effect is minimal. + Token string + Datacenter string + + // MinIndex is the minimum index being queried. This is used to // determine if we already have data satisfying the query or if we need - // to block until new data is available. - CacheMinIndex() uint64 + // to block until new data is available. If no index is available, the + // default value (zero) is acceptable. + MinIndex uint64 } diff --git a/agent/cache/testing.go b/agent/cache/testing.go index 6a094c1174..365dc3b4e5 100644 --- a/agent/cache/testing.go +++ b/agent/cache/testing.go @@ -50,20 +50,12 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface // TestRequest returns a Request that returns the given cache key and index. // The Reset method can be called to reset it for custom usage. -func TestRequest(t testing.T, key string, index uint64) *MockRequest { +func TestRequest(t testing.T, info RequestInfo) *MockRequest { req := &MockRequest{} - req.On("CacheKey").Return(key) - req.On("CacheMinIndex").Return(index) + req.On("CacheInfo").Return(info) return req } -// TestRPC returns a mock implementation of the RPC interface. -func TestRPC(t testing.T) *MockRPC { - // This function is relatively useless but this allows us to perhaps - // perform some initialization later. - return &MockRPC{} -} - // TestType returns a MockType that can be used to setup expectations // on data fetching. func TestType(t testing.T) *MockType { diff --git a/agent/structs/structs.go b/agent/structs/structs.go index d40c90baaf..19a9c73134 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-msgpack/codec" @@ -278,18 +279,23 @@ func (r *DCSpecificRequest) RequestDatacenter() string { return r.Datacenter } -func (r *DCSpecificRequest) CacheKey() string { +func (r *DCSpecificRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + MinIndex: r.QueryOptions.MinQueryIndex, + } + // To calculate the cache key we only hash the node filters. The // datacenter is handled by the cache framework. The other fields are // not, but should not be used in any cache types. v, err := hashstructure.Hash(r.NodeMetaFilters, nil) - if err != nil { - // Empty string means do not cache. If we have an error we should - // just forward along to the server. - return "" + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) } - return strconv.FormatUint(v, 10) + return info } func (r *DCSpecificRequest) CacheMinIndex() uint64 {