agent/cache: Reorganize some files, RequestInfo struct, prepare for partitioning

This commit is contained in:
Mitchell Hashimoto 2018-04-08 15:08:34 +01:00
parent b0db5657c4
commit e3c1162881
No known key found for this signature in database
GPG Key ID: 744E147AA52F5B0A
11 changed files with 94 additions and 72 deletions

View File

@ -1,8 +1,9 @@
package cache package cachetype
import ( import (
"fmt" "fmt"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
) )
@ -11,8 +12,8 @@ type TypeCARoot struct {
RPC RPC RPC RPC
} }
func (c *TypeCARoot) Fetch(opts FetchOptions, req Request) (FetchResult, error) { func (c *TypeCARoot) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
var result FetchResult var result cache.FetchResult
// The request should be a DCSpecificRequest. // The request should be a DCSpecificRequest.
reqReal, ok := req.(*structs.DCSpecificRequest) reqReal, ok := req.(*structs.DCSpecificRequest)

View File

@ -1,9 +1,10 @@
package cache package cachetype
import ( import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -30,12 +31,12 @@ func TestTypeCARoot(t *testing.T) {
}) })
// Fetch // Fetch
result, err := typ.Fetch(FetchOptions{ result, err := typ.Fetch(cache.FetchOptions{
MinIndex: 24, MinIndex: 24,
Timeout: 1 * time.Second, Timeout: 1 * time.Second,
}, &structs.DCSpecificRequest{Datacenter: "dc1"}) }, &structs.DCSpecificRequest{Datacenter: "dc1"})
require.Nil(err) require.Nil(err)
require.Equal(FetchResult{ require.Equal(cache.FetchResult{
Value: resp, Value: resp,
Index: 48, Index: 48,
}, result) }, result)
@ -48,7 +49,8 @@ func TestTypeCARoot_badReqType(t *testing.T) {
typ := &TypeCARoot{RPC: rpc} typ := &TypeCARoot{RPC: rpc}
// Fetch // Fetch
_, err := typ.Fetch(FetchOptions{}, TestRequest(t, "foo", 64)) _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
require.NotNil(err) require.NotNil(err)
require.Contains(err.Error(), "wrong type") require.Contains(err.Error(), "wrong type")

View File

@ -1,5 +1,5 @@
// Code generated by mockery v1.0.0 // Code generated by mockery v1.0.0
package cache package cachetype
import mock "github.com/stretchr/testify/mock" import mock "github.com/stretchr/testify/mock"

View File

@ -1,4 +1,6 @@
package cache package cachetype
//go:generate mockery -all -inpkg
// RPC is an interface that an RPC client must implement. This is a helper // RPC is an interface that an RPC client must implement. This is a helper
// interface that is implemented by the agent delegate so that Type // interface that is implemented by the agent delegate so that Type

View File

@ -0,0 +1,12 @@
package cachetype
import (
"github.com/mitchellh/go-testing-interface"
)
// TestRPC returns a mock implementation of the RPC interface.
func TestRPC(t testing.T) *MockRPC {
// This function is relatively useless but this allows us to perhaps
// perform some initialization later.
return &MockRPC{}
}

22
agent/cache/cache.go vendored
View File

@ -109,8 +109,8 @@ func (c *Cache) RegisterType(n string, typ Type, opts *RegisterOptions) {
// Multiple Get calls for the same Request (matching CacheKey value) will // Multiple Get calls for the same Request (matching CacheKey value) will
// block on a single network request. // block on a single network request.
func (c *Cache) Get(t string, r Request) (interface{}, error) { func (c *Cache) Get(t string, r Request) (interface{}, error) {
key := r.CacheKey() info := r.CacheInfo()
if key == "" { if info.Key == "" {
// If no key is specified, then we do not cache this request. // If no key is specified, then we do not cache this request.
// Pass directly through to the backend. // Pass directly through to the backend.
return c.fetchDirect(t, r) return c.fetchDirect(t, r)
@ -119,7 +119,7 @@ func (c *Cache) Get(t string, r Request) (interface{}, error) {
RETRY_GET: RETRY_GET:
// Get the current value // Get the current value
c.entriesLock.RLock() c.entriesLock.RLock()
entry, ok := c.entries[key] entry, ok := c.entries[info.Key]
c.entriesLock.RUnlock() c.entriesLock.RUnlock()
// If we have a current value and the index is greater than the // If we have a current value and the index is greater than the
@ -127,8 +127,7 @@ RETRY_GET:
// index is zero and we have something in the cache we accept whatever // index is zero and we have something in the cache we accept whatever
// we have. // we have.
if ok && entry.Valid { if ok && entry.Valid {
idx := r.CacheMinIndex() if info.MinIndex == 0 || info.MinIndex < entry.Index {
if idx == 0 || idx < entry.Index {
return entry.Value, nil return entry.Value, nil
} }
} }
@ -154,13 +153,12 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
return nil, fmt.Errorf("unknown type in cache: %s", t) return nil, fmt.Errorf("unknown type in cache: %s", t)
} }
// The cache key is used multiple times and might be dynamically // Grab the cache information while we're outside the lock.
// constructed so let's just store it once here. info := r.CacheInfo()
key := r.CacheKey()
c.entriesLock.Lock() c.entriesLock.Lock()
defer c.entriesLock.Unlock() defer c.entriesLock.Unlock()
entry, ok := c.entries[key] entry, ok := c.entries[info.Key]
// If we already have an entry and it is actively fetching, then return // If we already have an entry and it is actively fetching, then return
// the currently active waiter. // the currently active waiter.
@ -178,7 +176,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// identical calls to fetch will return the same waiter rather than // identical calls to fetch will return the same waiter rather than
// perform multiple fetches. // perform multiple fetches.
entry.Fetching = true entry.Fetching = true
c.entries[key] = entry c.entries[info.Key] = entry
// The actual Fetch must be performed in a goroutine. // The actual Fetch must be performed in a goroutine.
go func() { go func() {
@ -199,7 +197,7 @@ func (c *Cache) fetch(t string, r Request) (<-chan struct{}, error) {
// Insert // Insert
c.entriesLock.Lock() c.entriesLock.Lock()
c.entries[key] = newEntry c.entries[info.Key] = newEntry
c.entriesLock.Unlock() c.entriesLock.Unlock()
// Trigger the waiter // Trigger the waiter
@ -227,7 +225,7 @@ func (c *Cache) fetchDirect(t string, r Request) (interface{}, error) {
// Fetch it with the min index specified directly by the request. // Fetch it with the min index specified directly by the request.
result, err := tEntry.Type.Fetch(FetchOptions{ result, err := tEntry.Type.Fetch(FetchOptions{
MinIndex: r.CacheMinIndex(), MinIndex: r.CacheInfo().MinIndex,
}, r) }, r)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -25,7 +25,7 @@ func TestCacheGet_noIndex(t *testing.T) {
typ.Static(FetchResult{Value: 42}, nil).Times(1) typ.Static(FetchResult{Value: 42}, nil).Times(1)
// Get, should fetch // Get, should fetch
req := TestRequest(t, "hello", 0) req := TestRequest(t, RequestInfo{Key: "hello"})
result, err := c.Get("t", req) result, err := c.Get("t", req)
require.Nil(err) require.Nil(err)
require.Equal(42, result) require.Equal(42, result)
@ -57,7 +57,7 @@ func TestCacheGet_blankCacheKey(t *testing.T) {
typ.Static(FetchResult{Value: 42}, nil).Times(2) typ.Static(FetchResult{Value: 42}, nil).Times(2)
// Get, should fetch // Get, should fetch
req := TestRequest(t, "", 0) req := TestRequest(t, RequestInfo{Key: ""})
result, err := c.Get("t", req) result, err := c.Get("t", req)
require.Nil(err) require.Nil(err)
require.Equal(42, result) require.Equal(42, result)
@ -87,8 +87,8 @@ func TestCacheGet_blockingInitSameKey(t *testing.T) {
typ.Static(FetchResult{Value: 42}, nil).WaitUntil(triggerCh).Times(1) typ.Static(FetchResult{Value: 42}, nil).WaitUntil(triggerCh).Times(1)
// Perform multiple gets // Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
// They should block // They should block
select { select {
@ -131,12 +131,12 @@ func TestCacheGet_blockingInitDiffKeys(t *testing.T) {
Run(func(args mock.Arguments) { Run(func(args mock.Arguments) {
keysLock.Lock() keysLock.Lock()
defer keysLock.Unlock() defer keysLock.Unlock()
keys = append(keys, args.Get(1).(Request).CacheKey()) keys = append(keys, args.Get(1).(Request).CacheInfo().Key)
}) })
// Perform multiple gets // Perform multiple gets
getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) getCh1 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, "goodbye", 0)) getCh2 := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "goodbye"}))
// They should block // They should block
select { select {
@ -176,7 +176,8 @@ func TestCacheGet_blockingIndex(t *testing.T) {
typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh) typ.Static(FetchResult{Value: 42, Index: 6}, nil).WaitUntil(triggerCh)
// Fetch should block // Fetch should block
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 5)) resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{
Key: "hello", MinIndex: 5}))
// Should block // Should block
select { select {
@ -217,16 +218,16 @@ func TestCacheGet_periodicRefresh(t *testing.T) {
typ.Static(FetchResult{Value: 12, Index: 5}, nil).WaitUntil(triggerCh) typ.Static(FetchResult{Value: 12, Index: 5}, nil).WaitUntil(triggerCh)
// Fetch should block // Fetch should block
resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) resultCh := TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
TestCacheGetChResult(t, resultCh, 1) TestCacheGetChResult(t, resultCh, 1)
// Fetch again almost immediately should return old result // Fetch again almost immediately should return old result
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
TestCacheGetChResult(t, resultCh, 1) TestCacheGetChResult(t, resultCh, 1)
// Wait for the timer // Wait for the timer
time.Sleep(200 * time.Millisecond) time.Sleep(200 * time.Millisecond)
resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, "hello", 0)) resultCh = TestCacheGetCh(t, c, "t", TestRequest(t, RequestInfo{Key: "hello"}))
TestCacheGetChResult(t, resultCh, 12) TestCacheGetChResult(t, resultCh, 12)
} }

View File

@ -8,29 +8,15 @@ type MockRequest struct {
mock.Mock mock.Mock
} }
// CacheKey provides a mock function with given fields: // CacheInfo provides a mock function with given fields:
func (_m *MockRequest) CacheKey() string { func (_m *MockRequest) CacheInfo() RequestInfo {
ret := _m.Called() ret := _m.Called()
var r0 string var r0 RequestInfo
if rf, ok := ret.Get(0).(func() string); ok { if rf, ok := ret.Get(0).(func() RequestInfo); ok {
r0 = rf() r0 = rf()
} else { } else {
r0 = ret.Get(0).(string) r0 = ret.Get(0).(RequestInfo)
}
return r0
}
// CacheMinIndex provides a mock function with given fields:
func (_m *MockRequest) CacheMinIndex() uint64 {
ret := _m.Called()
var r0 uint64
if rf, ok := ret.Get(0).(func() uint64); ok {
r0 = rf()
} else {
r0 = ret.Get(0).(uint64)
} }
return r0 return r0

View File

@ -5,13 +5,35 @@ package cache
// This interface is typically implemented by request structures in // This interface is typically implemented by request structures in
// the agent/structs package. // the agent/structs package.
type Request interface { type Request interface {
// CacheKey is a unique cache key for this request. This key should // CacheInfo returns information used for caching this request.
CacheInfo() RequestInfo
}
// RequestInfo represents cache information for a request. The caching
// framework uses this to control the behavior of caching and to determine
// cacheability.
type RequestInfo struct {
// Key is a unique cache key for this request. This key should
// absolutely uniquely identify this request, since any conflicting // absolutely uniquely identify this request, since any conflicting
// cache keys could result in invalid data being returned from the cache. // cache keys could result in invalid data being returned from the cache.
CacheKey() string Key string
// CacheMinIndex is the minimum index being queried. This is used to // Token is the ACL token associated with this request.
//
// Datacenter is the datacenter that the request is targeting.
//
// Both of these values are used to partition the cache. The cache framework
// today partitions data on these values to simplify behavior: by
// partitioning ACL tokens, the cache doesn't need to be smart about
// filtering results. By filtering datacenter results, the cache can
// service the multi-DC nature of Consul. This comes at the expense of
// working set size, but in general the effect is minimal.
Token string
Datacenter string
// MinIndex is the minimum index being queried. This is used to
// determine if we already have data satisfying the query or if we need // determine if we already have data satisfying the query or if we need
// to block until new data is available. // to block until new data is available. If no index is available, the
CacheMinIndex() uint64 // default value (zero) is acceptable.
MinIndex uint64
} }

View File

@ -50,20 +50,12 @@ func TestCacheGetChResult(t testing.T, ch <-chan interface{}, expected interface
// TestRequest returns a Request that returns the given cache key and index. // TestRequest returns a Request that returns the given cache key and index.
// The Reset method can be called to reset it for custom usage. // The Reset method can be called to reset it for custom usage.
func TestRequest(t testing.T, key string, index uint64) *MockRequest { func TestRequest(t testing.T, info RequestInfo) *MockRequest {
req := &MockRequest{} req := &MockRequest{}
req.On("CacheKey").Return(key) req.On("CacheInfo").Return(info)
req.On("CacheMinIndex").Return(index)
return req return req
} }
// TestRPC returns a mock implementation of the RPC interface.
func TestRPC(t testing.T) *MockRPC {
// This function is relatively useless but this allows us to perhaps
// perform some initialization later.
return &MockRPC{}
}
// TestType returns a MockType that can be used to setup expectations // TestType returns a MockType that can be used to setup expectations
// on data fetching. // on data fetching.
func TestType(t testing.T) *MockType { func TestType(t testing.T) *MockType {

View File

@ -10,6 +10,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types" "github.com/hashicorp/consul/types"
"github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/go-msgpack/codec"
@ -278,18 +279,23 @@ func (r *DCSpecificRequest) RequestDatacenter() string {
return r.Datacenter return r.Datacenter
} }
func (r *DCSpecificRequest) CacheKey() string { func (r *DCSpecificRequest) CacheInfo() cache.RequestInfo {
info := cache.RequestInfo{
MinIndex: r.QueryOptions.MinQueryIndex,
}
// To calculate the cache key we only hash the node filters. The // To calculate the cache key we only hash the node filters. The
// datacenter is handled by the cache framework. The other fields are // datacenter is handled by the cache framework. The other fields are
// not, but should not be used in any cache types. // not, but should not be used in any cache types.
v, err := hashstructure.Hash(r.NodeMetaFilters, nil) v, err := hashstructure.Hash(r.NodeMetaFilters, nil)
if err != nil { if err == nil {
// Empty string means do not cache. If we have an error we should // If there is an error, we don't set the key. A blank key forces
// just forward along to the server. // no cache for this request so the request is forwarded directly
return "" // to the server.
info.Key = strconv.FormatUint(v, 10)
} }
return strconv.FormatUint(v, 10) return info
} }
func (r *DCSpecificRequest) CacheMinIndex() uint64 { func (r *DCSpecificRequest) CacheMinIndex() uint64 {