Add LB policy to service-resolver

This commit is contained in:
freddygv 2020-08-22 18:05:09 -06:00
parent 9e1c6727f9
commit ff56a64b08
4 changed files with 563 additions and 0 deletions

View File

@ -639,6 +639,10 @@ type ServiceResolverConfigEntry struct {
// to this service.
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
// LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service.
LoadBalancer LoadBalancer `json:",omitempty" alias:"load_balancer"`
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
RaftIndex
}
@ -807,6 +811,56 @@ func (e *ServiceResolverConfigEntry) Validate() error {
return fmt.Errorf("Bad ConnectTimeout '%s', must be >= 0", e.ConnectTimeout)
}
validPolicies := map[string]bool{
"": true,
"random": true,
"round_robin": true,
"least_request": true,
"ring_hash": true,
"maglev": true,
}
if ok := validPolicies[e.LoadBalancer.Policy]; !ok {
return fmt.Errorf("Bad LoadBalancer policy: %q is not supported", e.LoadBalancer.Policy)
}
if e.LoadBalancer.Policy != "ring_hash" && e.LoadBalancer.RingHashConfig != (RingHashConfig{}) {
return fmt.Errorf("Bad LoadBalancer configuration. "+
"RingHashConfig specified for incompatible load balancing policy %q", e.LoadBalancer.Policy)
}
if e.LoadBalancer.Policy != "least_request" && e.LoadBalancer.LeastRequestConfig != (LeastRequestConfig{}) {
return fmt.Errorf("Bad LoadBalancer configuration. "+
"LeastRequestConfig specified for incompatible load balancing policy %q", e.LoadBalancer.Policy)
}
if !e.LoadBalancer.IsHashBased() && len(e.LoadBalancer.HashPolicies) > 0 {
return fmt.Errorf("Bad LoadBalancer configuration: "+
"HashPolicies specified for non-hash-based Policy: %q", e.LoadBalancer.Policy)
}
validFields := map[string]bool{
"header": true,
"cookie": true,
"query_parameter": true,
}
for i, hp := range e.LoadBalancer.HashPolicies {
if ok := validFields[hp.Field]; hp.Field != "" && !ok {
return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: %q is not a supported field", i, hp.Field)
}
if hp.SourceAddress && hp.Field != "" {
return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: "+
"A single hash policy cannot hash both a source address and a %q", i, hp.Field)
}
if hp.SourceAddress && hp.FieldMatchValue != "" {
return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: "+
"A FieldMatchValue cannot be specified when hashing SourceAddress", i)
}
if hp.Field != "" && hp.FieldMatchValue == "" {
return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: Field %q was specified without a FieldMatchValue", i, hp.Field)
}
if hp.FieldMatchValue != "" && hp.Field == "" {
return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: FieldMatchValue requires a Field to apply to", i)
}
}
return nil
}
@ -943,6 +997,76 @@ type ServiceResolverFailover struct {
Datacenters []string `json:",omitempty"`
}
// LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service.
type LoadBalancer struct {
// Policy is the load balancing policy used to select a host
Policy string `json:",omitempty"`
// RingHashConfig contains configuration for the "ring_hash" policy type
RingHashConfig RingHashConfig `json:",omitempty" alias:"ring_hash_config"`
// LeastRequestConfig contains configuration for the "least_request" policy type
LeastRequestConfig LeastRequestConfig `json:",omitempty" alias:"least_request_config"`
// HashPolicies is a list of hash policies to use for hashing load balancing algorithms.
// Hash policies are evaluated individually and combined such that identical lists
// result in the same hash.
// If no hash policies are present, or none are successfully evaluated,
// then a random backend host will be selected.
HashPolicies []HashPolicy `json:",omitempty" alias:"hash_policies"`
}
func (l LoadBalancer) IsHashBased() bool {
switch l.Policy {
case "maglev", "ring_hash":
return true
default:
return false
}
}
// RingHashConfig contains configuration for the "ring_hash" policy type
type RingHashConfig struct {
// MinimumRingSize determines the minimum number of hashes per destination host
MinimumRingSize uint64 `json:",omitempty" alias:"minimum_ring_size"`
// MaximumRingSize determines the maximum number of hashes per destination host
MaximumRingSize uint64 `json:",omitempty" alias:"maximum_ring_size"`
}
// LeastRequestConfig contains configuration for the "least_request" policy type
type LeastRequestConfig struct {
// ChoiceCount determines the number of random healthy hosts from which to select the one with the least requests.
ChoiceCount uint32 `json:",omitempty" alias:"choice_count"`
}
// HashPolicy is a list of hash policies to use for hashing load balancing algorithms.
// Hash policies are evaluated individually and combined such that identical lists
// result in the same hash.
// If no hash policies are present, or none are successfully evaluated,
// then a random backend host will be selected.
type HashPolicy struct {
// Field is the attribute type to hash on.
// Must be one of "header","cookie", or "query_parameter".
// Cannot be specified along with SourceIP.
Field string `json:",omitempty"`
// FieldMatchValue is the value to hash.
// ie. header name, cookie name, URL query parameter name
// Cannot be specified along with SourceIP.
FieldMatchValue string `json:",omitempty" alias:"field_value"`
// SourceAddress determines whether the hash should be of the source IP rather than of a field and field value.
// Cannot be specified along with Field and FieldMatchValue.
SourceAddress bool `json:",omitempty" alias:"source_address"`
// Terminal will short circuit the computation of the hash when multiple hash policies are present.
// If a hash is computed when a Terminal policy is evaluated,
// then that hash will be used and subsequent hash policies will be ignored.
Terminal bool `json:",omitempty"`
}
type discoveryChainConfigEntry interface {
ConfigEntry
// ListRelatedServices returns a list of other names of services referenced

View File

@ -536,6 +536,278 @@ func TestServiceResolverConfigEntry(t *testing.T) {
}
}
func TestServiceResolverConfigEntry_LoadBalancer(t *testing.T) {
type testcase struct {
name string
entry *ServiceResolverConfigEntry
normalizeErr string
validateErr string
// check is called between normalize and validate
check func(t *testing.T, entry *ServiceResolverConfigEntry)
}
cases := []testcase{
{
name: "empty policy is valid",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{Policy: ""},
},
},
{
name: "supported policy",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{Policy: "random"},
},
},
{
name: "unsupported policy",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{Policy: "fake-policy"},
},
validateErr: `"fake-policy" is not supported`,
},
{
name: "bad policy for least request config",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "ring_hash",
LeastRequestConfig: LeastRequestConfig{ChoiceCount: 2},
},
},
validateErr: `LeastRequestConfig specified for incompatible load balancing policy`,
},
{
name: "bad policy for ring hash config",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "least_request",
RingHashConfig: RingHashConfig{MinimumRingSize: 1024},
},
},
validateErr: `RingHashConfig specified for incompatible load balancing policy`,
},
{
name: "good policy for ring hash config",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "ring_hash",
RingHashConfig: RingHashConfig{MinimumRingSize: 1024},
},
},
},
{
name: "good policy for least request config",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "least_request",
LeastRequestConfig: LeastRequestConfig{ChoiceCount: 2},
},
},
},
{
name: "empty policy is not defaulted",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "",
},
},
check: func(t *testing.T, entry *ServiceResolverConfigEntry) {
require.Equal(t, "", entry.LoadBalancer.Policy)
},
},
{
name: "empty policy with hash policy",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "",
HashPolicies: []HashPolicy{
{
SourceAddress: true,
},
},
},
},
validateErr: `HashPolicies specified for non-hash-based Policy`,
},
{
name: "supported match field",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
Field: "header",
FieldMatchValue: "X-Consul-Token",
},
},
},
},
},
{
name: "unsupported match field",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
Field: "not-header",
},
},
},
},
validateErr: `"not-header" is not a supported field`,
},
{
name: "cannot match on source address and custom field",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
Field: "header",
SourceAddress: true,
},
},
},
},
validateErr: `A single hash policy cannot hash both a source address and a "header"`,
},
{
name: "matchvalue not compatible with source address",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
FieldMatchValue: "X-Consul-Token",
SourceAddress: true,
},
},
},
},
validateErr: `A FieldMatchValue cannot be specified when hashing SourceAddress`,
},
{
name: "field without match value",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
Field: "header",
},
},
},
},
validateErr: `Field "header" was specified without a FieldMatchValue`,
},
{
name: "field without match value",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "maglev",
HashPolicies: []HashPolicy{
{
FieldMatchValue: "my-cookie",
},
},
},
},
validateErr: `FieldMatchValue requires a Field to apply to`,
},
{
name: "ring hash kitchen sink",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "ring_hash",
RingHashConfig: RingHashConfig{MaximumRingSize: 10, MinimumRingSize: 2},
HashPolicies: []HashPolicy{
{
Field: "cookie",
FieldMatchValue: "my-cookie",
},
{
Field: "header",
FieldMatchValue: "alt-header",
Terminal: true,
},
},
},
},
},
{
name: "least request kitchen sink",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test",
LoadBalancer: LoadBalancer{
Policy: "least_request",
LeastRequestConfig: LeastRequestConfig{ChoiceCount: 20},
},
},
},
}
for _, tc := range cases {
tc := tc
t.Run(tc.name, func(t *testing.T) {
err := tc.entry.Normalize()
if tc.normalizeErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.normalizeErr)
return
}
require.NoError(t, err)
if tc.check != nil {
tc.check(t, tc.entry)
}
err = tc.entry.Validate()
if tc.validateErr != "" {
require.Error(t, err)
require.Contains(t, err.Error(), tc.validateErr)
return
}
require.NoError(t, err)
})
}
}
func TestServiceSplitterConfigEntry(t *testing.T) {
makesplitter := func(splits ...ServiceSplit) *ServiceSplitterConfigEntry {

View File

@ -138,6 +138,10 @@ type ServiceResolverConfigEntry struct {
Failover map[string]ServiceResolverFailover `json:",omitempty"`
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
// LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service.
LoadBalancer LoadBalancer `json:",omitempty" alias:"load_balancer"`
CreateIndex uint64
ModifyIndex uint64
}
@ -201,3 +205,64 @@ type ServiceResolverFailover struct {
Namespace string `json:",omitempty"`
Datacenters []string `json:",omitempty"`
}
// LoadBalancer determines the load balancing policy and configuration for services
// issuing requests to this upstream service.
type LoadBalancer struct {
// Policy is the load balancing policy used to select a host
Policy string `json:",omitempty"`
// RingHashConfig contains configuration for the "ring_hash" policy type
RingHashConfig RingHashConfig `json:",omitempty" alias:"ring_hash_config"`
// LeastRequestConfig contains configuration for the "least_request" policy type
LeastRequestConfig LeastRequestConfig `json:",omitempty" alias:"least_request_config"`
// HashPolicies is a list of hash policies to use for hashing load balancing algorithms.
// Hash policies are evaluated individually and combined such that identical lists
// result in the same hash.
// If no hash policies are present, or none are successfully evaluated,
// then a random backend host will be selected.
HashPolicies []HashPolicy `json:",omitempty" alias:"hash_policies"`
}
// RingHashConfig contains configuration for the "ring_hash" policy type
type RingHashConfig struct {
// MinimumRingSize determines the minimum number of hashes per destination host
MinimumRingSize uint64 `json:",omitempty" alias:"minimum_ring_size"`
// MaximumRingSize determines the maximum number of hashes per destination host
MaximumRingSize uint64 `json:",omitempty" alias:"maximum_ring_size"`
}
// LeastRequestConfig contains configuration for the "least_request" policy type
type LeastRequestConfig struct {
// ChoiceCount determines the number of random healthy hosts from which to select the one with the least requests.
ChoiceCount uint32 `json:",omitempty" alias:"choice_count"`
}
// HashPolicy is a list of hash policies to use for hashing load balancing algorithms.
// Hash policies are evaluated individually and combined such that identical lists
// result in the same hash.
// If no hash policies are present, or none are successfully evaluated,
// then a random backend host will be selected.
type HashPolicy struct {
// Field is the attribute type to hash on.
// Must be one of "header","cookie", or "query_parameter".
// Cannot be specified along with SourceIP.
Field string `json:",omitempty"`
// FieldMatchValue is the value to hash.
// ie. header name, cookie name, URL query parameter name
// Cannot be specified along with SourceIP.
FieldMatchValue string `json:",omitempty" alias:"field_value"`
// SourceAddress determines whether the hash should be of the source IP rather than of a field and field value.
// Cannot be specified along with Field and FieldMatchValue.
SourceAddress bool `json:",omitempty" alias:"source_address"`
// Terminal will short circuit the computation of the hash when multiple hash policies are present.
// If a hash is computed when a Terminal policy is evaluated,
// then that hash will be used and subsequent hash policies will be ignored.
Terminal bool `json:",omitempty"`
}

View File

@ -233,3 +233,105 @@ func TestAPI_ConfigEntry_DiscoveryChain(t *testing.T) {
require.True(t, ok, "subtest %q failed so aborting remainder", name)
}
}
func TestAPI_ConfigEntry_ServiceResolver_LoadBalancer(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
config_entries := c.ConfigEntries()
verifyResolver := func(t *testing.T, initial ConfigEntry) {
t.Helper()
require.IsType(t, &ServiceResolverConfigEntry{}, initial)
testEntry := initial.(*ServiceResolverConfigEntry)
// set it
_, wm, err := config_entries.Set(testEntry, nil)
require.NoError(t, err)
require.NotNil(t, wm)
require.NotEqual(t, 0, wm.RequestTime)
// get it
entry, qm, err := config_entries.Get(ServiceResolver, testEntry.Name, nil)
require.NoError(t, err)
require.NotNil(t, qm)
require.NotEqual(t, 0, qm.RequestTime)
// verify it
readResolver, ok := entry.(*ServiceResolverConfigEntry)
require.True(t, ok)
readResolver.ModifyIndex = 0 // reset for Equals()
readResolver.CreateIndex = 0 // reset for Equals()
require.Equal(t, testEntry, readResolver)
}
// First set the necessary protocols to allow advanced routing features.
for _, service := range []string{
"test-least-req",
"test-ring-hash",
} {
serviceDefaults := &ServiceConfigEntry{
Kind: ServiceDefaults,
Name: service,
Protocol: "http",
}
_, _, err := config_entries.Set(serviceDefaults, nil)
require.NoError(t, err)
}
// NOTE: Due to service graph validation, these have to happen in a specific order.
for _, tc := range []struct {
name string
entry ConfigEntry
verify func(t *testing.T, initial ConfigEntry)
}{
{
name: "least-req",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test-least-req",
Namespace: defaultNamespace,
LoadBalancer: LoadBalancer{
Policy: "least_request",
LeastRequestConfig: LeastRequestConfig{ChoiceCount: 10},
},
},
verify: verifyResolver,
},
{
name: "ring-hash-with-policies",
entry: &ServiceResolverConfigEntry{
Kind: ServiceResolver,
Name: "test-ring-hash",
Namespace: defaultNamespace,
LoadBalancer: LoadBalancer{
Policy: "ring_hash",
RingHashConfig: RingHashConfig{
MinimumRingSize: 1024 * 2,
MaximumRingSize: 1024 * 4,
},
HashPolicies: []HashPolicy{
{
Field: "header",
FieldMatchValue: "my-session-header",
Terminal: true,
},
{
SourceAddress: true,
},
},
},
},
verify: verifyResolver,
},
} {
tc := tc
name := fmt.Sprintf("%s:%s: %s", tc.entry.GetKind(), tc.entry.GetName(), tc.name)
ok := t.Run(name, func(t *testing.T) {
tc.verify(t, tc.entry)
})
require.True(t, ok, "subtest %q failed so aborting remainder", name)
}
}