diff --git a/agent/agent.go b/agent/agent.go
index b6c0286e76..47561c12fa 100644
--- a/agent/agent.go
+++ b/agent/agent.go
@@ -3738,6 +3738,21 @@ func (a *Agent) registerCache() {
 		RefreshTimer:   0 * time.Second,
 		RefreshTimeout: 10 * time.Minute,
 	})
+
+	a.cache.RegisterType(cachetype.CatalogListServicesName, &cachetype.CatalogListServices{
+		RPC: a,
+	}, &cache.RegisterOptions{
+		Refresh:        true,
+		RefreshTimer:   0 * time.Second,
+		RefreshTimeout: 10 * time.Minute,
+	})
+
+	a.cache.RegisterType(cachetype.CatalogDatacentersName, &cachetype.CatalogDatacenters{
+		RPC: a,
+	}, &cache.RegisterOptions{
+		// Listing datacenters doesn't support blocking
+		Refresh: false,
+	})
 }
 
 // defaultProxyCommand returns the default Connect managed proxy command.
diff --git a/agent/cache-types/catalog_datacenters.go b/agent/cache-types/catalog_datacenters.go
new file mode 100644
index 0000000000..60c2b27b47
--- /dev/null
+++ b/agent/cache-types/catalog_datacenters.go
@@ -0,0 +1,83 @@
+package cachetype
+
+import (
+	"fmt"
+
+	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/agent/structs"
+)
+
+// CatalogDatacentersName is the recommended name for registration.
+const CatalogDatacentersName = "catalog-datacenters"
+
+// CatalogDatacenters supports fetching the list of all known datacenters.
+type CatalogDatacenters struct {
+	RPC RPC
+}
+
+// Fetch issues a Catalog.ListDatacenters RPC and returns the datacenter list.
+// The RPC reply carries no index of its own, so Fetch derives one from
+// opts.LastResult: the index starts at 1 and is incremented whenever the list
+// differs from the previously fetched one.
+func (c *CatalogDatacenters) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
+	var result cache.FetchResult
+
+	// The request should be a DatacentersRequest.
+	reqReal, ok := req.(*structs.DatacentersRequest)
+	if !ok {
+		return result, fmt.Errorf(
+			"Internal cache failure: request wrong type: %T", req)
+	}
+
+	// Always allow stale - there's no point in hitting leader if the request is
+	// going to be served from cache and end up arbitrarily stale anyway. This
+	// allows cached datacenter lookups to automatically read scale across all
+	// servers too.
+	reqReal.AllowStale = true
+
+	// Fetch
+	var reply []string
+	if err := c.RPC.RPC("Catalog.ListDatacenters", reqReal, &reply); err != nil {
+		return result, err
+	}
+
+	result.Value = &reply
+
+	// This is a purely synthetic index to keep the caching happy.
+	if opts.LastResult != nil {
+		equal := true
+		previousDCs, ok := opts.LastResult.Value.(*[]string)
+		if ok && previousDCs == nil {
+			ok = false
+		}
+
+		if ok {
+			if len(reply) != len(*previousDCs) {
+				equal = false
+			} else {
+				// ordering matters as they should be sorted based on distance
+				for i, dc := range reply {
+					if dc != (*previousDCs)[i] {
+						equal = false
+						break
+					}
+				}
+			}
+		}
+
+		result.Index = opts.LastResult.Index
+		if !equal || !ok {
+			result.Index++
+		}
+	} else {
+		result.Index = 1
+	}
+
+	return result, nil
+}
+
+// SupportsBlocking reports that this type cannot block on an index; the
+// index returned by Fetch is synthetic.
+func (c *CatalogDatacenters) SupportsBlocking() bool {
+	return false
+}
diff --git a/agent/cache-types/catalog_datacenters_test.go b/agent/cache-types/catalog_datacenters_test.go
new file mode 100644
index 0000000000..fdae222902
--- /dev/null
+++ b/agent/cache-types/catalog_datacenters_test.go
@@ -0,0 +1,96 @@
+package cachetype
+
+import (
+	"testing"
+
+	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCatalogDatacenters(t *testing.T) {
+	rpc := TestRPC(t)
+	typ := &CatalogDatacenters{RPC: rpc}
+
+	// Expect the proper RPC calls. Each one also captures the expected value
+	// since that is return-by-pointer in the arguments.
+	var resp *[]string
+	var resp2 *[]string
+	var resp3 *[]string
+	rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
+		Run(func(args mock.Arguments) {
+			req := args.Get(1).(*structs.DatacentersRequest)
+			require.True(t, req.AllowStale)
+
+			reply := args.Get(2).(*[]string)
+			*reply = []string{
+				"primary", "secondary", "tertiary",
+			}
+			resp = reply
+		})
+	rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
+		Run(func(args mock.Arguments) {
+			req := args.Get(1).(*structs.DatacentersRequest)
+			require.True(t, req.AllowStale)
+
+			reply := args.Get(2).(*[]string)
+			*reply = []string{
+				"primary", "tertiary", "secondary",
+			}
+			resp2 = reply
+		})
+	rpc.On("RPC", "Catalog.ListDatacenters", mock.Anything, mock.Anything).Once().Return(nil).
+		Run(func(args mock.Arguments) {
+			req := args.Get(1).(*structs.DatacentersRequest)
+			require.True(t, req.AllowStale)
+
+			reply := args.Get(2).(*[]string)
+			*reply = []string{
+				"primary", "secondary",
+			}
+			resp3 = reply
+		})
+
+	// Fetch three times, passing each result in as the next call's LastResult.
+	// Check the error after every call so an early failure isn't overwritten.
+	result, err := typ.Fetch(cache.FetchOptions{}, &structs.DatacentersRequest{})
+	require.NoError(t, err)
+	result2, err := typ.Fetch(cache.FetchOptions{LastResult: &result}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
+	require.NoError(t, err)
+	result3, err := typ.Fetch(cache.FetchOptions{LastResult: &result2}, &structs.DatacentersRequest{QueryOptions: structs.QueryOptions{MustRevalidate: true}})
+	require.NoError(t, err)
+
+	// make sure it was called the right number of times
+	rpc.AssertExpectations(t)
+
+	// make sure the first result was correct
+	require.Equal(t, cache.FetchResult{
+		Value: resp,
+		Index: 1,
+	}, result)
+
+	// the second reply reorders the datacenters, so the index must advance
+	require.Equal(t, cache.FetchResult{
+		Value: resp2,
+		Index: 2,
+	}, result2)
+
+	// the third reply drops a datacenter, so the index must advance again
+	require.Equal(t, cache.FetchResult{
+		Value: resp3,
+		Index: 3,
+	}, result3)
+}
+
+func TestDatacenters_badReqType(t *testing.T) {
+	rpc := TestRPC(t)
+	typ := &CatalogDatacenters{RPC: rpc}
+
+	// Fetch
+	_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
+		t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "wrong type")
+	rpc.AssertExpectations(t)
+}
diff --git a/agent/cache-types/catalog_list_services.go b/agent/cache-types/catalog_list_services.go
new file mode 100644
index 0000000000..0c4c4e43d1
--- /dev/null
+++ b/agent/cache-types/catalog_list_services.go
@@ -0,0 +1,54 @@
+package cachetype
+
+import (
+	"fmt"
+
+	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/agent/structs"
+)
+
+// CatalogListServicesName is the recommended name for registration.
+const CatalogListServicesName = "catalog-list-services"
+
+// CatalogListServices supports discovering service names via the catalog.
+type CatalogListServices struct {
+	RPC RPC
+}
+
+// Fetch issues a blocking Catalog.ListServices RPC using the minimum index
+// and timeout from opts, and returns the reply along with its query index.
+func (c *CatalogListServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
+	var result cache.FetchResult
+
+	// The request should be a DCSpecificRequest.
+	reqReal, ok := req.(*structs.DCSpecificRequest)
+	if !ok {
+		return result, fmt.Errorf(
+			"Internal cache failure: request wrong type: %T", req)
+	}
+
+	// Set the minimum query index to our current index so we block
+	reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
+	reqReal.QueryOptions.MaxQueryTime = opts.Timeout
+
+	// Always allow stale - there's no point in hitting leader if the request is
+	// going to be served from cache and end up arbitrarily stale anyway. This
+	// allows cached service-discover to automatically read scale across all
+	// servers too.
+	reqReal.AllowStale = true
+
+	// Fetch
+	var reply structs.IndexedServices
+	if err := c.RPC.RPC("Catalog.ListServices", reqReal, &reply); err != nil {
+		return result, err
+	}
+
+	result.Value = &reply
+	result.Index = reply.QueryMeta.Index
+	return result, nil
+}
+
+// SupportsBlocking reports that Catalog.ListServices supports blocking queries.
+func (c *CatalogListServices) SupportsBlocking() bool {
+	return true
+}
diff --git a/agent/cache-types/catalog_list_services_test.go b/agent/cache-types/catalog_list_services_test.go
new file mode 100644
index 0000000000..0f0e78c760
--- /dev/null
+++ b/agent/cache-types/catalog_list_services_test.go
@@ -0,0 +1,62 @@
+package cachetype
+
+import (
+	"testing"
+	"time"
+
+	"github.com/hashicorp/consul/agent/cache"
+	"github.com/hashicorp/consul/agent/structs"
+	"github.com/stretchr/testify/mock"
+	"github.com/stretchr/testify/require"
+)
+
+func TestCatalogListServices(t *testing.T) {
+	rpc := TestRPC(t)
+	typ := &CatalogListServices{RPC: rpc}
+
+	// Expect the proper RPC call. This also sets the expected value
+	// since that is return-by-pointer in the arguments.
+	var resp *structs.IndexedServices
+	rpc.On("RPC", "Catalog.ListServices", mock.Anything, mock.Anything).Return(nil).
+		Run(func(args mock.Arguments) {
+			req := args.Get(1).(*structs.DCSpecificRequest)
+			require.Equal(t, uint64(24), req.QueryOptions.MinQueryIndex)
+			require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
+			require.True(t, req.AllowStale)
+
+			reply := args.Get(2).(*structs.IndexedServices)
+			reply.Services = map[string][]string{
+				"foo": {"prod", "linux"},
+				"bar": {"qa", "windows"},
+			}
+			reply.QueryMeta.Index = 48
+			resp = reply
+		})
+
+	// Fetch
+	resultA, err := typ.Fetch(cache.FetchOptions{
+		MinIndex: 24,
+		Timeout:  1 * time.Second,
+	}, &structs.DCSpecificRequest{
+		Datacenter: "dc1",
+	})
+	require.NoError(t, err)
+	require.Equal(t, cache.FetchResult{
+		Value: resp,
+		Index: 48,
+	}, resultA)
+
+	rpc.AssertExpectations(t)
+}
+
+func TestCatalogListServices_badReqType(t *testing.T) {
+	rpc := TestRPC(t)
+	typ := &CatalogListServices{RPC: rpc}
+
+	// Fetch
+	_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
+		t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "wrong type")
+	rpc.AssertExpectations(t)
+}
diff --git a/agent/catalog_endpoint.go b/agent/catalog_endpoint.go
index 6fd649d1cf..cf40a9f0cf 100644
--- a/agent/catalog_endpoint.go
+++ b/agent/catalog_endpoint.go
@@ -74,12 +74,33 @@ func (s *HTTPServer) CatalogDatacenters(resp http.ResponseWriter, req *http.Requ
 	metrics.IncrCounterWithLabels([]string{"client", "api", "catalog_datacenters"}, 1,
 		[]metrics.Label{{Name: "node", Value: s.nodeName()}})
 
+	args := structs.DatacentersRequest{}
+	// NOTE(review): the results of parseConsistency and parseCacheControl are
+	// discarded here - confirm a malformed query should not abort the request.
+	s.parseConsistency(resp, req, &args.QueryOptions)
+	parseCacheControl(resp, req, &args.QueryOptions)
 	var out []string
-	if err := s.agent.RPC("Catalog.ListDatacenters", struct{}{}, &out); err != nil {
+
+	if args.QueryOptions.UseCache {
+		raw, m, err := s.agent.cache.Get(cachetype.CatalogDatacentersName, &args)
+		if err != nil {
+			metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
+				[]metrics.Label{{Name: "node", Value: s.nodeName()}})
+			return nil, err
+		}
+		reply, ok := raw.(*[]string)
+		if !ok {
+			// This should never happen, but we want to protect against panics
+			return nil, fmt.Errorf("internal error: response type not correct")
+		}
+		defer setCacheMeta(resp, &m)
+		out = *reply
+	} else if err := s.agent.RPC("Catalog.ListDatacenters", &args, &out); err != nil {
 		metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_datacenters"}, 1,
 			[]metrics.Label{{Name: "node", Value: s.nodeName()}})
 		return nil, err
 	}
+
 	metrics.IncrCounterWithLabels([]string{"client", "api", "success", "catalog_datacenters"}, 1,
 		[]metrics.Label{{Name: "node", Value: s.nodeName()}})
 	return out, nil
@@ -133,20 +154,37 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
 	if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
 		return nil, nil
 	}
-
 	var out structs.IndexedServices
 	defer setMeta(resp, &out.QueryMeta)
-RETRY_ONCE:
-	if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
-		metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
-			[]metrics.Label{{Name: "node", Value: s.nodeName()}})
-		return nil, err
-	}
-	if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
-		args.AllowStale = false
-		args.MaxStaleDuration = 0
-		goto RETRY_ONCE
+
+	if args.QueryOptions.UseCache {
+		raw, m, err := s.agent.cache.Get(cachetype.CatalogListServicesName, &args)
+		if err != nil {
+			metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
+				[]metrics.Label{{Name: "node", Value: s.nodeName()}})
+			return nil, err
+		}
+		reply, ok := raw.(*structs.IndexedServices)
+		if !ok {
+			// This should never happen, but we want to protect against panics
+			return nil, fmt.Errorf("internal error: response type not correct")
+		}
+		defer setCacheMeta(resp, &m)
+		out = *reply
+	} else {
+	RETRY_ONCE:
+		if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
+			metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
+				[]metrics.Label{{Name: "node", Value: s.nodeName()}})
+			return nil, err
+		}
+		if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
+			args.AllowStale = false
+			args.MaxStaleDuration = 0
+			goto RETRY_ONCE
+		}
 	}
+
 	out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
 
 	// Use empty map instead of nil
diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go
index 53f8f257ec..c2d43263ba 100644
--- a/agent/consul/catalog_endpoint.go
+++ b/agent/consul/catalog_endpoint.go
@@ -200,7 +200,7 @@ func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) e
 }
 
 // ListDatacenters is used to query for the list of known datacenters
-func (c *Catalog) ListDatacenters(args *struct{}, reply *[]string) error {
+func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]string) error {
 	dcs, err := c.srv.router.GetDatacentersByDistance()
 	if err != nil {
 		return err
diff --git a/agent/structs/structs.go b/agent/structs/structs.go
index 4df742d801..37a1671a75 100644
--- a/agent/structs/structs.go
+++ b/agent/structs/structs.go
@@ -321,6 +321,26 @@ type QuerySource struct {
 	Ip         string
 }
 
+// DatacentersRequest is used to query for the list of known datacenters.
+type DatacentersRequest struct {
+	QueryOptions
+}
+
+// CacheInfo returns the cache.RequestInfo for this request. Token and
+// Datacenter are left empty and the key is a constant, so all such requests
+// should map to the same cache entry.
+func (r *DatacentersRequest) CacheInfo() cache.RequestInfo {
+	return cache.RequestInfo{
+		Token:          "",
+		Datacenter:     "",
+		MinIndex:       0,
+		Timeout:        r.MaxQueryTime,
+		MaxAge:         r.MaxAge,
+		MustRevalidate: r.MustRevalidate,
+		Key:            "catalog-datacenters", // must not be empty for cache to work
+	}
+}
+
 // DCSpecificRequest is used to query about a specific DC
 type DCSpecificRequest struct {
 	Datacenter      string