Implementation of Weights Data structures (#4468)

* Implementation of Weights Data structures

Adding this datastructure will allow us to resolve the
issues #1088 and #4198

This new structure defaults to values:
```
   { Passing: 1, Warning: 0 }
```

Which means, use weight of 0 for a Service in Warning State
while use Weight 1 for a Healthy Service.
Thus it remains compatible with previous Consul versions.

* Implemented weights for DNS SRV Records

* DNS properly support agents with weight support while server does not (backwards compatibility)

* Use Warning value of Weights of 1 by default

When using DNS interface with only_passing = false, all nodes
with non-Critical healthcheck used to have a weight value of 1.
While having weight.Warning = 0 as default value, this is probably
a bad idea as it breaks ascending compatibility.

Thus, we put a default value of 1 to be consistent with existing behaviour.

* Added documentation for new weight field in service description

* Better documentation about weights as suggested by @banks

* Return weight = 1 for unknown Check states as suggested by @banks

* Fixed typo (of -> or) in error message as requested by @mkeeler

* Fixed unstable unit test TestRetryJoin

* Fixed unstable tests

* Fixed wrong Fatalf format in `testrpc/wait.go`

* Added notes regarding DNS SRV lookup limitations regarding number of instances

* Documentation fixes and clarification regarding SRV records with weights as requested by @banks

* Rephrase docs
This commit is contained in:
Pierre Souchay 2018-09-07 16:30:47 +02:00 committed by Paul Banks
parent da931445d3
commit eddcf228ea
20 changed files with 387 additions and 28 deletions

View File

@ -172,6 +172,13 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
// Use empty list instead of nil
for id, s := range services {
weights := api.AgentWeights{Passing: 1, Warning: 1}
if s.Weights != nil {
if s.Weights.Passing > 0 {
weights.Passing = s.Weights.Passing
}
weights.Warning = s.Weights.Warning
}
as := &api.AgentService{
Kind: api.ServiceKind(s.Kind),
ID: s.ID,
@ -184,6 +191,7 @@ func (s *HTTPServer) AgentServices(resp http.ResponseWriter, req *http.Request)
CreateIndex: s.CreateIndex,
ModifyIndex: s.ModifyIndex,
ProxyDestination: s.ProxyDestination,
Weights: weights,
}
if as.Tags == nil {
as.Tags = []string{}
@ -581,6 +589,13 @@ func (s *HTTPServer) AgentRegisterService(resp http.ResponseWriter, req *http.Re
// Get the node service.
ns := args.NodeService()
if ns.Weights != nil {
if err := structs.ValidateWeights(ns.Weights); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, fmt.Errorf("Invalid Weights: %v", err))
return nil, nil
}
}
if err := structs.ValidateMetadata(ns.Meta, false); err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprint(resp, fmt.Errorf("Invalid Service Meta: %v", err))

View File

@ -1329,6 +1329,10 @@ func TestAgent_RegisterService(t *testing.T) {
TTL: 30 * time.Second,
},
},
Weights: &structs.Weights{
Passing: 100,
Warning: 3,
},
}
req, _ := http.NewRequest("PUT", "/v1/agent/service/register?token=abc123", jsonReader(args))
@ -1347,6 +1351,12 @@ func TestAgent_RegisterService(t *testing.T) {
if val := a.State.Service("test").Meta["hello"]; val != "world" {
t.Fatalf("Missing meta: %v", a.State.Service("test").Meta)
}
if val := a.State.Service("test").Weights.Passing; val != 100 {
t.Fatalf("Expected 100 for Weights.Passing, got: %v", val)
}
if val := a.State.Service("test").Weights.Warning; val != 3 {
t.Fatalf("Expected 3 for Weights.Warning, got: %v", val)
}
// Ensure we have a check mapping
checks := a.State.Checks()
@ -1370,7 +1380,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}}`
json := `{"name":"test", "port":8000, "enable_tag_override": true, "meta": {"some": "meta"}, "weights":{"passing": 16}}`
req, _ := http.NewRequest("PUT", "/v1/agent/service/register", strings.NewReader(json))
obj, err := a.srv.AgentRegisterService(nil, req)
@ -1386,6 +1396,7 @@ func TestAgent_RegisterService_TranslateKeys(t *testing.T) {
Meta: map[string]string{"some": "meta"},
Port: 8000,
EnableTagOverride: true,
Weights: &structs.Weights{Passing: 16, Warning: 0},
}
if got, want := a.State.Service("test"), svc; !verify.Values(t, "", got, want) {

View File

@ -641,6 +641,10 @@ func verifyIndexChurn(t *testing.T, tags []string) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
weights := &structs.Weights{
Passing: 1,
Warning: 1,
}
// Ensure we have a leader before we start adding the services
testrpc.WaitForLeader(t, a.RPC, "dc1")
@ -649,6 +653,7 @@ func verifyIndexChurn(t *testing.T, tags []string) {
Service: "redis",
Port: 8000,
Tags: tags,
Weights: weights,
}
if err := a.AddService(svc, nil, true, ""); err != nil {
t.Fatalf("err: %v", err)

View File

@ -1086,6 +1086,19 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition {
} else {
meta = v.Meta
}
serviceWeights := &structs.Weights{Passing: 1, Warning: 1}
if v.Weights != nil {
if v.Weights.Passing != nil {
serviceWeights.Passing = *v.Weights.Passing
}
if v.Weights.Warning != nil {
serviceWeights.Warning = *v.Weights.Warning
}
}
if err := structs.ValidateWeights(serviceWeights); err != nil {
b.err = multierror.Append(fmt.Errorf("Invalid weight definition for service %s: %s", b.stringVal(v.Name), err))
}
return &structs.ServiceDefinition{
Kind: b.serviceKindVal(v.Kind),
ID: b.stringVal(v.ID),
@ -1096,6 +1109,7 @@ func (b *Builder) serviceVal(v *ServiceDefinition) *structs.ServiceDefinition {
Port: b.intVal(v.Port),
Token: b.stringVal(v.Token),
EnableTagOverride: b.boolVal(v.EnableTagOverride),
Weights: serviceWeights,
Checks: checks,
ProxyDestination: b.stringVal(v.ProxyDestination),
Connect: b.serviceConnectVal(v.Connect),

View File

@ -322,6 +322,12 @@ type Autopilot struct {
UpgradeVersionTag *string `json:"upgrade_version_tag,omitempty" hcl:"upgrade_version_tag" mapstructure:"upgrade_version_tag"`
}
// ServiceWeights defines the registration of weights used in DNS for a Service
type ServiceWeights struct {
Passing *int `json:"passing,omitempty" hcl:"passing" mapstructure:"passing"`
Warning *int `json:"warning,omitempty" hcl:"warning" mapstructure:"warning"`
}
type ServiceDefinition struct {
Kind *string `json:"kind,omitempty" hcl:"kind" mapstructure:"kind"`
ID *string `json:"id,omitempty" hcl:"id" mapstructure:"id"`
@ -333,6 +339,7 @@ type ServiceDefinition struct {
Check *CheckDefinition `json:"check,omitempty" hcl:"check" mapstructure:"check"`
Checks []CheckDefinition `json:"checks,omitempty" hcl:"checks" mapstructure:"checks"`
Token *string `json:"token,omitempty" hcl:"token" mapstructure:"token"`
Weights *ServiceWeights `json:"weights,omitempty" hcl:"weights" mapstructure:"weights"`
EnableTagOverride *bool `json:"enable_tag_override,omitempty" hcl:"enable_tag_override" mapstructure:"enable_tag_override"`
ProxyDestination *string `json:"proxy_destination,omitempty" hcl:"proxy_destination" mapstructure:"proxy_destination"`
Connect *ServiceConnect `json:"connect,omitempty" hcl:"connect" mapstructure:"connect"`

View File

@ -2005,16 +2005,22 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
json: []string{
`{ "service": { "name": "a", "port": 80 } }`,
`{ "service": { "name": "b", "port": 90, "meta": {"my": "value"} } }`,
`{ "service": { "name": "b", "port": 90, "meta": {"my": "value"}, "weights": {"passing": 13} } }`,
},
hcl: []string{
`service = { name = "a" port = 80 }`,
`service = { name = "b" port = 90 meta={my="value"}}`,
`service = { name = "b" port = 90 meta={my="value"}, weights={passing=13}}`,
},
patch: func(rt *RuntimeConfig) {
rt.Services = []*structs.ServiceDefinition{
&structs.ServiceDefinition{Name: "a", Port: 80},
&structs.ServiceDefinition{Name: "b", Port: 90, Meta: map[string]string{"my": "value"}},
&structs.ServiceDefinition{Name: "a", Port: 80, Weights: &structs.Weights{
Passing: 1,
Warning: 1,
}},
&structs.ServiceDefinition{Name: "b", Port: 90, Meta: map[string]string{"my": "value"}, Weights: &structs.Weights{
Passing: 13,
Warning: 1,
}},
}
rt.DataDir = dataDir
},
@ -2108,6 +2114,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
ScriptArgs: []string{"a", "b"},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
}
rt.DataDir = dataDir
@ -2167,6 +2177,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
}
},
@ -2211,6 +2225,10 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
}
},
@ -2266,10 +2284,18 @@ func TestConfigFlagsAndEdgecases(t *testing.T) {
},
},
},
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
&structs.ServiceDefinition{
Name: "service-A2",
Port: 81,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
}
},
@ -2747,6 +2773,10 @@ func TestFullConfig(t *testing.T) {
"address": "cOlSOhbp",
"token": "msy7iWER",
"port": 24237,
"weights": {
"passing": 100,
"warning": 1
},
"enable_tag_override": true,
"check": {
"id": "RMi85Dv8",
@ -2855,6 +2885,10 @@ func TestFullConfig(t *testing.T) {
"address": "R6H6g8h0",
"token": "ZgY8gjMI",
"port": 38292,
"weights": {
"passing": 1979,
"warning": 6
},
"enable_tag_override": true,
"checks": [
{
@ -3238,6 +3272,10 @@ func TestFullConfig(t *testing.T) {
address = "cOlSOhbp"
token = "msy7iWER"
port = 24237
weights = {
passing = 100,
warning = 1
}
enable_tag_override = true
check = {
id = "RMi85Dv8"
@ -3346,6 +3384,10 @@ func TestFullConfig(t *testing.T) {
address = "R6H6g8h0"
token = "ZgY8gjMI"
port = 38292
weights = {
passing = 1979,
warning = 6
}
enable_tag_override = true
checks = [
{
@ -3798,12 +3840,16 @@ func TestFullConfig(t *testing.T) {
ServerPort: 3757,
Services: []*structs.ServiceDefinition{
{
ID: "wI1dzxS4",
Name: "7IszXMQ1",
Tags: []string{"0Zwg8l6v", "zebELdN5"},
Address: "9RhqPSPB",
Token: "myjKJkWH",
Port: 72219,
ID: "wI1dzxS4",
Name: "7IszXMQ1",
Tags: []string{"0Zwg8l6v", "zebELdN5"},
Address: "9RhqPSPB",
Token: "myjKJkWH",
Port: 72219,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
EnableTagOverride: true,
Checks: []*structs.CheckType{
&structs.CheckType{
@ -3830,12 +3876,16 @@ func TestFullConfig(t *testing.T) {
},
},
{
ID: "MRHVMZuD",
Name: "6L6BVfgH",
Tags: []string{"7Ale4y6o", "PMBW08hy"},
Address: "R6H6g8h0",
Token: "ZgY8gjMI",
Port: 38292,
ID: "MRHVMZuD",
Name: "6L6BVfgH",
Tags: []string{"7Ale4y6o", "PMBW08hy"},
Address: "R6H6g8h0",
Token: "ZgY8gjMI",
Port: 38292,
Weights: &structs.Weights{
Passing: 1979,
Warning: 6,
},
EnableTagOverride: true,
Checks: structs.CheckTypes{
&structs.CheckType{
@ -3897,15 +3947,23 @@ func TestFullConfig(t *testing.T) {
Port: 31471,
Kind: "connect-proxy",
ProxyDestination: "6L6BVfgH",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
},
{
ID: "dLOXpSCI",
Name: "o1ynPkp0",
Tags: []string{"nkwshvM5", "NTDWn3ek"},
Address: "cOlSOhbp",
Token: "msy7iWER",
Meta: map[string]string{"mymeta": "data"},
Port: 24237,
ID: "dLOXpSCI",
Name: "o1ynPkp0",
Tags: []string{"nkwshvM5", "NTDWn3ek"},
Address: "cOlSOhbp",
Token: "msy7iWER",
Meta: map[string]string{"mymeta": "data"},
Port: 24237,
Weights: &structs.Weights{
Passing: 100,
Warning: 1,
},
EnableTagOverride: true,
Connect: &structs.ServiceConnect{
Native: true,
@ -4324,6 +4382,10 @@ func TestSanitize(t *testing.T) {
Check: structs.CheckType{
Name: "blurb",
},
Weights: &structs.Weights{
Passing: 67,
Warning: 3,
},
},
},
Checks: []*structs.CheckDefinition{
@ -4552,7 +4614,11 @@ func TestSanitize(t *testing.T) {
"Port": 0,
"ProxyDestination": "",
"Tags": [],
"Token": "hidden"
"Token": "hidden",
"Weights": {
"Passing": 67,
"Warning": 3
}
}],
"SessionTTLMin": "0s",
"SkipLeaveOnInt": false,

View File

@ -189,6 +189,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Address: "1.1.1.1",
Port: 8080,
Tags: []string{"master"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
}
if err := s.EnsureRegistration(2, req); err != nil {
t.Fatalf("err: %s", err)
@ -203,6 +204,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
Address: "1.1.1.1",
Port: 8080,
Tags: []string{"master"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{CreateIndex: 2, ModifyIndex: 2},
},
}
@ -393,6 +395,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
Service: "redis",
Address: "1.1.1.1",
Port: 8080,
Weights: &structs.Weights{Passing: 1, Warning: 1},
}
restore = s.Restore()
if err := restore.Registration(2, req); err != nil {
@ -1299,6 +1302,7 @@ func TestStateStore_EnsureService(t *testing.T) {
Tags: []string{"prod"},
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 0},
}
// Creating a service without a node returns an error.
@ -1430,6 +1434,10 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
Address: "1.1.1.1",
Port: 1111,
ProxyDestination: "foo",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
// Service successfully registers into the state store.
@ -2065,6 +2073,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
Tags: []string{"prod"},
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 0},
},
&structs.NodeService{
ID: "service2",
@ -2072,6 +2081,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
Tags: []string{"dev"},
Address: "1.1.1.2",
Port: 1112,
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
}
for i, svc := range ns {
@ -3252,6 +3262,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service1",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
@ -3262,6 +3273,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service2",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 3,
@ -3301,6 +3313,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service1",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 4,
ModifyIndex: 4,
@ -3311,6 +3324,7 @@ func TestStateStore_NodeInfo_NodeDump(t *testing.T) {
Service: "service2",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 5,
ModifyIndex: 5,

View File

@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/serf/coordinate"
)
@ -72,6 +73,7 @@ func TestCoordinate_Nodes(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Make sure an empty list is non-nil.
req, _ := http.NewRequest("GET", "/v1/coordinate/nodes?dc=dc1", nil)
@ -182,6 +184,7 @@ func TestCoordinate_Node(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Make sure we get a 404 with no coordinates.
req, _ := http.NewRequest("GET", "/v1/coordinate/node/foo?dc=dc1", nil)

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/miekg/dns"
)
@ -1205,6 +1206,51 @@ func (d *DNSServer) serviceNodeRecords(dc string, nodes structs.CheckServiceNode
}
}
func findWeight(node structs.CheckServiceNode) int {
// By default, when only_passing is false, warning and passing nodes are returned
// Those values will be used if using a client with support while server has no
// support for weights
weightPassing := 1
weightWarning := 1
if node.Service.Weights != nil {
weightPassing = node.Service.Weights.Passing
weightWarning = node.Service.Weights.Warning
}
serviceChecks := make(api.HealthChecks, 0)
for _, c := range node.Checks {
if c.ServiceName == node.Service.Service || c.ServiceName == "" {
healthCheck := &api.HealthCheck{
Node: c.Node,
CheckID: string(c.CheckID),
Name: c.Name,
Status: c.Status,
Notes: c.Notes,
Output: c.Output,
ServiceID: c.ServiceID,
ServiceName: c.ServiceName,
ServiceTags: c.ServiceTags,
}
serviceChecks = append(serviceChecks, healthCheck)
}
}
status := serviceChecks.AggregatedStatus()
switch status {
case api.HealthWarning:
return weightWarning
case api.HealthPassing:
return weightPassing
case api.HealthMaint:
// Not used in theory
return 0
case api.HealthCritical:
// Should not happen since already filtered
return 0
default:
// When non-standard status, return 1
return 1
}
}
// serviceARecords is used to add the SRV records for a service lookup
func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes, req, resp *dns.Msg, ttl time.Duration) {
handled := make(map[string]struct{})
@ -1219,6 +1265,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
}
handled[tuple] = struct{}{}
weight := findWeight(node)
// Add the SRV record
srvRec := &dns.SRV{
Hdr: dns.RR_Header{
@ -1228,7 +1275,7 @@ func (d *DNSServer) serviceSRVRecords(dc string, nodes structs.CheckServiceNodes
Ttl: uint32(ttl / time.Second),
},
Priority: 1,
Weight: 1,
Weight: uint16(weight),
Port: uint16(node.Service.Port),
Target: fmt.Sprintf("%s.node.%s.%s", node.Node.Node, dc, d.domain),
}

View File

@ -218,6 +218,7 @@ func TestHealthServiceChecks(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
req, _ := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1", nil)
resp := httptest.NewRecorder()
@ -322,6 +323,7 @@ func TestHealthServiceChecks_DistanceSort(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
// Create a service check
args := &structs.RegisterRequest{

View File

@ -47,6 +47,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv1, "")
args.Service = srv1
@ -60,6 +64,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Service: "redis",
Tags: []string{},
Port: 8000,
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
a.State.AddService(srv2, "")
@ -77,6 +85,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Service: "web",
Tags: []string{},
Port: 80,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv3, "")
@ -86,6 +98,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Service: "lb",
Tags: []string{},
Port: 443,
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
args.Service = srv4
if err := a.RPC("Catalog.Register", args, &out); err != nil {
@ -99,6 +115,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Tags: []string{},
Address: "127.0.0.10",
Port: 8000,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv5, "")
@ -116,6 +136,10 @@ func TestAgentAntiEntropy_Services(t *testing.T) {
Service: "cache",
Tags: []string{},
Port: 11211,
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
a.State.SetServiceState(&local.ServiceState{
Service: srv6,
@ -257,6 +281,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Service: "mysql-proxy",
Port: 5000,
ProxyDestination: "db",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv1, "")
args.Service = srv1
@ -269,6 +297,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Port: 8000,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "redis",
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
a.State.AddService(srv2, "")
@ -285,6 +317,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Port: 80,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "web",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv3, "")
@ -295,6 +331,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Port: 443,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "lb",
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
args.Service = srv4
assert.Nil(a.RPC("Catalog.Register", args, &out))
@ -306,6 +346,10 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
Port: 11211,
Kind: structs.ServiceKindConnectProxy,
ProxyDestination: "cache-proxy",
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.SetServiceState(&local.ServiceState{
Service: srv5,
@ -394,6 +438,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Tags: []string{"tag1"},
Port: 6100,
EnableTagOverride: true,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv1, "")
@ -404,6 +452,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Tags: []string{"tag2"},
Port: 6200,
EnableTagOverride: false,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv2, "")
@ -421,6 +473,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Tags: []string{"tag1_mod"},
Port: 7100,
EnableTagOverride: true,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -432,6 +488,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Tags: []string{"tag2_mod"},
Port: 7200,
EnableTagOverride: false,
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
if err := a.RPC("Catalog.Register", args, &out); err != nil {
t.Fatalf("err: %v", err)
@ -465,6 +525,10 @@ func TestAgentAntiEntropy_EnableTagOverride(t *testing.T) {
Tags: []string{"tag1_mod"},
Port: 6100,
EnableTagOverride: true,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
if !verify.Values(t, "", got, want) {
t.FailNow()
@ -651,6 +715,10 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv1, token)
@ -660,6 +728,10 @@ func TestAgentAntiEntropy_Services_ACLDeny(t *testing.T) {
Service: "api",
Tags: []string{"foo"},
Port: 5001,
Weights: &structs.Weights{
Passing: 1,
Warning: 0,
},
}
a.State.AddService(srv2, token)
@ -990,6 +1062,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Service: "mysql",
Tags: []string{"master"},
Port: 5000,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv1, "root")
srv2 := &structs.NodeService{
@ -997,6 +1073,10 @@ func TestAgentAntiEntropy_Checks_ACLDeny(t *testing.T) {
Service: "api",
Tags: []string{"foo"},
Port: 5001,
Weights: &structs.Weights{
Passing: 1,
Warning: 1,
},
}
a.State.AddService(srv2, "root")

View File

@ -496,7 +496,6 @@ func TestSessionGet(t *testing.T) {
}
func TestSessionList(t *testing.T) {
t.Parallel()
t.Run("", func(t *testing.T) {
a := NewTestAgent(t.Name(), "")
defer a.Shutdown()

View File

@ -23,6 +23,7 @@ type ServiceDefinition struct {
Port int
Check CheckType
Checks CheckTypes
Weights *Weights
Token string
EnableTagOverride bool
ProxyDestination string
@ -38,6 +39,7 @@ func (s *ServiceDefinition) NodeService() *NodeService {
Address: s.Address,
Meta: s.Meta,
Port: s.Port,
Weights: s.Weights,
EnableTagOverride: s.EnableTagOverride,
ProxyDestination: s.ProxyDestination,
}

View File

@ -379,6 +379,23 @@ func ValidateMetadata(meta map[string]string, allowConsulPrefix bool) error {
return nil
}
// ValidateWeights checks the definition of DNS weight is valid
func ValidateWeights(weights *Weights) error {
if weights == nil {
return nil
}
if weights.Passing < 1 {
return fmt.Errorf("Passing must be greater than 0")
}
if weights.Warning < 0 {
return fmt.Errorf("Warning must be greater or equal than 0")
}
if weights.Passing > 65535 || weights.Warning > 65535 {
return fmt.Errorf("DNS Weight must be between 0 and 65535")
}
return nil
}
// validateMetaPair checks that the given key/value pair is in a valid format
func validateMetaPair(key, value string, allowConsulPrefix bool) error {
if key == "" {
@ -430,6 +447,7 @@ type ServiceNode struct {
ServiceName string
ServiceTags []string
ServiceAddress string
ServiceWeights Weights
ServiceMeta map[string]string
ServicePort int
ServiceEnableTagOverride bool
@ -461,6 +479,7 @@ func (s *ServiceNode) PartialClone() *ServiceNode {
ServiceAddress: s.ServiceAddress,
ServicePort: s.ServicePort,
ServiceMeta: nsmeta,
ServiceWeights: s.ServiceWeights,
ServiceEnableTagOverride: s.ServiceEnableTagOverride,
ServiceProxyDestination: s.ServiceProxyDestination,
ServiceConnect: s.ServiceConnect,
@ -481,6 +500,7 @@ func (s *ServiceNode) ToNodeService() *NodeService {
Address: s.ServiceAddress,
Port: s.ServicePort,
Meta: s.ServiceMeta,
Weights: &s.ServiceWeights,
EnableTagOverride: s.ServiceEnableTagOverride,
ProxyDestination: s.ServiceProxyDestination,
Connect: s.ServiceConnect,
@ -491,6 +511,12 @@ func (s *ServiceNode) ToNodeService() *NodeService {
}
}
// Weights represent the weight used by DNS for a given status
type Weights struct {
Passing int
Warning int
}
type ServiceNodes []*ServiceNode
// ServiceKind is the kind of service being registered.
@ -522,6 +548,7 @@ type NodeService struct {
Address string
Meta map[string]string
Port int
Weights *Weights
EnableTagOverride bool
// ProxyDestination is the name of the service that this service is
@ -590,6 +617,7 @@ func (s *NodeService) IsSame(other *NodeService) bool {
!reflect.DeepEqual(s.Tags, other.Tags) ||
s.Address != other.Address ||
s.Port != other.Port ||
!reflect.DeepEqual(s.Weights, other.Weights) ||
!reflect.DeepEqual(s.Meta, other.Meta) ||
s.EnableTagOverride != other.EnableTagOverride ||
s.Kind != other.Kind ||
@ -603,6 +631,15 @@ func (s *NodeService) IsSame(other *NodeService) bool {
// ToServiceNode converts the given node service to a service node.
func (s *NodeService) ToServiceNode(node string) *ServiceNode {
theWeights := Weights{
Passing: 1,
Warning: 1,
}
if s.Weights != nil {
if err := ValidateWeights(s.Weights); err == nil {
theWeights = *s.Weights
}
}
return &ServiceNode{
// Skip ID, see ServiceNode definition.
Node: node,
@ -615,6 +652,7 @@ func (s *NodeService) ToServiceNode(node string) *ServiceNode {
ServiceAddress: s.Address,
ServicePort: s.Port,
ServiceMeta: s.Meta,
ServiceWeights: theWeights,
ServiceEnableTagOverride: s.EnableTagOverride,
ServiceProxyDestination: s.ProxyDestination,
ServiceConnect: s.Connect,

View File

@ -188,6 +188,12 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
if !reflect.DeepEqual(sn, clone) {
t.Fatalf("bad: %v VS %v", clone, sn)
}
oldPassingWeight := clone.ServiceWeights.Passing
sn.ServiceWeights.Passing = 1000
if reflect.DeepEqual(sn, clone) {
t.Fatalf("clone wasn't independent of the original for Meta")
}
sn.ServiceWeights.Passing = oldPassingWeight
sn.ServiceMeta["new_meta"] = "new_value"
if reflect.DeepEqual(sn, clone) {
t.Fatalf("clone wasn't independent of the original for Meta")
@ -206,8 +212,9 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) {
sn.Datacenter = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
sn.ServiceWeights = Weights{Passing: 1, Warning: 1}
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %v", sn2)
t.Fatalf("bad: %#v, but expected %#v", sn2, sn)
}
}

View File

@ -51,6 +51,12 @@ type AgentCheck struct {
Definition HealthCheckDefinition
}
// AgentWeights represent optional weights for a service
type AgentWeights struct {
Passing int
Warning int
}
// AgentService represents a service known to the agent
type AgentService struct {
Kind ServiceKind
@ -60,6 +66,7 @@ type AgentService struct {
Meta map[string]string
Port int
Address string
Weights AgentWeights
EnableTagOverride bool
CreateIndex uint64
ModifyIndex uint64
@ -119,6 +126,7 @@ type AgentServiceRegistration struct {
Address string `json:",omitempty"`
EnableTagOverride bool `json:",omitempty"`
Meta map[string]string `json:",omitempty"`
Weights *AgentWeights `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
ProxyDestination string `json:",omitempty"`

View File

@ -1,5 +1,10 @@
package api
type Weights struct {
Passing int
Warning int
}
type Node struct {
ID string
Node string
@ -24,6 +29,7 @@ type CatalogService struct {
ServiceTags []string
ServiceMeta map[string]string
ServicePort int
ServiceWeights Weights
ServiceEnableTagOverride bool
CreateIndex uint64
ModifyIndex uint64

View File

@ -9,6 +9,8 @@ import (
"sync"
"testing"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/consul/testutil/retry"
@ -83,6 +85,7 @@ func TestRetryJoin(t *testing.T) {
t.Parallel()
a := agent.NewTestAgent(t.Name(), "")
defer a.Shutdown()
testrpc.WaitForLeader(t, a.RPC, "dc1")
shutdownCh := make(chan struct{})

View File

@ -95,6 +95,21 @@ node's metadata key starts with `rfc1035-`.
A service lookup is used to query for service providers. Service queries support
two lookup methods: standard and strict [RFC 2782](https://tools.ietf.org/html/rfc2782).
By default, SRV weights are all set at 1, but changing weights is supported using the
`Weights` attribute of the [service definition](/docs/agent/services.html).
Note that DNS is limited in size per request, even when performing DNS TCP
queries.
For services having many instances (more than 500), it might not be possible to
retrieve the complete list of instances for the service.
When DNS SRV response are sent, order is randomized, but weights are not
taken into account. In the case of truncation different clients using weighted SRV
responses will have partial and inconsistent views of instances weights so the
request distribution could be skewed from the intended weights. In that case,
it is recommended to use the HTTP API to retrieve the list of nodes.
### Standard Lookup
The format of a standard service lookup is:

View File

@ -52,6 +52,10 @@ example shows all possible fields, but note that only a few are required.
"command": [],
"config": {}
}
},
"weights": {
"passing": 5,
"warning": 1
}
}
}
@ -151,6 +155,19 @@ are used to configure the proxy and are specified in the [proxy
docs](/docs/connect/proxies.html). If `native` is true, it is an error to also
specifiy a managed proxy instance.
The `weights` field is an optional field to specify the weight of a service in
DNS SRV responses. If this field is not specified, its default value is:
`"weights": {"passing": 1, "warning": 1}`.
When a service is `critical`, it is excluded from DNS responses.
Services with warning checks are in included in responses by default,
but excluded if the optional param `only_passing = true` is present in
agent DNS configuration or `?passing` is used via the API.
When DNS SRV requests are made, the response will include the weights
specified given the state of the service.
This allows some instances to be given higher weight if they have more capacity,
and optionally allows reducing load on services with checks in `warning` status
by giving passing instances a higher weight.
## Multiple Service Definitions
Multiple services definitions can be provided at once using the plural