mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 22:06:20 +00:00
Adds discovery_max_stale (#4004)
Adds a new option to allow service discovery endpoints to return stale results if configured at the agent level.
This commit is contained in:
parent
2df780f040
commit
a67d27c756
@ -100,9 +100,17 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
|
|||||||
|
|
||||||
var out structs.IndexedNodes
|
var out structs.IndexedNodes
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
|
if err := s.agent.RPC("Catalog.ListNodes", &args, &out); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
s.agent.TranslateAddresses(args.Datacenter, out.Nodes)
|
s.agent.TranslateAddresses(args.Datacenter, out.Nodes)
|
||||||
|
|
||||||
// Use empty list instead of nil
|
// Use empty list instead of nil
|
||||||
@ -127,11 +135,18 @@ func (s *HTTPServer) CatalogServices(resp http.ResponseWriter, req *http.Request
|
|||||||
|
|
||||||
var out structs.IndexedServices
|
var out structs.IndexedServices
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
|
if err := s.agent.RPC("Catalog.ListServices", &args, &out); err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_services"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Use empty map instead of nil
|
// Use empty map instead of nil
|
||||||
if out.Services == nil {
|
if out.Services == nil {
|
||||||
@ -172,11 +187,18 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedServiceNodes
|
var out structs.IndexedServiceNodes
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
|
if err := s.agent.RPC("Catalog.ServiceNodes", &args, &out); err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_service_nodes"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)
|
s.agent.TranslateAddresses(args.Datacenter, out.ServiceNodes)
|
||||||
|
|
||||||
// Use empty list instead of nil
|
// Use empty list instead of nil
|
||||||
@ -216,11 +238,18 @@ func (s *HTTPServer) CatalogNodeServices(resp http.ResponseWriter, req *http.Req
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedNodeServices
|
var out structs.IndexedNodeServices
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
|
if err := s.agent.RPC("Catalog.NodeServices", &args, &out); err != nil {
|
||||||
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1,
|
metrics.IncrCounterWithLabels([]string{"client", "rpc", "error", "catalog_node_services"}, 1,
|
||||||
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
[]metrics.Label{{Name: "node", Value: s.nodeName()}})
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
if out.NodeServices != nil && out.NodeServices.Node != nil {
|
if out.NodeServices != nil && out.NodeServices.Node != nil {
|
||||||
s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node)
|
s.agent.TranslateAddresses(args.Datacenter, out.NodeServices.Node)
|
||||||
}
|
}
|
||||||
|
@ -655,6 +655,7 @@ func (b *Builder) Build() (rt RuntimeConfig, err error) {
|
|||||||
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
|
DisableRemoteExec: b.boolVal(c.DisableRemoteExec),
|
||||||
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
|
DisableUpdateCheck: b.boolVal(c.DisableUpdateCheck),
|
||||||
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
DiscardCheckOutput: b.boolVal(c.DiscardCheckOutput),
|
||||||
|
DiscoveryMaxStale: b.durationVal("discovery_max_stale", c.DiscoveryMaxStale),
|
||||||
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
EnableAgentTLSForChecks: b.boolVal(c.EnableAgentTLSForChecks),
|
||||||
EnableDebug: b.boolVal(c.EnableDebug),
|
EnableDebug: b.boolVal(c.EnableDebug),
|
||||||
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
|
EnableScriptChecks: b.boolVal(c.EnableScriptChecks),
|
||||||
|
@ -174,6 +174,7 @@ type Config struct {
|
|||||||
DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"`
|
DisableRemoteExec *bool `json:"disable_remote_exec,omitempty" hcl:"disable_remote_exec" mapstructure:"disable_remote_exec"`
|
||||||
DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"`
|
DisableUpdateCheck *bool `json:"disable_update_check,omitempty" hcl:"disable_update_check" mapstructure:"disable_update_check"`
|
||||||
DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"`
|
DiscardCheckOutput *bool `json:"discard_check_output" hcl:"discard_check_output" mapstructure:"discard_check_output"`
|
||||||
|
DiscoveryMaxStale *string `json:"discovery_max_stale" hcl:"discovery_max_stale" mapstructure:"discovery_max_stale"`
|
||||||
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
|
EnableACLReplication *bool `json:"enable_acl_replication,omitempty" hcl:"enable_acl_replication" mapstructure:"enable_acl_replication"`
|
||||||
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
|
EnableAgentTLSForChecks *bool `json:"enable_agent_tls_for_checks,omitempty" hcl:"enable_agent_tls_for_checks" mapstructure:"enable_agent_tls_for_checks"`
|
||||||
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
|
EnableDebug *bool `json:"enable_debug,omitempty" hcl:"enable_debug" mapstructure:"enable_debug"`
|
||||||
|
@ -473,6 +473,14 @@ type RuntimeConfig struct {
|
|||||||
// flag: -datacenter string
|
// flag: -datacenter string
|
||||||
Datacenter string
|
Datacenter string
|
||||||
|
|
||||||
|
// Defines the maximum stale value for discovery path. Defauls to "0s".
|
||||||
|
// Discovery paths are /v1/heath/ paths
|
||||||
|
//
|
||||||
|
// If not set to 0, it will try to perform stale read and perform only a
|
||||||
|
// consistent read whenever the value is too old.
|
||||||
|
// hcl: discovery_max_stale = "duration"
|
||||||
|
DiscoveryMaxStale time.Duration
|
||||||
|
|
||||||
// Node name is the name we use to advertise. Defaults to hostname.
|
// Node name is the name we use to advertise. Defaults to hostname.
|
||||||
//
|
//
|
||||||
// NodeName is exposed via /v1/agent/self from here and
|
// NodeName is exposed via /v1/agent/self from here and
|
||||||
|
@ -2304,6 +2304,7 @@ func TestFullConfig(t *testing.T) {
|
|||||||
"disable_remote_exec": true,
|
"disable_remote_exec": true,
|
||||||
"disable_update_check": true,
|
"disable_update_check": true,
|
||||||
"discard_check_output": true,
|
"discard_check_output": true,
|
||||||
|
"discovery_max_stale": "5s",
|
||||||
"domain": "7W1xXSqd",
|
"domain": "7W1xXSqd",
|
||||||
"dns_config": {
|
"dns_config": {
|
||||||
"allow_stale": true,
|
"allow_stale": true,
|
||||||
@ -2740,6 +2741,7 @@ func TestFullConfig(t *testing.T) {
|
|||||||
disable_remote_exec = true
|
disable_remote_exec = true
|
||||||
disable_update_check = true
|
disable_update_check = true
|
||||||
discard_check_output = true
|
discard_check_output = true
|
||||||
|
discovery_max_stale = "5s"
|
||||||
domain = "7W1xXSqd"
|
domain = "7W1xXSqd"
|
||||||
dns_config {
|
dns_config {
|
||||||
allow_stale = true
|
allow_stale = true
|
||||||
@ -3067,6 +3069,7 @@ func TestFullConfig(t *testing.T) {
|
|||||||
"ae_interval": "10003s",
|
"ae_interval": "10003s",
|
||||||
"check_deregister_interval_min": "27870s",
|
"check_deregister_interval_min": "27870s",
|
||||||
"check_reap_interval": "10662s",
|
"check_reap_interval": "10662s",
|
||||||
|
"discovery_max_stale": "5s",
|
||||||
"segment_limit": 24705,
|
"segment_limit": 24705,
|
||||||
"segment_name_limit": 27046,
|
"segment_name_limit": 27046,
|
||||||
"sync_coordinate_interval_min": "27983s",
|
"sync_coordinate_interval_min": "27983s",
|
||||||
@ -3121,6 +3124,7 @@ func TestFullConfig(t *testing.T) {
|
|||||||
ae_interval = "10003s"
|
ae_interval = "10003s"
|
||||||
check_deregister_interval_min = "27870s"
|
check_deregister_interval_min = "27870s"
|
||||||
check_reap_interval = "10662s"
|
check_reap_interval = "10662s"
|
||||||
|
discovery_max_stale = "5s"
|
||||||
segment_limit = 24705
|
segment_limit = 24705
|
||||||
segment_name_limit = 27046
|
segment_name_limit = 27046
|
||||||
sync_coordinate_interval_min = "27983s"
|
sync_coordinate_interval_min = "27983s"
|
||||||
@ -3327,6 +3331,7 @@ func TestFullConfig(t *testing.T) {
|
|||||||
DisableRemoteExec: true,
|
DisableRemoteExec: true,
|
||||||
DisableUpdateCheck: true,
|
DisableUpdateCheck: true,
|
||||||
DiscardCheckOutput: true,
|
DiscardCheckOutput: true,
|
||||||
|
DiscoveryMaxStale: 5 * time.Second,
|
||||||
EnableACLReplication: true,
|
EnableACLReplication: true,
|
||||||
EnableAgentTLSForChecks: true,
|
EnableAgentTLSForChecks: true,
|
||||||
EnableDebug: true,
|
EnableDebug: true,
|
||||||
@ -4008,6 +4013,7 @@ func TestSanitize(t *testing.T) {
|
|||||||
"DisableRemoteExec": false,
|
"DisableRemoteExec": false,
|
||||||
"DisableUpdateCheck": false,
|
"DisableUpdateCheck": false,
|
||||||
"DiscardCheckOutput": false,
|
"DiscardCheckOutput": false,
|
||||||
|
"DiscoveryMaxStale": "0s",
|
||||||
"EnableACLReplication": false,
|
"EnableACLReplication": false,
|
||||||
"EnableAgentTLSForChecks": false,
|
"EnableAgentTLSForChecks": false,
|
||||||
"EnableDebug": false,
|
"EnableDebug": false,
|
||||||
|
@ -30,9 +30,16 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedHealthChecks
|
var out structs.IndexedHealthChecks
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
|
if err := s.agent.RPC("Health.ChecksInState", &args, &out); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Use empty list instead of nil
|
// Use empty list instead of nil
|
||||||
if out.HealthChecks == nil {
|
if out.HealthChecks == nil {
|
||||||
@ -66,9 +73,16 @@ func (s *HTTPServer) HealthNodeChecks(resp http.ResponseWriter, req *http.Reques
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedHealthChecks
|
var out structs.IndexedHealthChecks
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
|
if err := s.agent.RPC("Health.NodeChecks", &args, &out); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Use empty list instead of nil
|
// Use empty list instead of nil
|
||||||
if out.HealthChecks == nil {
|
if out.HealthChecks == nil {
|
||||||
@ -104,9 +118,16 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedHealthChecks
|
var out structs.IndexedHealthChecks
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
|
if err := s.agent.RPC("Health.ServiceChecks", &args, &out); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Use empty list instead of nil
|
// Use empty list instead of nil
|
||||||
if out.HealthChecks == nil {
|
if out.HealthChecks == nil {
|
||||||
@ -149,9 +170,16 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
|||||||
// Make the RPC request
|
// Make the RPC request
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
defer setMeta(resp, &out.QueryMeta)
|
defer setMeta(resp, &out.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
|
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
out.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Filter to only passing if specified
|
// Filter to only passing if specified
|
||||||
if _, ok := params[api.HealthPassing]; ok {
|
if _, ok := params[api.HealthPassing]; ok {
|
||||||
|
@ -366,6 +366,12 @@ func setKnownLeader(resp http.ResponseWriter, known bool) {
|
|||||||
resp.Header().Set("X-Consul-KnownLeader", s)
|
resp.Header().Set("X-Consul-KnownLeader", s)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func setConsistency(resp http.ResponseWriter, consistency string) {
|
||||||
|
if consistency != "" {
|
||||||
|
resp.Header().Set("X-Consul-Effective-Consistency", consistency)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// setLastContact is used to set the last contact header
|
// setLastContact is used to set the last contact header
|
||||||
func setLastContact(resp http.ResponseWriter, last time.Duration) {
|
func setLastContact(resp http.ResponseWriter, last time.Duration) {
|
||||||
if last < 0 {
|
if last < 0 {
|
||||||
@ -380,6 +386,7 @@ func setMeta(resp http.ResponseWriter, m *structs.QueryMeta) {
|
|||||||
setIndex(resp, m.Index)
|
setIndex(resp, m.Index)
|
||||||
setLastContact(resp, m.LastContact)
|
setLastContact(resp, m.LastContact)
|
||||||
setKnownLeader(resp, m.KnownLeader)
|
setKnownLeader(resp, m.KnownLeader)
|
||||||
|
setConsistency(resp, m.ConsistencyLevel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// setHeaders is used to set canonical response header fields
|
// setHeaders is used to set canonical response header fields
|
||||||
@ -416,13 +423,42 @@ func parseWait(resp http.ResponseWriter, req *http.Request, b *structs.QueryOpti
|
|||||||
|
|
||||||
// parseConsistency is used to parse the ?stale and ?consistent query params.
|
// parseConsistency is used to parse the ?stale and ?consistent query params.
|
||||||
// Returns true on error
|
// Returns true on error
|
||||||
func parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
|
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b *structs.QueryOptions) bool {
|
||||||
query := req.URL.Query()
|
query := req.URL.Query()
|
||||||
|
defaults := true
|
||||||
if _, ok := query["stale"]; ok {
|
if _, ok := query["stale"]; ok {
|
||||||
b.AllowStale = true
|
b.AllowStale = true
|
||||||
|
defaults = false
|
||||||
}
|
}
|
||||||
if _, ok := query["consistent"]; ok {
|
if _, ok := query["consistent"]; ok {
|
||||||
b.RequireConsistent = true
|
b.RequireConsistent = true
|
||||||
|
defaults = false
|
||||||
|
}
|
||||||
|
if _, ok := query["leader"]; ok {
|
||||||
|
defaults = false
|
||||||
|
}
|
||||||
|
if maxStale := query.Get("max_stale"); maxStale != "" {
|
||||||
|
dur, err := time.ParseDuration(maxStale)
|
||||||
|
if err != nil {
|
||||||
|
resp.WriteHeader(http.StatusBadRequest)
|
||||||
|
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
b.MaxStaleDuration = dur
|
||||||
|
if dur.Nanoseconds() > 0 {
|
||||||
|
b.AllowStale = true
|
||||||
|
defaults = false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// No specific Consistency has been specified by caller
|
||||||
|
if defaults {
|
||||||
|
path := req.URL.Path
|
||||||
|
if strings.HasPrefix(path, "/v1/catalog") || strings.HasPrefix(path, "/v1/health") {
|
||||||
|
if s.agent.config.DiscoveryMaxStale.Nanoseconds() > 0 {
|
||||||
|
b.MaxStaleDuration = s.agent.config.DiscoveryMaxStale
|
||||||
|
b.AllowStale = true
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if b.AllowStale && b.RequireConsistent {
|
if b.AllowStale && b.RequireConsistent {
|
||||||
resp.WriteHeader(http.StatusBadRequest)
|
resp.WriteHeader(http.StatusBadRequest)
|
||||||
@ -490,7 +526,7 @@ func (s *HTTPServer) parseMetaFilter(req *http.Request) map[string]string {
|
|||||||
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
|
func (s *HTTPServer) parse(resp http.ResponseWriter, req *http.Request, dc *string, b *structs.QueryOptions) bool {
|
||||||
s.parseDC(req, dc)
|
s.parseDC(req, dc)
|
||||||
s.parseToken(req, &b.Token)
|
s.parseToken(req, &b.Token)
|
||||||
if parseConsistency(resp, req, b) {
|
if s.parseConsistency(resp, req, b) {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
return parseWait(resp, req, b)
|
return parseWait(resp, req, b)
|
||||||
|
@ -607,7 +607,9 @@ func TestParseConsistency(t *testing.T) {
|
|||||||
var b structs.QueryOptions
|
var b structs.QueryOptions
|
||||||
|
|
||||||
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil)
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale", nil)
|
||||||
if d := parseConsistency(resp, req, &b); d {
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
if d := a.srv.parseConsistency(resp, req, &b); d {
|
||||||
t.Fatalf("unexpected done")
|
t.Fatalf("unexpected done")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -620,7 +622,7 @@ func TestParseConsistency(t *testing.T) {
|
|||||||
|
|
||||||
b = structs.QueryOptions{}
|
b = structs.QueryOptions{}
|
||||||
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil)
|
req, _ = http.NewRequest("GET", "/v1/catalog/nodes?consistent", nil)
|
||||||
if d := parseConsistency(resp, req, &b); d {
|
if d := a.srv.parseConsistency(resp, req, &b); d {
|
||||||
t.Fatalf("unexpected done")
|
t.Fatalf("unexpected done")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -632,13 +634,70 @@ func TestParseConsistency(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ensureConsistency check if consistency modes are correctly applied
|
||||||
|
// if maxStale < 0 => stale, without MaxStaleDuration
|
||||||
|
// if maxStale == 0 => no stale
|
||||||
|
// if maxStale > 0 => stale + check duration
|
||||||
|
func ensureConsistency(t *testing.T, a *TestAgent, path string, maxStale time.Duration, requireConsistent bool) {
|
||||||
|
t.Helper()
|
||||||
|
req, _ := http.NewRequest("GET", path, nil)
|
||||||
|
var b structs.QueryOptions
|
||||||
|
resp := httptest.NewRecorder()
|
||||||
|
if d := a.srv.parseConsistency(resp, req, &b); d {
|
||||||
|
t.Fatalf("unexpected done")
|
||||||
|
}
|
||||||
|
allowStale := maxStale.Nanoseconds() != 0
|
||||||
|
if b.AllowStale != allowStale {
|
||||||
|
t.Fatalf("Bad Allow Stale")
|
||||||
|
}
|
||||||
|
if maxStale > 0 && b.MaxStaleDuration != maxStale {
|
||||||
|
t.Fatalf("Bad MaxStaleDuration: %d VS expected %d", b.MaxStaleDuration, maxStale)
|
||||||
|
}
|
||||||
|
if b.RequireConsistent != requireConsistent {
|
||||||
|
t.Fatal("Bad Consistent")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestParseConsistencyAndMaxStale(t *testing.T) {
|
||||||
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
|
||||||
|
// Default => Consistent
|
||||||
|
a.config.DiscoveryMaxStale = time.Duration(0)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/nodes", 0, false)
|
||||||
|
// Stale, without MaxStale
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/nodes?stale", -1, false)
|
||||||
|
// Override explicitly
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/nodes?max_stale=3s", 3*time.Second, false)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/nodes?stale&max_stale=3s", 3*time.Second, false)
|
||||||
|
|
||||||
|
// stale by defaul on discovery
|
||||||
|
a.config.DiscoveryMaxStale = time.Duration(7 * time.Second)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/nodes", a.config.DiscoveryMaxStale, false)
|
||||||
|
// Not in KV
|
||||||
|
ensureConsistency(t, a, "/v1/kv/my/path", 0, false)
|
||||||
|
|
||||||
|
// DiscoveryConsistencyLevel should apply
|
||||||
|
ensureConsistency(t, a, "/v1/health/service/one", a.config.DiscoveryMaxStale, false)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/service/one", a.config.DiscoveryMaxStale, false)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/services", a.config.DiscoveryMaxStale, false)
|
||||||
|
|
||||||
|
// Query path should be taken into account
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/services?consistent", 0, true)
|
||||||
|
// Since stale is added, no MaxStale should be applied
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/services?stale", -1, false)
|
||||||
|
ensureConsistency(t, a, "/v1/catalog/services?leader", 0, false)
|
||||||
|
}
|
||||||
|
|
||||||
func TestParseConsistency_Invalid(t *testing.T) {
|
func TestParseConsistency_Invalid(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
resp := httptest.NewRecorder()
|
resp := httptest.NewRecorder()
|
||||||
var b structs.QueryOptions
|
var b structs.QueryOptions
|
||||||
|
|
||||||
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil)
|
req, _ := http.NewRequest("GET", "/v1/catalog/nodes?stale&consistent", nil)
|
||||||
if d := parseConsistency(resp, req, &b); !d {
|
a := NewTestAgent(t.Name(), "")
|
||||||
|
defer a.Shutdown()
|
||||||
|
if d := a.srv.parseConsistency(resp, req, &b); !d {
|
||||||
t.Fatalf("expected done")
|
t.Fatalf("expected done")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -43,9 +43,17 @@ func (s *HTTPServer) preparedQueryList(resp http.ResponseWriter, req *http.Reque
|
|||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.IndexedPreparedQueries
|
var reply structs.IndexedPreparedQueries
|
||||||
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("PreparedQuery.List", &args, &reply); err != nil {
|
if err := s.agent.RPC("PreparedQuery.List", &args, &reply); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Use empty list instead of nil.
|
// Use empty list instead of nil.
|
||||||
if reply.Queries == nil {
|
if reply.Queries == nil {
|
||||||
@ -100,6 +108,8 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExecuteResponse
|
var reply structs.PreparedQueryExecuteResponse
|
||||||
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("PreparedQuery.Execute", &args, &reply); err != nil {
|
if err := s.agent.RPC("PreparedQuery.Execute", &args, &reply); err != nil {
|
||||||
// We have to check the string since the RPC sheds
|
// We have to check the string since the RPC sheds
|
||||||
// the specific error type.
|
// the specific error type.
|
||||||
@ -110,6 +120,12 @@ func (s *HTTPServer) preparedQueryExecute(id string, resp http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
|
|
||||||
// Note that we translate using the DC that the results came from, since
|
// Note that we translate using the DC that the results came from, since
|
||||||
// a query can fail over to a different DC than where the execute request
|
// a query can fail over to a different DC than where the execute request
|
||||||
@ -145,6 +161,8 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.PreparedQueryExplainResponse
|
var reply structs.PreparedQueryExplainResponse
|
||||||
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("PreparedQuery.Explain", &args, &reply); err != nil {
|
if err := s.agent.RPC("PreparedQuery.Explain", &args, &reply); err != nil {
|
||||||
// We have to check the string since the RPC sheds
|
// We have to check the string since the RPC sheds
|
||||||
// the specific error type.
|
// the specific error type.
|
||||||
@ -155,6 +173,12 @@ func (s *HTTPServer) preparedQueryExplain(id string, resp http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
return reply, nil
|
return reply, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,6 +192,8 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req *
|
|||||||
}
|
}
|
||||||
|
|
||||||
var reply structs.IndexedPreparedQueries
|
var reply structs.IndexedPreparedQueries
|
||||||
|
defer setMeta(resp, &reply.QueryMeta)
|
||||||
|
RETRY_ONCE:
|
||||||
if err := s.agent.RPC("PreparedQuery.Get", &args, &reply); err != nil {
|
if err := s.agent.RPC("PreparedQuery.Get", &args, &reply); err != nil {
|
||||||
// We have to check the string since the RPC sheds
|
// We have to check the string since the RPC sheds
|
||||||
// the specific error type.
|
// the specific error type.
|
||||||
@ -178,6 +204,12 @@ func (s *HTTPServer) preparedQueryGet(id string, resp http.ResponseWriter, req *
|
|||||||
}
|
}
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < reply.LastContact {
|
||||||
|
args.AllowStale = false
|
||||||
|
args.MaxStaleDuration = 0
|
||||||
|
goto RETRY_ONCE
|
||||||
|
}
|
||||||
|
reply.ConsistencyLevel = args.QueryOptions.ConsistencyLevel()
|
||||||
return reply.Queries, nil
|
return reply.Queries, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,6 +110,11 @@ type QueryOptions struct {
|
|||||||
// If set, the leader must verify leadership prior to
|
// If set, the leader must verify leadership prior to
|
||||||
// servicing the request. Prevents a stale read.
|
// servicing the request. Prevents a stale read.
|
||||||
RequireConsistent bool
|
RequireConsistent bool
|
||||||
|
|
||||||
|
// If set and AllowStale is true, will try first a stale
|
||||||
|
// read, and then will perform a consistent read if stale
|
||||||
|
// read is older than value
|
||||||
|
MaxStaleDuration time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsRead is always true for QueryOption.
|
// IsRead is always true for QueryOption.
|
||||||
@ -117,6 +122,17 @@ func (q QueryOptions) IsRead() bool {
|
|||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConsistencyLevel display the consistency required by a request
|
||||||
|
func (q QueryOptions) ConsistencyLevel() string {
|
||||||
|
if q.RequireConsistent {
|
||||||
|
return "consistent"
|
||||||
|
} else if q.AllowStale {
|
||||||
|
return "stale"
|
||||||
|
} else {
|
||||||
|
return "leader"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (q QueryOptions) AllowStaleRead() bool {
|
func (q QueryOptions) AllowStaleRead() bool {
|
||||||
return q.AllowStale
|
return q.AllowStale
|
||||||
}
|
}
|
||||||
@ -157,6 +173,11 @@ type QueryMeta struct {
|
|||||||
|
|
||||||
// Used to indicate if there is a known leader node
|
// Used to indicate if there is a known leader node
|
||||||
KnownLeader bool
|
KnownLeader bool
|
||||||
|
|
||||||
|
// Consistencylevel returns the consistency used to serve the query
|
||||||
|
// Having `discovery_max_stale` on the agent can affect whether
|
||||||
|
// the request was served by a leader.
|
||||||
|
ConsistencyLevel string
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterRequest is used for the Catalog.Register endpoint
|
// RegisterRequest is used for the Catalog.Register endpoint
|
||||||
|
@ -917,6 +917,17 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
|
|||||||
leader, so this lets Consul continue serving requests in long outage scenarios where no leader can
|
leader, so this lets Consul continue serving requests in long outage scenarios where no leader can
|
||||||
be elected.
|
be elected.
|
||||||
|
|
||||||
|
* <a name="discovery_max_stale"></a><a href="discovery_max_stale">`discovery_max_stale`</a> - Enables
|
||||||
|
stale requests for all service discovery HTTP endpoints. This is equivalent to the
|
||||||
|
[`max_stale`](#max_stale) configuration for DNS requests. If this value is zero (default), all service
|
||||||
|
discovery HTTP endpoints are forwarded to the leader. If this value is greater than zero, any Consul server
|
||||||
|
can handle the service discovery request. If a Consul server is behind the leader by more than `discovery_max_stale`,
|
||||||
|
the query will be re-evaluated on the leader to get more up-to-date results. Consul agents also add a new
|
||||||
|
`X-Consul-Effective-Consistency` response header which indicates if the agent did a stale read. `discover-max-stale`
|
||||||
|
was introduced in Consul 1.0.7 as a way for Consul operators to force stale requests from clients at the agent level,
|
||||||
|
and defaults to zero which matches default consistency behavior in earlier Consul versions.
|
||||||
|
|
||||||
|
|
||||||
* <a name="node_ttl"></a><a href="#node_ttl">`node_ttl`</a> - By default, this is "0s", so all
|
* <a name="node_ttl"></a><a href="#node_ttl">`node_ttl`</a> - By default, this is "0s", so all
|
||||||
node lookups are served with a 0 TTL value. DNS caching for node lookups can be enabled by
|
node lookups are served with a 0 TTL value. DNS caching for node lookups can be enabled by
|
||||||
setting this value. This should be specified with the "s" suffix for second or "m" for minute.
|
setting this value. This should be specified with the "s" suffix for second or "m" for minute.
|
||||||
|
Loading…
x
Reference in New Issue
Block a user