mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 06:44:41 +00:00
Add PrioritizeByLocality to config entries. (#17007)
This commit adds the PrioritizeByLocality field to both proxy-config and service-resolver config entries for locality-aware routing. The field is currently intended for enterprise only, and will be used to enable prioritization of service-mesh connections to services based on geographical region / zone.
This commit is contained in:
parent
04b881a854
commit
87324c9ec8
@ -382,7 +382,6 @@ func (c *compiler) determineIfDefaultChain() bool {
|
||||
}
|
||||
|
||||
target := c.loadedTargets[node.Resolver.Target]
|
||||
|
||||
return target.Service == c.serviceName && target.Namespace == c.evaluateInNamespace && target.Partition == c.evaluateInPartition
|
||||
}
|
||||
|
||||
@ -996,14 +995,21 @@ RESOLVE_AGAIN:
|
||||
Type: structs.DiscoveryGraphNodeTypeResolver,
|
||||
Name: target.ID,
|
||||
Resolver: &structs.DiscoveryResolver{
|
||||
Default: resolver.IsDefault(),
|
||||
Target: target.ID,
|
||||
ConnectTimeout: connectTimeout,
|
||||
RequestTimeout: resolver.RequestTimeout,
|
||||
Default: resolver.IsDefault(),
|
||||
Target: target.ID,
|
||||
ConnectTimeout: connectTimeout,
|
||||
RequestTimeout: resolver.RequestTimeout,
|
||||
PrioritizeByLocality: resolver.PrioritizeByLocality.ToDiscovery(),
|
||||
},
|
||||
LoadBalancer: resolver.LoadBalancer,
|
||||
}
|
||||
|
||||
// Merge default values from the proxy defaults
|
||||
proxyDefault := c.entries.GetProxyDefaults(targetID.PartitionOrDefault())
|
||||
if proxyDefault != nil && node.Resolver.PrioritizeByLocality == nil {
|
||||
node.Resolver.PrioritizeByLocality = proxyDefault.PrioritizeByLocality.ToDiscovery()
|
||||
}
|
||||
|
||||
target.Subset = resolver.Subsets[target.ServiceSubset]
|
||||
|
||||
if serviceDefault := c.entries.GetService(targetID); serviceDefault != nil && serviceDefault.ExternalSNI != "" {
|
||||
@ -1089,7 +1095,7 @@ RESOLVE_AGAIN:
|
||||
// Determine which failover definitions apply.
|
||||
var failoverTargets []*structs.DiscoveryTarget
|
||||
var failoverPolicy *structs.ServiceResolverFailoverPolicy
|
||||
proxyDefault := c.entries.GetProxyDefaults(targetID.PartitionOrDefault())
|
||||
|
||||
if proxyDefault != nil {
|
||||
failoverPolicy = proxyDefault.FailoverPolicy
|
||||
}
|
||||
|
@ -367,16 +367,17 @@ func IsIP(address string) bool {
|
||||
|
||||
// ProxyConfigEntry is the top-level struct for global proxy configuration defaults.
|
||||
type ProxyConfigEntry struct {
|
||||
Kind string
|
||||
Name string
|
||||
Config map[string]interface{}
|
||||
Mode ProxyMode `json:",omitempty"`
|
||||
TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
AccessLogs AccessLogsConfig `json:",omitempty" alias:"access_logs"`
|
||||
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`
|
||||
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
|
||||
Kind string
|
||||
Name string
|
||||
Config map[string]interface{}
|
||||
Mode ProxyMode `json:",omitempty"`
|
||||
TransparentProxy TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
AccessLogs AccessLogsConfig `json:",omitempty" alias:"access_logs"`
|
||||
EnvoyExtensions EnvoyExtensions `json:",omitempty" alias:"envoy_extensions"`
|
||||
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
|
||||
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
|
||||
|
||||
Meta map[string]string `json:",omitempty"`
|
||||
acl.EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||
@ -443,8 +444,12 @@ func (e *ProxyConfigEntry) Validate() error {
|
||||
return err
|
||||
}
|
||||
|
||||
if !e.FailoverPolicy.isValid() {
|
||||
return fmt.Errorf("Failover policy must be one of '', 'default', or 'order-by-locality'")
|
||||
if err := e.FailoverPolicy.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := e.PrioritizeByLocality.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return e.validateEnterpriseMeta()
|
||||
|
@ -857,6 +857,10 @@ type ServiceResolverConfigEntry struct {
|
||||
// specified here.
|
||||
Failover map[string]ServiceResolverFailover `json:",omitempty"`
|
||||
|
||||
// PrioritizeByLocality controls whether the locality of services within the
|
||||
// local partition will be used to prioritize connectivity.
|
||||
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
|
||||
|
||||
// ConnectTimeout is the timeout for establishing new network connections
|
||||
// to this service.
|
||||
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
|
||||
@ -960,7 +964,8 @@ func (e *ServiceResolverConfigEntry) IsDefault() bool {
|
||||
len(e.Failover) == 0 &&
|
||||
e.ConnectTimeout == 0 &&
|
||||
e.RequestTimeout == 0 &&
|
||||
e.LoadBalancer == nil
|
||||
e.LoadBalancer == nil &&
|
||||
e.PrioritizeByLocality == nil
|
||||
}
|
||||
|
||||
func (e *ServiceResolverConfigEntry) GetKind() string {
|
||||
@ -1031,6 +1036,10 @@ func (e *ServiceResolverConfigEntry) Validate() error {
|
||||
return fmt.Errorf("DefaultSubset %q is not a valid subset", e.DefaultSubset)
|
||||
}
|
||||
|
||||
if err := e.PrioritizeByLocality.validate(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if e.Redirect != nil {
|
||||
if !e.InDefaultPartition() && e.Redirect.Datacenter != "" {
|
||||
return fmt.Errorf("Cross-datacenter redirect is only supported in the default partition")
|
||||
@ -1113,8 +1122,8 @@ func (e *ServiceResolverConfigEntry) Validate() error {
|
||||
return fmt.Errorf("Bad Failover[%q]: %s", subset, err)
|
||||
}
|
||||
|
||||
if !f.Policy.isValid() {
|
||||
return fmt.Errorf("Bad Failover[%q]: Policy must be one of '', 'sequential', or 'order-by-locality'", subset)
|
||||
if err := f.Policy.validate(); err != nil {
|
||||
return fmt.Errorf("Bad Failover[%q]: %w", subset, err)
|
||||
}
|
||||
|
||||
if f.ServiceSubset != "" {
|
||||
@ -1445,13 +1454,6 @@ type ServiceResolverFailover struct {
|
||||
SamenessGroup string `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ServiceResolverFailoverPolicy struct {
|
||||
// Mode specifies the type of failover that will be performed. Valid values are
|
||||
// "sequential", "" (equivalent to "sequential") and "order-by-locality".
|
||||
Mode string `json:",omitempty"`
|
||||
Regions []string `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (f *ServiceResolverFailover) ToDiscoveryTargetOpts() DiscoveryTargetOpts {
|
||||
return DiscoveryTargetOpts{
|
||||
Service: f.Service,
|
||||
@ -1469,9 +1471,16 @@ func (f *ServiceResolverFailover) isEmpty() bool {
|
||||
f.SamenessGroup == ""
|
||||
}
|
||||
|
||||
func (fp *ServiceResolverFailoverPolicy) isValid() bool {
|
||||
type ServiceResolverFailoverPolicy struct {
|
||||
// Mode specifies the type of failover that will be performed. Valid values are
|
||||
// "sequential", "" (equivalent to "sequential") and "order-by-locality".
|
||||
Mode string `json:",omitempty"`
|
||||
Regions []string `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (fp *ServiceResolverFailoverPolicy) validate() error {
|
||||
if fp == nil {
|
||||
return true
|
||||
return nil
|
||||
}
|
||||
|
||||
switch fp.Mode {
|
||||
@ -1479,10 +1488,16 @@ func (fp *ServiceResolverFailoverPolicy) isValid() bool {
|
||||
case "sequential":
|
||||
case "order-by-locality":
|
||||
default:
|
||||
return false
|
||||
return fmt.Errorf("Failover-policy mode must be one of '', 'sequential', or 'order-by-locality'")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return true
|
||||
type ServiceResolverPrioritizeByLocality struct {
|
||||
// Mode specifies the type of prioritization that will be performed
|
||||
// when selecting nodes in the local partition.
|
||||
// Valid values are: "" (default "none"), "none", and "failover".
|
||||
Mode string `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ServiceResolverFailoverTarget struct {
|
||||
|
@ -118,3 +118,11 @@ func (f *ServiceResolverFailoverPolicy) ValidateEnterprise() error {
|
||||
func (e *ServiceResolverConfigEntry) RelatedSamenessGroups() []string {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (pbl *ServiceResolverPrioritizeByLocality) validate() error {
|
||||
var zero ServiceResolverPrioritizeByLocality
|
||||
if pbl == nil || *pbl == zero {
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("Prioritize-by-locality requires Consul Enterprise ")
|
||||
}
|
||||
|
@ -3174,7 +3174,7 @@ func TestProxyConfigEntry(t *testing.T) {
|
||||
Name: "global",
|
||||
FailoverPolicy: &ServiceResolverFailoverPolicy{Mode: "bad"},
|
||||
},
|
||||
validateErr: `Failover policy must be one of '', 'default', or 'order-by-locality'`,
|
||||
validateErr: `Failover-policy mode must be one of '', 'sequential', or 'order-by-locality'`,
|
||||
},
|
||||
"proxy config with valid failover policy": {
|
||||
entry: &ProxyConfigEntry{
|
||||
|
@ -117,11 +117,12 @@ func (s *DiscoveryGraphNode) MapKey() string {
|
||||
|
||||
// compiled form of ServiceResolverConfigEntry
|
||||
type DiscoveryResolver struct {
|
||||
Default bool `json:",omitempty"`
|
||||
ConnectTimeout time.Duration `json:",omitempty"`
|
||||
RequestTimeout time.Duration `json:",omitempty"`
|
||||
Target string `json:",omitempty"`
|
||||
Failover *DiscoveryFailover `json:",omitempty"`
|
||||
Default bool `json:",omitempty"`
|
||||
ConnectTimeout time.Duration `json:",omitempty"`
|
||||
RequestTimeout time.Duration `json:",omitempty"`
|
||||
Target string `json:",omitempty"`
|
||||
Failover *DiscoveryFailover `json:",omitempty"`
|
||||
PrioritizeByLocality *DiscoveryPrioritizeByLocality `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (r *DiscoveryResolver) MarshalJSON() ([]byte, error) {
|
||||
@ -186,6 +187,20 @@ type DiscoveryFailover struct {
|
||||
Regions []string `json:",omitempty"`
|
||||
}
|
||||
|
||||
// compiled form of ServiceResolverPrioritizeByLocality
|
||||
type DiscoveryPrioritizeByLocality struct {
|
||||
Mode string `json:",omitempty"`
|
||||
}
|
||||
|
||||
func (pbl *ServiceResolverPrioritizeByLocality) ToDiscovery() *DiscoveryPrioritizeByLocality {
|
||||
if pbl == nil {
|
||||
return nil
|
||||
}
|
||||
return &DiscoveryPrioritizeByLocality{
|
||||
Mode: pbl.Mode,
|
||||
}
|
||||
}
|
||||
|
||||
// DiscoveryTarget represents all of the inputs necessary to use a resolver
|
||||
// config entry to execute a catalog query to generate a list of service
|
||||
// instances during discovery.
|
||||
|
@ -232,6 +232,10 @@ func (o *DiscoveryResolver) DeepCopy() *DiscoveryResolver {
|
||||
if o.Failover != nil {
|
||||
cp.Failover = o.Failover.DeepCopy()
|
||||
}
|
||||
if o.PrioritizeByLocality != nil {
|
||||
cp.PrioritizeByLocality = new(DiscoveryPrioritizeByLocality)
|
||||
*cp.PrioritizeByLocality = *o.PrioritizeByLocality
|
||||
}
|
||||
return &cp
|
||||
}
|
||||
|
||||
@ -909,6 +913,10 @@ func (o *ServiceResolverConfigEntry) DeepCopy() *ServiceResolverConfigEntry {
|
||||
cp.Failover[k2] = cp_Failover_v2
|
||||
}
|
||||
}
|
||||
if o.PrioritizeByLocality != nil {
|
||||
cp.PrioritizeByLocality = new(ServiceResolverPrioritizeByLocality)
|
||||
*cp.PrioritizeByLocality = *o.PrioritizeByLocality
|
||||
}
|
||||
if o.LoadBalancer != nil {
|
||||
cp.LoadBalancer = o.LoadBalancer.DeepCopy()
|
||||
}
|
||||
|
@ -314,18 +314,19 @@ func (s *ServiceConfigEntry) GetCreateIndex() uint64 { return s.CreateIndex
|
||||
func (s *ServiceConfigEntry) GetModifyIndex() uint64 { return s.ModifyIndex }
|
||||
|
||||
type ProxyConfigEntry struct {
|
||||
Kind string
|
||||
Name string
|
||||
Partition string `json:",omitempty"`
|
||||
Namespace string `json:",omitempty"`
|
||||
Mode ProxyMode `json:",omitempty"`
|
||||
TransparentProxy *TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
|
||||
Config map[string]interface{} `json:",omitempty"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
AccessLogs *AccessLogsConfig `json:",omitempty" alias:"access_logs"`
|
||||
EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"`
|
||||
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
|
||||
Kind string
|
||||
Name string
|
||||
Partition string `json:",omitempty"`
|
||||
Namespace string `json:",omitempty"`
|
||||
Mode ProxyMode `json:",omitempty"`
|
||||
TransparentProxy *TransparentProxyConfig `json:",omitempty" alias:"transparent_proxy"`
|
||||
Config map[string]interface{} `json:",omitempty"`
|
||||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"`
|
||||
Expose ExposeConfig `json:",omitempty"`
|
||||
AccessLogs *AccessLogsConfig `json:",omitempty" alias:"access_logs"`
|
||||
EnvoyExtensions []EnvoyExtension `json:",omitempty" alias:"envoy_extensions"`
|
||||
FailoverPolicy *ServiceResolverFailoverPolicy `json:",omitempty" alias:"failover_policy"`
|
||||
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
|
||||
|
||||
Meta map[string]string `json:",omitempty"`
|
||||
CreateIndex uint64
|
||||
|
@ -172,6 +172,10 @@ type ServiceResolverConfigEntry struct {
|
||||
ConnectTimeout time.Duration `json:",omitempty" alias:"connect_timeout"`
|
||||
RequestTimeout time.Duration `json:",omitempty" alias:"request_timeout"`
|
||||
|
||||
// PrioritizeByLocality controls whether the locality of services within the
|
||||
// local partition will be used to prioritize connectivity.
|
||||
PrioritizeByLocality *ServiceResolverPrioritizeByLocality `json:",omitempty" alias:"prioritize_by_locality"`
|
||||
|
||||
// LoadBalancer determines the load balancing policy and configuration for services
|
||||
// issuing requests to this upstream service.
|
||||
LoadBalancer *LoadBalancer `json:",omitempty" alias:"load_balancer"`
|
||||
@ -267,6 +271,13 @@ type ServiceResolverFailoverPolicy struct {
|
||||
Regions []string `json:",omitempty"`
|
||||
}
|
||||
|
||||
type ServiceResolverPrioritizeByLocality struct {
|
||||
// Mode specifies the type of prioritization that will be performed
|
||||
// when selecting nodes in the local partition.
|
||||
// Valid values are: "" (default "none"), "none", and "failover".
|
||||
Mode string `json:",omitempty"`
|
||||
}
|
||||
|
||||
// LoadBalancer determines the load balancing policy and configuration for services
|
||||
// issuing requests to this upstream service.
|
||||
type LoadBalancer struct {
|
||||
|
@ -1394,6 +1394,11 @@ func ServiceResolverToStructs(s *ServiceResolver, t *structs.ServiceResolverConf
|
||||
t.Failover[k] = y
|
||||
}
|
||||
}
|
||||
if s.PrioritizeByLocality != nil {
|
||||
var x structs.ServiceResolverPrioritizeByLocality
|
||||
ServiceResolverPrioritizeByLocalityToStructs(s.PrioritizeByLocality, &x)
|
||||
t.PrioritizeByLocality = &x
|
||||
}
|
||||
t.ConnectTimeout = structs.DurationFromProto(s.ConnectTimeout)
|
||||
t.RequestTimeout = structs.DurationFromProto(s.RequestTimeout)
|
||||
if s.LoadBalancer != nil {
|
||||
@ -1437,6 +1442,11 @@ func ServiceResolverFromStructs(t *structs.ServiceResolverConfigEntry, s *Servic
|
||||
s.Failover[k] = y
|
||||
}
|
||||
}
|
||||
if t.PrioritizeByLocality != nil {
|
||||
var x ServiceResolverPrioritizeByLocality
|
||||
ServiceResolverPrioritizeByLocalityFromStructs(t.PrioritizeByLocality, &x)
|
||||
s.PrioritizeByLocality = &x
|
||||
}
|
||||
s.ConnectTimeout = structs.DurationToProto(t.ConnectTimeout)
|
||||
s.RequestTimeout = structs.DurationToProto(t.RequestTimeout)
|
||||
if t.LoadBalancer != nil {
|
||||
@ -1530,6 +1540,18 @@ func ServiceResolverFailoverTargetFromStructs(t *structs.ServiceResolverFailover
|
||||
s.Datacenter = t.Datacenter
|
||||
s.Peer = t.Peer
|
||||
}
|
||||
func ServiceResolverPrioritizeByLocalityToStructs(s *ServiceResolverPrioritizeByLocality, t *structs.ServiceResolverPrioritizeByLocality) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
t.Mode = s.Mode
|
||||
}
|
||||
func ServiceResolverPrioritizeByLocalityFromStructs(t *structs.ServiceResolverPrioritizeByLocality, s *ServiceResolverPrioritizeByLocality) {
|
||||
if s == nil {
|
||||
return
|
||||
}
|
||||
s.Mode = t.Mode
|
||||
}
|
||||
func ServiceResolverRedirectToStructs(s *ServiceResolverRedirect, t *structs.ServiceResolverRedirect) {
|
||||
if s == nil {
|
||||
return
|
||||
|
@ -127,6 +127,16 @@ func (msg *ServiceResolverFailoverPolicy) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ServiceResolverPrioritizeByLocality) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ServiceResolverPrioritizeByLocality) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ServiceResolverFailoverTarget) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -128,6 +128,7 @@ message ServiceResolver {
|
||||
map<string, string> Meta = 7;
|
||||
// mog: func-to=structs.DurationFromProto func-from=structs.DurationToProto
|
||||
google.protobuf.Duration RequestTimeout = 8;
|
||||
ServiceResolverPrioritizeByLocality PrioritizeByLocality = 9;
|
||||
}
|
||||
|
||||
// mog annotation:
|
||||
@ -180,6 +181,15 @@ message ServiceResolverFailoverPolicy {
|
||||
repeated string Regions = 2;
|
||||
}
|
||||
|
||||
// mog annotation:
|
||||
//
|
||||
// target=github.com/hashicorp/consul/agent/structs.ServiceResolverPrioritizeByLocality
|
||||
// output=config_entry.gen.go
|
||||
// name=Structs
|
||||
message ServiceResolverPrioritizeByLocality {
|
||||
string Mode = 1;
|
||||
}
|
||||
|
||||
// mog annotation:
|
||||
//
|
||||
// target=github.com/hashicorp/consul/agent/structs.ServiceResolverFailoverTarget
|
||||
|
@ -19,15 +19,14 @@
|
||||
package pbsubscribe
|
||||
|
||||
import (
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
|
||||
_ "github.com/hashicorp/consul/proto-public/annotations/ratelimit"
|
||||
pbcommon "github.com/hashicorp/consul/proto/private/pbcommon"
|
||||
pbconfigentry "github.com/hashicorp/consul/proto/private/pbconfigentry"
|
||||
pbservice "github.com/hashicorp/consul/proto/private/pbservice"
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
Loading…
x
Reference in New Issue
Block a user