diff --git a/agent/agent.go b/agent/agent.go index 35403a1453..dd7c40399c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -20,11 +20,11 @@ import ( "google.golang.org/grpc" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/ae" "github.com/hashicorp/consul/agent/cache" - "github.com/hashicorp/consul/agent/cache-types" + cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/checks" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" @@ -42,8 +42,8 @@ import ( "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/types" "github.com/hashicorp/consul/watch" - "github.com/hashicorp/go-multierror" - "github.com/hashicorp/go-uuid" + multierror "github.com/hashicorp/go-multierror" + uuid "github.com/hashicorp/go-uuid" "github.com/hashicorp/memberlist" "github.com/hashicorp/raft" "github.com/hashicorp/serf/serf" @@ -3468,6 +3468,15 @@ func (a *Agent) registerCache() { // Prepared queries don't support blocking Refresh: false, }) + + a.cache.RegisterType(cachetype.NodeServicesName, &cachetype.NodeServices{ + RPC: a, + }, &cache.RegisterOptions{ + // Maintain a blocking query, retry dropped connections quickly + Refresh: true, + RefreshTimer: 0 * time.Second, + RefreshTimeout: 10 * time.Minute, + }) } // defaultProxyCommand returns the default Connect managed proxy command. diff --git a/agent/cache-types/node_services.go b/agent/cache-types/node_services.go new file mode 100644 index 0000000000..09491e60b2 --- /dev/null +++ b/agent/cache-types/node_services.go @@ -0,0 +1,52 @@ +package cachetype + +import ( + "fmt" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" +) + +// Recommended name for registration. +const NodeServicesName = "node-services" + +// NodeServices supports fetching the services registered on a single node via +// the catalog. 
+type NodeServices struct { + RPC RPC +} + +func (c *NodeServices) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) { + var result cache.FetchResult + + // The request should be a NodeSpecificRequest. + reqReal, ok := req.(*structs.NodeSpecificRequest) + if !ok { + return result, fmt.Errorf( + "Internal cache failure: request wrong type: %T", req) + } + + // Set the minimum query index to our current index so we block + reqReal.QueryOptions.MinQueryIndex = opts.MinIndex + reqReal.QueryOptions.MaxQueryTime = opts.Timeout + + // Always allow stale - there's no point in hitting the leader if the request + // is going to be served from cache and end up arbitrarily stale anyway. This + // allows cached lookups to automatically read-scale across all + // servers too. + reqReal.AllowStale = true + + // Fetch + var reply structs.IndexedNodeServices + if err := c.RPC.RPC("Catalog.NodeServices", reqReal, &reply); err != nil { + return result, err + } + + result.Value = &reply + result.Index = reply.QueryMeta.Index + return result, nil +} + +func (c *NodeServices) SupportsBlocking() bool { + return true +} diff --git a/agent/cache-types/node_services_test.go b/agent/cache-types/node_services_test.go new file mode 100644 index 0000000000..09acee940d --- /dev/null +++ b/agent/cache-types/node_services_test.go @@ -0,0 +1,71 @@ +package cachetype + +import ( + "testing" + "time" + + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNodeServices(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &NodeServices{RPC: rpc} + + // Expect the proper RPC call. This also sets the expected value + // since that is return-by-pointer in the arguments. + var resp *structs.IndexedNodeServices + rpc.On("RPC", "Catalog.NodeServices", mock.Anything, mock.Anything).Return(nil). 
+ Run(func(args mock.Arguments) { + req := args.Get(1).(*structs.NodeSpecificRequest) + require.Equal(uint64(24), req.QueryOptions.MinQueryIndex) + require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime) + require.Equal("node-01", req.Node) + require.True(req.AllowStale) + + reply := args.Get(2).(*structs.IndexedNodeServices) + reply.NodeServices = &structs.NodeServices{ + Node: &structs.Node{ + ID: "abcdef", + Node: "node-01", + Address: "127.0.0.5", + Datacenter: "dc1", + }, + } + + reply.QueryMeta.Index = 48 + resp = reply + }) + + // Fetch + resultA, err := typ.Fetch(cache.FetchOptions{ + MinIndex: 24, + Timeout: 1 * time.Second, + }, &structs.NodeSpecificRequest{ + Datacenter: "dc1", + Node: "node-01", + }) + require.NoError(err) + require.Equal(cache.FetchResult{ + Value: resp, + Index: 48, + }, resultA) +} + +func TestNodeServices_badReqType(t *testing.T) { + require := require.New(t) + rpc := TestRPC(t) + defer rpc.AssertExpectations(t) + typ := &NodeServices{RPC: rpc} + + // Fetch with a request type this cache type does not accept + _, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest( + t, cache.RequestInfo{Key: "foo", MinIndex: 64})) + require.Error(err) + require.Contains(err.Error(), "wrong type") + +} diff --git a/agent/config/builder.go b/agent/config/builder.go index c240b4552b..c29851205f 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -715,6 +715,8 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) { DNSSOA: soa, DNSUDPAnswerLimit: b.intVal(c.DNS.UDPAnswerLimit), DNSNodeMetaTXT: b.boolValWithDefault(c.DNS.NodeMetaTXT, true), + DNSUseCache: b.boolVal(c.DNS.UseCache), + DNSCacheMaxAge: b.durationVal("dns_config.cache_max_age", c.DNS.CacheMaxAge), // HTTP HTTPPort: httpPort, diff --git a/agent/config/config.go b/agent/config/config.go index f70cdb1e50..82e6f37175 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -555,6 +555,8 @@ type DNS struct { UDPAnswerLimit *int `json:"udp_answer_limit,omitempty" hcl:"udp_answer_limit" 
mapstructure:"udp_answer_limit"` NodeMetaTXT *bool `json:"enable_additional_node_meta_txt,omitempty" hcl:"enable_additional_node_meta_txt" mapstructure:"enable_additional_node_meta_txt"` SOA *SOA `json:"soa,omitempty" hcl:"soa" mapstructure:"soa"` + UseCache *bool `json:"use_cache,omitempty" hcl:"use_cache" mapstructure:"use_cache"` + CacheMaxAge *string `json:"cache_max_age,omitempty" hcl:"cache_max_age" mapstructure:"cache_max_age"` } type HTTPConfig struct { diff --git a/agent/config/runtime.go b/agent/config/runtime.go index c22ed7c95f..0978064f4f 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -314,6 +314,16 @@ type RuntimeConfig struct { // flag: -recursor string [-recursor string] DNSRecursors []string + // DNSUseCache whether or not to use cache for dns queries + // + // hcl: dns_config { use_cache = (true|false) } + DNSUseCache bool + + // DNSCacheMaxAge how old a cached dns result may be before it is re-fetched + // + // hcl: dns_config { cache_max_age = "duration" } + DNSCacheMaxAge time.Duration + // HTTPBlockEndpoints is a list of endpoint prefixes to block in the // HTTP API. Any requests to these will get a 403 response. 
// diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index d898bedb06..8518281459 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -3066,7 +3066,9 @@ func TestFullConfig(t *testing.T) { "service_ttl": { "*": "32030s" }, - "udp_answer_limit": 29909 + "udp_answer_limit": 29909, + "use_cache": true, + "cache_max_age": "5m" }, "enable_acl_replication": true, "enable_agent_tls_for_checks": true, @@ -3620,6 +3622,8 @@ func TestFullConfig(t *testing.T) { "*" = "32030s" } udp_answer_limit = 29909 + use_cache = true + cache_max_age = "5m" } enable_acl_replication = true enable_agent_tls_for_checks = true @@ -4249,6 +4253,8 @@ func TestFullConfig(t *testing.T) { DNSServiceTTL: map[string]time.Duration{"*": 32030 * time.Second}, DNSUDPAnswerLimit: 29909, DNSNodeMetaTXT: true, + DNSUseCache: true, + DNSCacheMaxAge: 5 * time.Minute, DataDir: dataDir, Datacenter: "rzo029wg", DevMode: true, @@ -5043,6 +5049,8 @@ func TestSanitize(t *testing.T) { "Minttl": 0 }, "DNSUDPAnswerLimit": 0, + "DNSUseCache": false, + "DNSCacheMaxAge": "0s", "DataDir": "", "Datacenter": "", "DevMode": false, diff --git a/agent/dns.go b/agent/dns.go index 0b34a77bba..83b8b0999a 100644 --- a/agent/dns.go +++ b/agent/dns.go @@ -11,9 +11,10 @@ import ( "regexp" - "github.com/armon/go-metrics" - "github.com/armon/go-radix" + metrics "github.com/armon/go-metrics" + radix "github.com/armon/go-radix" "github.com/coredns/coredns/plugin/pkg/dnsutil" + cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/config" "github.com/hashicorp/consul/agent/consul" "github.com/hashicorp/consul/agent/structs" @@ -53,6 +54,8 @@ type dnsConfig struct { Datacenter string EnableTruncate bool MaxStale time.Duration + UseCache bool + CacheMaxAge time.Duration NodeName string NodeTTL time.Duration OnlyPassing bool @@ -140,6 +143,8 @@ func GetDNSConfig(conf *config.RuntimeConfig) *dnsConfig { ServiceTTL: conf.DNSServiceTTL, UDPAnswerLimit: 
conf.DNSUDPAnswerLimit, NodeMetaTXT: conf.DNSNodeMetaTXT, + UseCache: conf.DNSUseCache, + CacheMaxAge: conf.DNSCacheMaxAge, dnsSOAConfig: dnsSOAConfig{ Expire: conf.DNSSOA.Expire, Minttl: conf.DNSSOA.Minttl, @@ -647,7 +652,7 @@ func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns. } // Make an RPC request - args := structs.NodeSpecificRequest{ + args := &structs.NodeSpecificRequest{ Datacenter: datacenter, Node: node, QueryOptions: structs.QueryOptions{ @@ -655,25 +660,13 @@ func (d *DNSServer) nodeLookup(network, datacenter, node string, req, resp *dns. AllowStale: d.config.AllowStale, }, } - var out structs.IndexedNodeServices -RPC: - if err := d.agent.RPC("Catalog.NodeServices", &args, &out); err != nil { + out, err := d.lookupNode(args) + if err != nil { d.logger.Printf("[ERR] dns: rpc error: %v", err) resp.SetRcode(req, dns.RcodeServerFailure) return } - // Verify that request is not too stale, redo the request - if args.AllowStale { - if out.LastContact > d.config.MaxStale { - args.AllowStale = false - d.logger.Printf("[WARN] dns: Query results too stale, re-requesting") - goto RPC - } else if out.LastContact > staleCounterThreshold { - metrics.IncrCounter([]string{"dns", "stale_queries"}, 1) - } - } - // If we have no address, return not found! 
if out.NodeServices == nil { d.addSOA(resp) @@ -705,6 +698,48 @@ RPC: } } +// lookupNode fetches the services registered on args.Node, through the agent +// cache when dns_config.use_cache is set and via the Catalog.NodeServices RPC +// otherwise. A stale read older than MaxStale is retried once without +// AllowStale, bypassing the cache. +func (d *DNSServer) lookupNode(args *structs.NodeSpecificRequest) (*structs.IndexedNodeServices, error) { + var out structs.IndexedNodeServices + + useCache := d.config.UseCache +RPC: + if useCache { + raw, _, err := d.agent.cache.Get(cachetype.NodeServicesName, args) + if err != nil { + return nil, err + } + reply, ok := raw.(*structs.IndexedNodeServices) + if !ok { + // This should never happen, but we want to protect against panics + return nil, fmt.Errorf("internal error: response type not correct") + } + out = *reply + } else { + if err := d.agent.RPC("Catalog.NodeServices", args, &out); err != nil { + return nil, err + } + } + + // Verify that request is not too stale, redo the request + if args.AllowStale { + if out.LastContact > d.config.MaxStale { + args.AllowStale = false + // Skip the cache on the retry so the non-stale request reaches the servers + useCache = false + d.logger.Printf("[WARN] dns: Query results too stale, re-requesting") + goto RPC + } else if out.LastContact > staleCounterThreshold { + metrics.IncrCounter([]string{"dns", "stale_queries"}, 1) + } + } + + return &out, nil +} + // encodeKVasRFC1464 encodes a key-value pair according to RFC1464 func encodeKVasRFC1464(key, value string) (txt string) { // For details on these replacements c.f. 
https://www.ietf.org/rfc/rfc1464.txt @@ -1030,12 +1060,29 @@ func (d *DNSServer) lookupServiceNodes(datacenter, service, tag string, connect QueryOptions: structs.QueryOptions{ Token: d.agent.tokens.UserToken(), AllowStale: d.config.AllowStale, + MaxAge: d.config.CacheMaxAge, }, } var out structs.IndexedCheckServiceNodes - if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { - return structs.IndexedCheckServiceNodes{}, err + + if d.config.UseCache { + raw, m, err := d.agent.cache.Get(cachetype.HealthServicesName, &args) + if err != nil { + return out, err + } + reply, ok := raw.(*structs.IndexedCheckServiceNodes) + if !ok { + // This should never happen, but we want to protect against panics + return out, fmt.Errorf("internal error: response type not correct") + } + d.logger.Printf("[TRACE] dns: cache hit: %v for service %s", m.Hit, service) + + out = *reply + } else { + if err := d.agent.RPC("Health.ServiceNodes", &args, &out); err != nil { + return out, err + } } if args.AllowStale && out.LastContact > staleCounterThreshold { @@ -1122,6 +1169,7 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot QueryOptions: structs.QueryOptions{ Token: d.agent.tokens.UserToken(), AllowStale: d.config.AllowStale, + MaxAge: d.config.CacheMaxAge, }, // Always pass the local agent through. In the DNS interface, there @@ -1150,6 +1198,21 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot } } + out, err := d.lookupPreparedQuery(args) + + // If they give a bogus query name, treat that as a name error, + // not a full on server error. We have to use a string compare + // here since the RPC layer loses the type information. + if err != nil && err.Error() == consul.ErrQueryNotFound.Error() { + d.addSOA(resp) + resp.SetRcode(req, dns.RcodeNameError) + return + } else if err != nil { + d.logger.Printf("[ERR] dns: rpc error: %v", err) + resp.SetRcode(req, dns.RcodeServerFailure) + return + } + // TODO (slackpad) - What's a safe limit we can set here? 
It seems like // with dup filtering done at this level we need to get everything to // match the previous behavior. We can optimize by pushing more filtering @@ -1158,34 +1220,6 @@ func (d *DNSServer) preparedQueryLookup(network, datacenter, query string, remot // likely work in practice, like 10*maxUDPAnswerLimit which should help // reduce bandwidth if there are thousands of nodes available. - var out structs.PreparedQueryExecuteResponse -RPC: - if err := d.agent.RPC("PreparedQuery.Execute", &args, &out); err != nil { - // If they give a bogus query name, treat that as a name error, - // not a full on server error. We have to use a string compare - // here since the RPC layer loses the type information. - if err.Error() == consul.ErrQueryNotFound.Error() { - d.addSOA(resp) - resp.SetRcode(req, dns.RcodeNameError) - return - } - - d.logger.Printf("[ERR] dns: rpc error: %v", err) - resp.SetRcode(req, dns.RcodeServerFailure) - return - } - - // Verify that request is not too stale, redo the request. - if args.AllowStale { - if out.LastContact > d.config.MaxStale { - args.AllowStale = false - d.logger.Printf("[WARN] dns: Query results too stale, re-requesting") - goto RPC - } else if out.LastContact > staleCounterThreshold { - metrics.IncrCounter([]string{"dns", "stale_queries"}, 1) - } - } - // Determine the TTL. The parse should never fail since we vet it when // the query is created, but we check anyway. 
If the query didn't // specify a TTL then we will try to use the agent's service-specific @@ -1225,6 +1259,51 @@ RPC: } } +// lookupPreparedQuery executes the prepared query in args, through the agent +// cache when dns_config.use_cache is set and via the PreparedQuery.Execute RPC +// otherwise. A stale read older than MaxStale is retried once without +// AllowStale, bypassing the cache. +func (d *DNSServer) lookupPreparedQuery(args structs.PreparedQueryExecuteRequest) (*structs.PreparedQueryExecuteResponse, error) { + var out structs.PreparedQueryExecuteResponse + + useCache := d.config.UseCache +RPC: + if useCache { + raw, m, err := d.agent.cache.Get(cachetype.PreparedQueryName, &args) + if err != nil { + return nil, err + } + reply, ok := raw.(*structs.PreparedQueryExecuteResponse) + if !ok { + // This should never happen, but we want to protect against panics + return nil, fmt.Errorf("internal error: response type not correct") + } + + d.logger.Printf("[TRACE] dns: cache hit: %v for prepared query %s", m.Hit, args.QueryIDOrName) + + out = *reply + } else { + if err := d.agent.RPC("PreparedQuery.Execute", &args, &out); err != nil { + return nil, err + } + } + + // Verify that request is not too stale, redo the request. + if args.AllowStale { + if out.LastContact > d.config.MaxStale { + args.AllowStale = false + // Skip the cache on the retry so the non-stale request reaches the servers + useCache = false + d.logger.Printf("[WARN] dns: Query results too stale, re-requesting") + goto RPC + } else if out.LastContact > staleCounterThreshold { + metrics.IncrCounter([]string{"dns", "stale_queries"}, 1) + } + } + + return &out, nil +} + // serviceNodeRecords is used to add the node records for a service lookup func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration, maxRecursionLevel int) { qName := req.Question[0].Name diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 349e1ae426..702a403aa7 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -16,7 +16,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/types" "github.com/hashicorp/go-msgpack/codec" - "github.com/hashicorp/go-multierror" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/coordinate" "github.com/mitchellh/hashstructure" ) @@ -431,6 +431,29 @@ func (r *NodeSpecificRequest) 
RequestDatacenter() string { return r.Datacenter } +func (r *NodeSpecificRequest) CacheInfo() cache.RequestInfo { + info := cache.RequestInfo{ + Token: r.Token, + Datacenter: r.Datacenter, + MinIndex: r.MinQueryIndex, + Timeout: r.MaxQueryTime, + MaxAge: r.MaxAge, + MustRevalidate: r.MustRevalidate, + } + + v, err := hashstructure.Hash([]interface{}{ + r.Node, + }, nil) + if err == nil { + // If there is an error, we don't set the key. A blank key forces + // no cache for this request so the request is forwarded directly + // to the server. + info.Key = strconv.FormatUint(v, 10) + } + + return info +} + // ChecksInStateRequest is used to query for nodes in a state type ChecksInStateRequest struct { Datacenter string diff --git a/website/source/docs/agent/options.html.md b/website/source/docs/agent/options.html.md index 3fb84f5342..c7a41b462e 100644 --- a/website/source/docs/agent/options.html.md +++ b/website/source/docs/agent/options.html.md @@ -1072,6 +1072,12 @@ default will automatically work with some tooling. Configures the Retry duration expressed in seconds, default value is 600, ie: 10 minutes. + * `use_cache` - When set to true, DNS resolution will use the agent cache described + in [agent caching](/api/index.html#agent-caching). This setting affects all node, service and prepared query DNS requests. Implies [`allow_stale`](#allow_stale). + + * `cache_max_age` - When [use_cache](#dns_use_cache) is enabled, the agent + will attempt to re-fetch the result from the servers if the cached value is older than this duration. See: [agent caching](/api/index.html#agent-caching). + * `domain` Equivalent to the [`-domain` command-line flag](#_domain).