mirror of
https://github.com/status-im/consul.git
synced 2025-01-26 21:51:39 +00:00
New Cache Types (#5995)
* Add a cache type for the Catalog.ListServices endpoint * Add a cache type for the Catalog.ListDatacenters endpoint
This commit is contained in:
parent
19e70c46bf
commit
43c5ba0304
@ -3738,6 +3738,20 @@ func (a *Agent) registerCache() {
|
||||
RefreshTimer: 0 * time.Second,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
|
||||
a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{
|
||||
RPC: a,
|
||||
}, &cache.RegisterOptions{
|
||||
Refresh: true,
|
||||
RefreshTimer: 0 * time.Second,
|
||||
RefreshTimeout: 10 * time.Minute,
|
||||
})
|
||||
|
||||
a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{
|
||||
RPC: a,
|
||||
}, &cache.RegisterOptions{
|
||||
Refresh: false,
|
||||
})
|
||||
}
|
||||
|
||||
// defaultProxyCommand returns the default Connect managed proxy command.
|
||||
|
77
agent/cache-types/catalog_datacenters.go
Normal file
77
agent/cache-types/catalog_datacenters.go
Normal file
@ -0,0 +1,77 @@
|
||||
package cachetype
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Recommended name for registration.
|
||||
const CatalogDatacentersName = "catalog-datacenters"
|
||||
|
||||
// Datacenters supports fetching discovering all the known datacenters
|
||||
type CatalogDatacenters struct {
|
||||
RPC RPC
|
||||
}
|
||||
|
||||
func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||
var result cache.FetchResult
|
||||
|
||||
// The request should be a CatalogDatacentersRequest.
|
||||
reqReal, ok := req.(*structs.DatacentersRequest)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: request wrong type: %T", req)
|
||||
}
|
||||
|
||||
// Allways allow stale - there's no point in hitting leader if the request is
|
||||
// going to be served from cache and endup arbitrarily stale anyway. This
|
||||
// allows cached service-discover to automatically read scale across all
|
||||
// servers too.
|
||||
reqReal.AllowStale = true
|
||||
|
||||
// Fetch
|
||||
var reply []string
|
||||
if err := c.RPC.RPC("Catalog.ListDatacenters", reqReal, &reply); err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.Value = &reply
|
||||
|
||||
// this is a purely synthetic index to keep the caching happy.
|
||||
if opts.LastResult != nil {
|
||||
equal := true
|
||||
previousDCs, ok := opts.LastResult.Value.(*[]string)
|
||||
if ok && previousDCs == nil {
|
||||
ok = false
|
||||
}
|
||||
|
||||
if ok {
|
||||
if len(reply) != len(*previousDCs) {
|
||||
equal = false
|
||||
} else {
|
||||
// ordering matters as they should be sorted based on distance
|
||||
for i, dc := range reply {
|
||||
if dc != (*previousDCs)[i] {
|
||||
equal = false
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result.Index = opts.LastResult.Index
|
||||
if !equal || !ok {
|
||||
result.Index += 1
|
||||
}
|
||||
} else {
|
||||
result.Index = 1
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *CatalogDatacenters) SupportsBlocking() bool {
|
||||
return false
|
||||
}
|
96
agent/cache-types/catalog_datacenters_test.go
Normal file
96
agent/cache-types/catalog_datacenters_test.go
Normal file
@ -0,0 +1,96 @@
|
||||
package cachetype
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCatalogDatacenters(t *testing.T) {
|
||||
rpc := TestRPC(t)
|
||||
typ := &CatalogDatacenters{RPC: rpc}
|
||||
|
||||
// Expect the proper RPC call. This also sets the expected value
|
||||
// since that is return-by-pointer in the arguments.
|
||||
var resp *[]string
|
||||
var resp2 *[]string
|
||||
var resp3 *[]string
|
||||
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
req := args.Get(1).(*structs.DatacentersRequest)
|
||||
require.True(t, req.AllowStale)
|
||||
|
||||
reply := args.Get(2).(*[]string)
|
||||
*reply = []string{
|
||||
"primary", "secondary", "tertiary",
|
||||
}
|
||||
resp = reply
|
||||
})
|
||||
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
req := args.Get(1).(*structs.DatacentersRequest)
|
||||
require.True(t, req.AllowStale)
|
||||
|
||||
reply := args.Get(2).(*[]string)
|
||||
*reply = []string{
|
||||
"primary", "tertiary", "secondary",
|
||||
}
|
||||
resp2 = reply
|
||||
})
|
||||
rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
req := args.Get(1).(*structs.DatacentersRequest)
|
||||
require.True(t, req.AllowStale)
|
||||
|
||||
reply := args.Get(2).(*[]string)
|
||||
*reply = []string{
|
||||
"primary", "secondary",
|
||||
}
|
||||
resp3 = reply
|
||||
})
|
||||
|
||||
// Fetch first time
|
||||
result, err := typ.Fetch(cache.FetchOptions{}, &structs.DatacentersRequest{})
|
||||
result2, err := typ.Fetch(cache.FetchOptions{LastResult: &result}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
|
||||
result3, err := typ.Fetch(cache.FetchOptions{LastResult: &result2}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
|
||||
|
||||
// make sure it was called the right number of times
|
||||
rpc.AssertExpectations(t)
|
||||
|
||||
// make sure the first result was correct
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result, cache.FetchResult{
|
||||
Value: resp,
|
||||
Index: 1,
|
||||
})
|
||||
|
||||
// validate the second result
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result2, cache.FetchResult{
|
||||
Value: resp2,
|
||||
Index: 2,
|
||||
})
|
||||
|
||||
// validate the third result
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, result3, cache.FetchResult{
|
||||
Value: resp3,
|
||||
Index: 3,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestDatacenters_badReqType(t *testing.T) {
|
||||
rpc := TestRPC(t)
|
||||
typ := &PreparedQuery{RPC: rpc}
|
||||
|
||||
// Fetch
|
||||
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "wrong type")
|
||||
rpc.AssertExpectations(t)
|
||||
}
|
51
agent/cache-types/catalog_list_services.go
Normal file
51
agent/cache-types/catalog_list_services.go
Normal file
@ -0,0 +1,51 @@
|
||||
package cachetype
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Recommended name for registration.
|
||||
const CatalogListServicesName = "catalog-list-services"
|
||||
|
||||
// CatalogListServices supports fetching discovering service names via the catalog.
|
||||
type CatalogListServices struct {
|
||||
RPC RPC
|
||||
}
|
||||
|
||||
func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||
var result cache.FetchResult
|
||||
|
||||
// The request should be a DCSpecificRequest.
|
||||
reqReal, ok := req.(*structs.DCSpecificRequest)
|
||||
if !ok {
|
||||
return result, fmt.Errorf(
|
||||
"Internal cache failure: request wrong type: %T", req)
|
||||
}
|
||||
|
||||
// Set the minimum query index to our current index so we block
|
||||
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
|
||||
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
|
||||
|
||||
// Always allow stale - there's no point in hitting leader if the request is
|
||||
// going to be served from cache and end up arbitrarily stale anyway. This
|
||||
// allows cached service-discover to automatically read scale across all
|
||||
// servers too.
|
||||
reqReal.AllowStale = true
|
||||
|
||||
// Fetch
|
||||
var reply structs.IndexedServices
|
||||
if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil {
|
||||
return result, err
|
||||
}
|
||||
|
||||
result.Value = &reply
|
||||
result.Index = reply.QueryMeta.Index
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *CatalogListServices) SupportsBlocking() bool {
|
||||
return true
|
||||
}
|
62
agent/cache-types/catalog_list_services_test.go
Normal file
62
agent/cache-types/catalog_list_services_test.go
Normal file
@ -0,0 +1,62 @@
|
||||
package cachetype
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/cache"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestCatalogListServices(t *testing.T) {
|
||||
rpc := TestRPC(t)
|
||||
typ := &CatalogListServices{RPC: rpc}
|
||||
|
||||
// Expect the proper RPC call. This also sets the expected value
|
||||
// since that is return-by-pointer in the arguments.
|
||||
var resp *structs.IndexedServices
|
||||
rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
|
||||
Run(func(args mock.Arguments) {
|
||||
req := args.Get(1).(*structs.DCSpecificRequest)
|
||||
require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
|
||||
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||
require.True(t, req.AllowStale)
|
||||
|
||||
reply := args.Get(2).(*structs.IndexedServices)
|
||||
reply.Services = map[string][]string{
|
||||
"foo": []string{"prod", "linux"},
|
||||
"bar": []string{"qa", "windows"},
|
||||
}
|
||||
reply.QueryMeta.Index = 48
|
||||
resp = reply
|
||||
})
|
||||
|
||||
// Fetch
|
||||
resultA, err := typ.Fetch(cache.FetchOptions{
|
||||
MinIndex: 24,
|
||||
Timeout: 1 * time.Second,
|
||||
}, &structs.DCSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, cache.FetchResult{
|
||||
Value: resp,
|
||||
Index: 48,
|
||||
}, resultA)
|
||||
|
||||
rpc.AssertExpectations(t)
|
||||
}
|
||||
|
||||
func TestCatalogListServices_badReqType(t *testing.T) {
|
||||
rpc := TestRPC(t)
|
||||
typ := &CatalogServices{RPC: rpc}
|
||||
|
||||
// Fetch
|
||||
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "wrong type")
|
||||
rpc.AssertExpectations(t)
|
||||
}
|
@ -74,12 +74,33 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
|
||||
metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_datacenters"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
|
||||
args := structs.DatacentersRequest{}
|
||||
s.parseConsistency(resp, req, &args.QueryOptions)
|
||||
parseCacheControl(resp, req, &args.QueryOptions)
|
||||
var out []string
|
||||
if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
|
||||
if args.QueryOptions.UseCache {
|
||||
raw, m, err := s.agent.cache.Get(cachetype.CatalogDatacentersName, &args)
|
||||
if err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
}
|
||||
reply, ok := raw.(*[]string)
|
||||
if !ok {
|
||||
// This should never happen, but we want to protect against panics
|
||||
return nil, fmt.Errorf("internal error: response type not correct")
|
||||
}
|
||||
defer setCacheMeta(resp, &m)
|
||||
out = *reply
|
||||
} else {
|
||||
if err := s.agent.RPC("Catalog.ListDatacenters", &args, &out); err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
metrics.IncrCounterWithLabels([]string{"client", "api", "success", "catalog_datacenters"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return out, nil
|
||||
@ -133,20 +154,37 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
var out structs.IndexedServices
|
||||
defer setMeta(resp, &out.QueryMeta)
|
||||
RETRY_ONCE:
|
||||
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
}
|
||||
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||
args.AllowStale = false
|
||||
args.MaxStaleDuration = 0
|
||||
goto RETRY_ONCE
|
||||
|
||||
if args.QueryOptions.UseCache {
|
||||
raw, m, err := s.agent.cache.Get(cachetype.CatalogListServicesName, &args)
|
||||
if err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
}
|
||||
reply, ok := raw.(*structs.IndexedServices)
|
||||
if !ok {
|
||||
// This should never happen, but we want to protect against panics
|
||||
return nil, fmt.Errorf("internal error: response type not correct")
|
||||
}
|
||||
defer setCacheMeta(resp, &m)
|
||||
out = *reply
|
||||
} else {
|
||||
RETRY_ONCE:
|
||||
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
|
||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||
return nil, err
|
||||
}
|
||||
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||
args.AllowStale = false
|
||||
args.MaxStaleDuration = 0
|
||||
goto RETRY_ONCE
|
||||
}
|
||||
}
|
||||
|
||||
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||
|
||||
// Use empty map instead of nil
|
||||
|
@ -200,7 +200,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
|
||||
}
|
||||
|
||||
// ListDatacenters is used to query for the list of known datacenters
|
||||
func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
|
||||
func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]string) error {
|
||||
dcs, err := c.srv.router.GetDatacentersByDistance()
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -321,6 +321,22 @@ type QuerySource struct {
|
||||
Ip string
|
||||
}
|
||||
|
||||
type DatacentersRequest struct {
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
func (r *DatacentersRequest) CacheInfo() cache.RequestInfo {
|
||||
return cache.RequestInfo{
|
||||
Token: "",
|
||||
Datacenter: "",
|
||||
MinIndex: 0,
|
||||
Timeout: r.MaxQueryTime,
|
||||
MaxAge: r.MaxAge,
|
||||
MustRevalidate: r.MustRevalidate,
|
||||
Key: "catalog-datacenters", // must not be empty for cache to work
|
||||
}
|
||||
}
|
||||
|
||||
// DCSpecificRequest is used to query about a specific DC
|
||||
type DCSpecificRequest struct {
|
||||
Datacenter string
|
||||
|
Loading…
x
Reference in New Issue
Block a user