From ff56a64b0823e9593f84e84d656a87868c68206c Mon Sep 17 00:00:00 2001 From: freddygv Date: Sat, 22 Aug 2020 18:05:09 -0600 Subject: [PATCH] Add LB policy to service-resolver --- agent/structs/config_entry_discoverychain.go | 124 ++++++++ .../config_entry_discoverychain_test.go | 272 ++++++++++++++++++ api/config_entry_discoverychain.go | 65 +++++ api/config_entry_discoverychain_test.go | 102 +++++++ 4 files changed, 563 insertions(+) diff --git a/agent/structs/config_entry_discoverychain.go b/agent/structs/config_entry_discoverychain.go index 04cf32353a..c87b46f00f 100644 --- a/agent/structs/config_entry_discoverychain.go +++ b/agent/structs/config_entry_discoverychain.go @@ -639,6 +639,10 @@ type ServiceResolverConfigEntry struct { // to this service. ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"` + // LoadBalancer determines the load balancing policy and configuration for services + // issuing requests to this upstream service. + LoadBalancer LoadBalancer `json:",omitempty" alias:"load_balancer"` + EnterpriseMeta `hcl:",squash" mapstructure:",squash"` RaftIndex } @@ -807,6 +811,56 @@ func (e *ServiceResolverConfigEntry) Validate() error { return fmt.Errorf("Bad ConnectTimeout '%s', must be >= 0", e.ConnectTimeout) } + validPolicies := map[string]bool{ + "": true, + "random": true, + "round_robin": true, + "least_request": true, + "ring_hash": true, + "maglev": true, + } + if ok := validPolicies[e.LoadBalancer.Policy]; !ok { + return fmt.Errorf("Bad LoadBalancer policy: %q is not supported", e.LoadBalancer.Policy) + } + + if e.LoadBalancer.Policy != "ring_hash" && e.LoadBalancer.RingHashConfig != (RingHashConfig{}) { + return fmt.Errorf("Bad LoadBalancer configuration. "+ + "RingHashConfig specified for incompatible load balancing policy %q", e.LoadBalancer.Policy) + } + if e.LoadBalancer.Policy != "least_request" && e.LoadBalancer.LeastRequestConfig != (LeastRequestConfig{}) { + return fmt.Errorf("Bad LoadBalancer configuration. "+ + "LeastRequestConfig specified for incompatible load balancing policy %q", e.LoadBalancer.Policy) + } + if !e.LoadBalancer.IsHashBased() && len(e.LoadBalancer.HashPolicies) > 0 { + return fmt.Errorf("Bad LoadBalancer configuration: "+ + "HashPolicies specified for non-hash-based Policy: %q", e.LoadBalancer.Policy) + } + + validFields := map[string]bool{ + "header": true, + "cookie": true, + "query_parameter": true, + } + for i, hp := range e.LoadBalancer.HashPolicies { + if ok := validFields[hp.Field]; hp.Field != "" && !ok { + return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: %q is not a supported field", i, hp.Field) + } + if hp.SourceAddress && hp.Field != "" { + return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: "+ + "A single hash policy cannot hash both a source address and a %q", i, hp.Field) + } + if hp.SourceAddress && hp.FieldMatchValue != "" { + return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: "+ + "A FieldMatchValue cannot be specified when hashing SourceAddress", i) + } + if hp.Field != "" && hp.FieldMatchValue == "" { + return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: Field %q was specified without a FieldMatchValue", i, hp.Field) + } + if hp.FieldMatchValue != "" && hp.Field == "" { + return fmt.Errorf("Bad LoadBalancer HashPolicy[%d]: FieldMatchValue requires a Field to apply to", i) + } + } + return nil } @@ -943,6 +997,76 @@ type ServiceResolverFailover struct { Datacenters []string `json:",omitempty"` } +// LoadBalancer determines the load balancing policy and configuration for services +// issuing requests to this upstream service. +type LoadBalancer struct { + // Policy is the load balancing policy used to select a host + Policy string `json:",omitempty"` + + // RingHashConfig contains configuration for the "ring_hash" policy type + RingHashConfig RingHashConfig `json:",omitempty" alias:"ring_hash_config"` + + // LeastRequestConfig contains configuration for the "least_request" policy type + LeastRequestConfig LeastRequestConfig `json:",omitempty" alias:"least_request_config"` + + // HashPolicies is a list of hash policies to use for hashing load balancing algorithms. + // Hash policies are evaluated individually and combined such that identical lists + // result in the same hash. + // If no hash policies are present, or none are successfully evaluated, + // then a random backend host will be selected. + HashPolicies []HashPolicy `json:",omitempty" alias:"hash_policies"` +} + +func (l LoadBalancer) IsHashBased() bool { + switch l.Policy { + case "maglev", "ring_hash": + return true + default: + return false + } +} + +// RingHashConfig contains configuration for the "ring_hash" policy type +type RingHashConfig struct { + // MinimumRingSize determines the minimum number of hashes per destination host + MinimumRingSize uint64 `json:",omitempty" alias:"minimum_ring_size"` + + // MaximumRingSize determines the maximum number of hashes per destination host + MaximumRingSize uint64 `json:",omitempty" alias:"maximum_ring_size"` +} + +// LeastRequestConfig contains configuration for the "least_request" policy type +type LeastRequestConfig struct { + // ChoiceCount determines the number of random healthy hosts from which to select the one with the least requests. + ChoiceCount uint32 `json:",omitempty" alias:"choice_count"` +} + +// HashPolicy is a list of hash policies to use for hashing load balancing algorithms. +// Hash policies are evaluated individually and combined such that identical lists +// result in the same hash. +// If no hash policies are present, or none are successfully evaluated, +// then a random backend host will be selected. +type HashPolicy struct { + // Field is the attribute type to hash on. + // Must be one of "header","cookie", or "query_parameter". + // Cannot be specified along with SourceIP. + Field string `json:",omitempty"` + + // FieldMatchValue is the value to hash. + // ie. header name, cookie name, URL query parameter name + // Cannot be specified along with SourceIP. + FieldMatchValue string `json:",omitempty" alias:"field_value"` + + // SourceAddress determines whether the hash should be of the source IP rather than of a field and field value. + // Cannot be specified along with Field and FieldMatchValue. + SourceAddress bool `json:",omitempty" alias:"source_address"` + + // Terminal will short circuit the computation of the hash when multiple hash policies are present. + // If a hash is computed when a Terminal policy is evaluated, + // then that hash will be used and subsequent hash policies will be ignored. + Terminal bool `json:",omitempty"` +} + type discoveryChainConfigEntry interface { ConfigEntry // ListRelatedServices returns a list of other names of services referenced diff --git a/agent/structs/config_entry_discoverychain_test.go b/agent/structs/config_entry_discoverychain_test.go index 5f332e9bbe..d5db08ec52 100644 --- a/agent/structs/config_entry_discoverychain_test.go +++ b/agent/structs/config_entry_discoverychain_test.go @@ -536,6 +536,278 @@ func TestServiceResolverConfigEntry(t *testing.T) { } } +func TestServiceResolverConfigEntry_LoadBalancer(t *testing.T) { + + type testcase struct { + name string + entry *ServiceResolverConfigEntry + normalizeErr string + validateErr string + + // check is called between normalize and validate + check func(t *testing.T, entry *ServiceResolverConfigEntry) + } + + cases := []testcase{ + { + name: "empty policy is valid", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{Policy: ""}, + }, + }, + { + name: "supported policy", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{Policy: "random"}, + }, + }, + { + name: "unsupported policy", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{Policy: "fake-policy"}, + }, + validateErr: `"fake-policy" is not supported`, + }, + { + name: "bad policy for least request config", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "ring_hash", + LeastRequestConfig: LeastRequestConfig{ChoiceCount: 2}, + }, + }, + validateErr: `LeastRequestConfig specified for incompatible load balancing policy`, + }, + { + name: "bad policy for ring hash config", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "least_request", + RingHashConfig: RingHashConfig{MinimumRingSize: 1024}, + }, + }, + validateErr: `RingHashConfig specified for incompatible load balancing policy`, + }, + { + name: "good policy for ring hash config", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "ring_hash", + RingHashConfig: RingHashConfig{MinimumRingSize: 1024}, + }, + }, + }, + { + name: "good policy for least request config", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "least_request", + LeastRequestConfig: LeastRequestConfig{ChoiceCount: 2}, + }, + }, + }, + { + name: "empty policy is not defaulted", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "", + }, + }, + check: func(t *testing.T, entry *ServiceResolverConfigEntry) { + require.Equal(t, "", entry.LoadBalancer.Policy) + }, + }, + { + name: "empty policy with hash policy", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "", + HashPolicies: []HashPolicy{ + { + SourceAddress: true, + }, + }, + }, + }, + validateErr: `HashPolicies specified for non-hash-based Policy`, + }, + { + name: "supported match field", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + Field: "header", + FieldMatchValue: "X-Consul-Token", + }, + }, + }, + }, + }, + { + name: "unsupported match field", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + Field: "not-header", + }, + }, + }, + }, + validateErr: `"not-header" is not a supported field`, + }, + { + name: "cannot match on source address and custom field", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + Field: "header", + SourceAddress: true, + }, + }, + }, + }, + validateErr: `A single hash policy cannot hash both a source address and a "header"`, + }, + { + name: "matchvalue not compatible with source address", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + FieldMatchValue: "X-Consul-Token", + SourceAddress: true, + }, + }, + }, + }, + validateErr: `A FieldMatchValue cannot be specified when hashing SourceAddress`, + }, + { + name: "field without match value", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + Field: "header", + }, + }, + }, + }, + validateErr: `Field "header" was specified without a FieldMatchValue`, + }, + { + name: "field without match value", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "maglev", + HashPolicies: []HashPolicy{ + { + FieldMatchValue: "my-cookie", + }, + }, + }, + }, + validateErr: `FieldMatchValue requires a Field to apply to`, + }, + { + name: "ring hash kitchen sink", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "ring_hash", + RingHashConfig: RingHashConfig{MaximumRingSize: 10, MinimumRingSize: 2}, + HashPolicies: []HashPolicy{ + { + Field: "cookie", + FieldMatchValue: "my-cookie", + }, + { + Field: "header", + FieldMatchValue: "alt-header", + Terminal: true, + }, + }, + }, + }, + }, + { + name: "least request kitchen sink", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test", + LoadBalancer: LoadBalancer{ + Policy: "least_request", + LeastRequestConfig: LeastRequestConfig{ChoiceCount: 20}, + }, + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + err := tc.entry.Normalize() + if tc.normalizeErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.normalizeErr) + return + } + require.NoError(t, err) + + if tc.check != nil { + tc.check(t, tc.entry) + } + + err = tc.entry.Validate() + if tc.validateErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.validateErr) + return + } + require.NoError(t, err) + }) + } +} + func TestServiceSplitterConfigEntry(t *testing.T) { makesplitter := func(splits ...ServiceSplit) *ServiceSplitterConfigEntry { diff --git a/api/config_entry_discoverychain.go b/api/config_entry_discoverychain.go index f3994f0dd9..0bfd3dde2a 100644 --- a/api/config_entry_discoverychain.go +++ b/api/config_entry_discoverychain.go @@ -138,6 +138,10 @@ type ServiceResolverConfigEntry struct { Failover map[string]ServiceResolverFailover `json:",omitempty"` ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"` + // LoadBalancer determines the load balancing policy and configuration for services + // issuing requests to this upstream service. + LoadBalancer LoadBalancer `json:",omitempty" alias:"load_balancer"` + CreateIndex uint64 ModifyIndex uint64 } @@ -201,3 +205,64 @@ type ServiceResolverFailover struct { Namespace string `json:",omitempty"` Datacenters []string `json:",omitempty"` } + +// LoadBalancer determines the load balancing policy and configuration for services +// issuing requests to this upstream service. +type LoadBalancer struct { + // Policy is the load balancing policy used to select a host + Policy string `json:",omitempty"` + + // RingHashConfig contains configuration for the "ring_hash" policy type + RingHashConfig RingHashConfig `json:",omitempty" alias:"ring_hash_config"` + + // LeastRequestConfig contains configuration for the "least_request" policy type + LeastRequestConfig LeastRequestConfig `json:",omitempty" alias:"least_request_config"` + + // HashPolicies is a list of hash policies to use for hashing load balancing algorithms. + // Hash policies are evaluated individually and combined such that identical lists + // result in the same hash. + // If no hash policies are present, or none are successfully evaluated, + // then a random backend host will be selected. + HashPolicies []HashPolicy `json:",omitempty" alias:"hash_policies"` +} + +// RingHashConfig contains configuration for the "ring_hash" policy type +type RingHashConfig struct { + // MinimumRingSize determines the minimum number of hashes per destination host + MinimumRingSize uint64 `json:",omitempty" alias:"minimum_ring_size"` + + // MaximumRingSize determines the maximum number of hashes per destination host + MaximumRingSize uint64 `json:",omitempty" alias:"maximum_ring_size"` +} + +// LeastRequestConfig contains configuration for the "least_request" policy type +type LeastRequestConfig struct { + // ChoiceCount determines the number of random healthy hosts from which to select the one with the least requests. + ChoiceCount uint32 `json:",omitempty" alias:"choice_count"` +} + +// HashPolicy is a list of hash policies to use for hashing load balancing algorithms. +// Hash policies are evaluated individually and combined such that identical lists +// result in the same hash. +// If no hash policies are present, or none are successfully evaluated, +// then a random backend host will be selected. +type HashPolicy struct { + // Field is the attribute type to hash on. + // Must be one of "header","cookie", or "query_parameter". + // Cannot be specified along with SourceIP. + Field string `json:",omitempty"` + + // FieldMatchValue is the value to hash. + // ie. header name, cookie name, URL query parameter name + // Cannot be specified along with SourceIP. + FieldMatchValue string `json:",omitempty" alias:"field_value"` + + // SourceAddress determines whether the hash should be of the source IP rather than of a field and field value. + // Cannot be specified along with Field and FieldMatchValue. + SourceAddress bool `json:",omitempty" alias:"source_address"` + + // Terminal will short circuit the computation of the hash when multiple hash policies are present. + // If a hash is computed when a Terminal policy is evaluated, + // then that hash will be used and subsequent hash policies will be ignored. + Terminal bool `json:",omitempty"` +} diff --git a/api/config_entry_discoverychain_test.go b/api/config_entry_discoverychain_test.go index d01f7b6286..0e77429bb6 100644 --- a/api/config_entry_discoverychain_test.go +++ b/api/config_entry_discoverychain_test.go @@ -233,3 +233,105 @@ func TestAPI_ConfigEntry_DiscoveryChain(t *testing.T) { require.True(t, ok, "subtest %q failed so aborting remainder", name) } } + +func TestAPI_ConfigEntry_ServiceResolver_LoadBalancer(t *testing.T) { + t.Parallel() + c, s := makeClient(t) + defer s.Stop() + + config_entries := c.ConfigEntries() + + verifyResolver := func(t *testing.T, initial ConfigEntry) { + t.Helper() + require.IsType(t, &ServiceResolverConfigEntry{}, initial) + testEntry := initial.(*ServiceResolverConfigEntry) + + // set it + _, wm, err := config_entries.Set(testEntry, nil) + require.NoError(t, err) + require.NotNil(t, wm) + require.NotEqual(t, 0, wm.RequestTime) + + // get it + entry, qm, err := config_entries.Get(ServiceResolver, testEntry.Name, nil) + require.NoError(t, err) + require.NotNil(t, qm) + require.NotEqual(t, 0, qm.RequestTime) + + // verify it + readResolver, ok := entry.(*ServiceResolverConfigEntry) + require.True(t, ok) + readResolver.ModifyIndex = 0 // reset for Equals() + readResolver.CreateIndex = 0 // reset for Equals() + + require.Equal(t, testEntry, readResolver) + } + + // First set the necessary protocols to allow advanced routing features. + for _, service := range []string{ + "test-least-req", + "test-ring-hash", + } { + serviceDefaults := &ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: service, + Protocol: "http", + } + _, _, err := config_entries.Set(serviceDefaults, nil) + require.NoError(t, err) + } + + // NOTE: Due to service graph validation, these have to happen in a specific order. + for _, tc := range []struct { + name string + entry ConfigEntry + verify func(t *testing.T, initial ConfigEntry) + }{ + { + name: "least-req", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test-least-req", + Namespace: defaultNamespace, + LoadBalancer: LoadBalancer{ + Policy: "least_request", + LeastRequestConfig: LeastRequestConfig{ChoiceCount: 10}, + }, + }, + verify: verifyResolver, + }, + { + name: "ring-hash-with-policies", + entry: &ServiceResolverConfigEntry{ + Kind: ServiceResolver, + Name: "test-ring-hash", + Namespace: defaultNamespace, + LoadBalancer: LoadBalancer{ + Policy: "ring_hash", + RingHashConfig: RingHashConfig{ + MinimumRingSize: 1024 * 2, + MaximumRingSize: 1024 * 4, + }, + HashPolicies: []HashPolicy{ + { + Field: "header", + FieldMatchValue: "my-session-header", + Terminal: true, + }, + { + SourceAddress: true, + }, + }, + }, + }, + verify: verifyResolver, + }, + } { + tc := tc + name := fmt.Sprintf("%s:%s: %s", tc.entry.GetKind(), tc.entry.GetName(), tc.name) + ok := t.Run(name, func(t *testing.T) { + tc.verify(t, tc.entry) + }) + require.True(t, ok, "subtest %q failed so aborting remainder", name) + } +}