From 87cde19b4cf11b004d708b96430dc7ffc7a6d805 Mon Sep 17 00:00:00 2001 From: freddygv Date: Mon, 8 Mar 2021 22:10:27 -0700 Subject: [PATCH] Create new types for service-defaults upstream cfg --- agent/service_manager_test.go | 12 +- agent/structs/config_entry.go | 166 ++++++++++++- agent/structs/config_entry_test.go | 270 ++++++++++++++++++++++ agent/xds/clusters.go | 9 +- agent/xds/config.go | 87 +------ agent/xds/config_test.go | 28 +-- agent/xds/listeners.go | 4 +- api/config_entry.go | 88 ++++++- api/config_entry_test.go | 58 +++++ command/config/write/config_write_test.go | 152 +++++++++++- 10 files changed, 752 insertions(+), 122 deletions(-) diff --git a/agent/service_manager_test.go b/agent/service_manager_test.go index 8ed22e9f5a..bdda31b179 100644 --- a/agent/service_manager_test.go +++ b/agent/service_manager_test.go @@ -418,8 +418,8 @@ func TestServiceManager_PersistService_API(t *testing.T) { "foo": 1, "protocol": "http", }, - UpstreamIDConfigs: structs.UpstreamConfigs{ - structs.UpstreamConfig{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + structs.OpaqueUpstreamConfig{ Upstream: structs.NewServiceID("redis", nil), Config: map[string]interface{}{ "protocol": "tcp", @@ -459,8 +459,8 @@ func TestServiceManager_PersistService_API(t *testing.T) { "foo": 1, "protocol": "http", }, - UpstreamIDConfigs: structs.UpstreamConfigs{ - structs.UpstreamConfig{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + structs.OpaqueUpstreamConfig{ Upstream: structs.NewServiceID("redis", nil), Config: map[string]interface{}{ "protocol": "tcp", @@ -634,8 +634,8 @@ func TestServiceManager_PersistService_ConfigFiles(t *testing.T) { "foo": 1, "protocol": "http", }, - UpstreamIDConfigs: structs.UpstreamConfigs{ - structs.UpstreamConfig{ + UpstreamIDConfigs: structs.OpaqueUpstreamConfigs{ + structs.OpaqueUpstreamConfig{ Upstream: structs.NewServiceID("redis", nil), Config: map[string]interface{}{ "protocol": "tcp", diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 268c504056..33a50cd4dd 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -87,11 +87,7 @@ type ServiceConfigEntry struct { ExternalSNI string `json:",omitempty" alias:"external_sni"` - // TODO(banks): enable this once we have upstreams supported too. Enabling - // sidecars actually makes no sense and adds complications when you don't - // allow upstreams to be specified centrally too. - // - // Connect ConnectConfiguration + Connect *ConnectConfiguration `json:",omitempty"` Meta map[string]string `json:",omitempty"` EnterpriseMeta `hcl:",squash" mapstructure:",squash"` @@ -131,13 +127,20 @@ func (e *ServiceConfigEntry) Normalize() error { e.Kind = ServiceDefaults e.Protocol = strings.ToLower(e.Protocol) + e.Connect.Normalize() e.EnterpriseMeta.Normalize() return nil } func (e *ServiceConfigEntry) Validate() error { - return validateConfigEntryMeta(e.Meta) + validationErr := validateConfigEntryMeta(e.Meta) + + if err := e.Connect.Validate(); err != nil { + validationErr = multierror.Append(validationErr, err) + } + + return validationErr } func (e *ServiceConfigEntry) CanRead(authz acl.Authorizer) bool { @@ -169,7 +172,38 @@ func (e *ServiceConfigEntry) GetEnterpriseMeta() *EnterpriseMeta { } type ConnectConfiguration struct { - SidecarProxy bool + // UpstreamConfigs is a map of service to per-upstream configuration + UpstreamConfigs map[string]*UpstreamConfig `json:",omitempty" alias:"upstream_configs"` + + // UpstreamDefaults contains default configuration for all upstreams of a given service + UpstreamDefaults *UpstreamConfig `json:",omitempty" alias:"upstream_defaults"` +} + +func (cfg *ConnectConfiguration) Normalize() { + if cfg == nil { + return + } + for _, v := range cfg.UpstreamConfigs { + v.Normalize() + } + + cfg.UpstreamDefaults.Normalize() +} + +func (cfg ConnectConfiguration) Validate() error { + var validationErr error + + for k, v := range cfg.UpstreamConfigs { + if err := v.Validate(); err != nil { + validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream config for %s: %v", k, err)) + } + } + + if err := cfg.UpstreamDefaults.Validate(); err != nil { + validationErr = multierror.Append(validationErr, fmt.Errorf("error in upstream defaults %v", err)) + } + + return validationErr } // ProxyConfigEntry is the top-level struct for global proxy configuration defaults. @@ -592,13 +626,125 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo { } type UpstreamConfig struct { + // ListenerJSON is a complete override ("escape hatch") for the upstream's + // listener. + // + // Note: This escape hatch is NOT compatible with the discovery chain and + // will be ignored if a discovery chain is active. + ListenerJSON string `json:",omitempty" alias:"listener_json"` + + // ClusterJSON is a complete override ("escape hatch") for the upstream's + // cluster. The Connect client TLS certificate and context will be injected + // overriding any TLS settings present. + // + // Note: This escape hatch is NOT compatible with the discovery chain and + // will be ignored if a discovery chain is active. + ClusterJSON string `alias:"cluster_json"` + + // Protocol describes the upstream's service protocol. Valid values are "tcp", + // "http" and "grpc". Anything else is treated as tcp. The enables protocol + // aware features like per-request metrics and connection pooling, tracing, + // routing etc. + Protocol string + + // ConnectTimeoutMs is the number of milliseconds to timeout making a new + // connection to this upstream. Defaults to 5000 (5 seconds) if not set. + ConnectTimeoutMs int `alias:"connect_timeout_ms"` + + // Limits are the set of limits that are applied to the proxy for a specific upstream of a + // service instance. + Limits UpstreamLimits + + // PassiveHealthCheck configuration determines how upstream proxy instances will + // be monitored for removal from the load balancing pool. + PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"` + + // MeshGatewayConfig controls how Mesh Gateways are configured and used + MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" ` +} + +func (cfg *UpstreamConfig) Normalize() { + if cfg.Protocol == "" { + cfg.Protocol = "tcp" + } else { + cfg.Protocol = strings.ToLower(cfg.Protocol) + } + + if cfg.ConnectTimeoutMs < 1 { + cfg.ConnectTimeoutMs = 5000 + } +} + +func (cfg UpstreamConfig) Validate() error { + var validationErr error + + if err := cfg.PassiveHealthCheck.Validate(); err != nil { + validationErr = multierror.Append(validationErr, err) + } + if err := cfg.Limits.Validate(); err != nil { + validationErr = multierror.Append(validationErr, err) + } + + return validationErr +} + +type PassiveHealthCheck struct { + // Interval between health check analysis sweeps. Each sweep may remove + // hosts or return hosts to the pool. + Interval time.Duration + + // MaxFailures is the count of consecutive failures that results in a host + // being removed from the pool. + MaxFailures uint32 `alias:"max_failures"` +} + +func (chk PassiveHealthCheck) Validate() error { + if chk.Interval <= 0*time.Second { + return fmt.Errorf("passive health check interval must be greater than 0s") + } + return nil +} + +// UpstreamLimits describes the limits that are associated with a specific +// upstream of a service instance. +type UpstreamLimits struct { + // MaxConnections is the maximum number of connections the local proxy can + // make to the upstream service. + MaxConnections *int `alias:"max_connections"` + + // MaxPendingRequests is the maximum number of requests that will be queued + // waiting for an available connection. This is mostly applicable to HTTP/1.1 + // clusters since all HTTP/2 requests are streamed over a single + // connection. + MaxPendingRequests *int `alias:"max_pending_requests"` + + // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed + // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2 + // clusters since all HTTP/1.1 requests are limited by MaxConnections. + MaxConcurrentRequests *int `alias:"max_concurrent_requests"` +} + +func (ul UpstreamLimits) Validate() error { + if ul.MaxConnections != nil && *ul.MaxConnections <= 0 { + return fmt.Errorf("max connections must be at least 0") + } + if ul.MaxPendingRequests != nil && *ul.MaxPendingRequests <= 0 { + return fmt.Errorf("max pending requests must be at least 0") + } + if ul.MaxConcurrentRequests != nil && *ul.MaxConcurrentRequests <= 0 { + return fmt.Errorf("max concurrent requests must be at least 0") + } + return nil +} + +type OpaqueUpstreamConfig struct { Upstream ServiceID Config map[string]interface{} } -type UpstreamConfigs []UpstreamConfig +type OpaqueUpstreamConfigs []OpaqueUpstreamConfig -func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) { +func (configs OpaqueUpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[string]interface{}, found bool) { for _, usconf := range configs { if usconf.Upstream.Matches(sid) { return usconf.Config, true @@ -611,7 +757,7 @@ func (configs UpstreamConfigs) GetUpstreamConfig(sid ServiceID) (config map[stri type ServiceConfigResponse struct { ProxyConfig map[string]interface{} UpstreamConfigs map[string]map[string]interface{} - UpstreamIDConfigs UpstreamConfigs + UpstreamIDConfigs OpaqueUpstreamConfigs MeshGateway MeshGatewayConfig `json:",omitempty"` Expose ExposeConfig `json:",omitempty"` QueryMeta diff --git a/agent/structs/config_entry_test.go b/agent/structs/config_entry_test.go index e5a92de3ff..272e1a1833 100644 --- a/agent/structs/config_entry_test.go +++ b/agent/structs/config_entry_test.go @@ -112,6 +112,33 @@ func TestDecodeConfigEntry(t *testing.T) { mesh_gateway { mode = "remote" } + connect { + upstream_configs { + redis { + passive_health_check { + interval = "2s" + max_failures = 3 + } + } + + "finance/billing" { + mesh_gateway { + mode = "remote" + } + } + } + upstream_defaults { + connect_timeout_ms = 5 + protocol = "http" + listener_json = "foo" + cluster_json = "bar" + limits { + max_connections = 3 + max_pending_requests = 4 + max_concurrent_requests = 5 + } + } + } `, camel: ` Kind = "service-defaults" @@ -125,6 +152,33 @@ func TestDecodeConfigEntry(t *testing.T) { MeshGateway { Mode = "remote" } + Connect { + UpstreamConfigs { + "redis" { + PassiveHealthCheck { + MaxFailures = 3 + Interval = "2s" + } + } + + "finance/billing" { + MeshGateway { + Mode = "remote" + } + } + } + UpstreamDefaults { + ListenerJSON = "foo" + ClusterJSON = "bar" + ConnectTimeoutMs = 5 + Protocol = "http" + Limits { + MaxConnections = 3 + MaxPendingRequests = 4 + MaxConcurrentRequests = 5 + } + } + } `, expect: &ServiceConfigEntry{ Kind: "service-defaults", @@ -138,6 +192,30 @@ func TestDecodeConfigEntry(t *testing.T) { MeshGateway: MeshGatewayConfig{ Mode: MeshGatewayModeRemote, }, + Connect: &ConnectConfiguration{ + UpstreamConfigs: map[string]*UpstreamConfig{ + "redis": { + PassiveHealthCheck: PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + }, + "finance/billing": { + MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote}, + }, + }, + UpstreamDefaults: &UpstreamConfig{ + ListenerJSON: "foo", + ClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", + Limits: UpstreamLimits{ + MaxConnections: intPointer(3), + MaxPendingRequests: intPointer(4), + MaxConcurrentRequests: intPointer(5), + }, + }, + }, }, }, { @@ -1330,7 +1408,199 @@ func TestConfigEntryResponseMarshalling(t *testing.T) { } } +func TestPassiveHealthCheck_Validate(t *testing.T) { + tt := []struct { + name string + input PassiveHealthCheck + wantErr bool + wantMsg string + }{ + { + name: "valid-interval", + input: PassiveHealthCheck{Interval: 2 * time.Second}, + wantErr: false, + }, + { + name: "negative-interval", + input: PassiveHealthCheck{Interval: -1 * time.Second}, + wantErr: true, + wantMsg: "greater than 0s", + }, + { + name: "zero-interval", + input: PassiveHealthCheck{Interval: 0 * time.Second}, + wantErr: true, + wantMsg: "greater than 0s", + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + err := tc.input.Validate() + if err == nil { + require.False(t, tc.wantErr) + return + } + require.Contains(t, err.Error(), tc.wantMsg) + }) + } +} + +func TestUpstreamLimits_Validate(t *testing.T) { + tt := []struct { + name string + input UpstreamLimits + wantErr bool + wantMsg string + }{ + { + name: "valid-max-conns", + input: UpstreamLimits{MaxConnections: intPointer(1)}, + wantErr: false, + }, + { + name: "zero-max-conns", + input: UpstreamLimits{MaxConnections: intPointer(0)}, + wantErr: true, + wantMsg: "at least 0", + }, + { + name: "negative-max-conns", + input: UpstreamLimits{MaxConnections: intPointer(-1)}, + wantErr: true, + wantMsg: "at least 0", + }, + { + name: "valid-max-concurrent", + input: UpstreamLimits{MaxConcurrentRequests: intPointer(1)}, + wantErr: false, + }, + { + name: "zero-max-concurrent", + input: UpstreamLimits{MaxConcurrentRequests: intPointer(0)}, + wantErr: true, + wantMsg: "at least 0", + }, + { + name: "negative-max-concurrent", + input: UpstreamLimits{MaxConcurrentRequests: intPointer(-1)}, + wantErr: true, + wantMsg: "at least 0", + }, + { + name: "valid-max-pending", + input: UpstreamLimits{MaxPendingRequests: intPointer(1)}, + wantErr: false, + }, + { + name: "zero-max-pending", + input: UpstreamLimits{MaxPendingRequests: intPointer(0)}, + wantErr: true, + wantMsg: "at least 0", + }, + { + name: "negative-max-pending", + input: UpstreamLimits{MaxPendingRequests: intPointer(-1)}, + wantErr: true, + wantMsg: "at least 0", + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + err := tc.input.Validate() + if err == nil { + require.False(t, tc.wantErr) + return + } + require.Contains(t, err.Error(), tc.wantMsg) + }) + } +} + +func TestServiceConfigEntry_Normalize(t *testing.T) { + tt := []struct { + name string + input ServiceConfigEntry + expect ServiceConfigEntry + }{ + { + name: "fill-in-kind", + input: ServiceConfigEntry{ + Name: "web", + }, + expect: ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: "web", + }, + }, + { + name: "lowercase-protocol", + input: ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: "web", + Protocol: "PrOtoCoL", + }, + expect: ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: "web", + Protocol: "protocol", + }, + }, + { + name: "connect-kitchen-sink", + input: ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: "web", + Connect: &ConnectConfiguration{ + UpstreamConfigs: map[string]*UpstreamConfig{ + "redis": { + Protocol: "TcP", + }, + "memcached": { + ConnectTimeoutMs: -1, + }, + }, + UpstreamDefaults: &UpstreamConfig{ConnectTimeoutMs: -20}, + }, + }, + expect: ServiceConfigEntry{ + Kind: ServiceDefaults, + Name: "web", + Connect: &ConnectConfiguration{ + UpstreamConfigs: map[string]*UpstreamConfig{ + "redis": { + Protocol: "tcp", + ConnectTimeoutMs: 5000, + }, + "memcached": { + Protocol: "tcp", + ConnectTimeoutMs: 5000, + }, + }, + UpstreamDefaults: &UpstreamConfig{ + Protocol: "tcp", + ConnectTimeoutMs: 5000, + }, + }, + }, + }, + } + + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + err := tc.input.Normalize() + require.NoError(t, err) + require.Equal(t, tc.expect, tc.input) + }) + } +} + func requireContainsLower(t *testing.T, haystack, needle string) { t.Helper() require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle)) } + +func intPointer(i int) *int { + return &i +} diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index fa527e2a70..0411ab147b 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -416,7 +416,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream, CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{ Thresholds: makeThresholdsIfNeeded(cfg.Limits), }, - OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(), + OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck), } if cfg.Protocol == "http2" || cfg.Protocol == "grpc" { c.Http2ProtocolOptions = &envoy_core_v3.Http2ProtocolOptions{} @@ -524,7 +524,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain( CircuitBreakers: &envoy_cluster_v3.CircuitBreakers{ Thresholds: makeThresholdsIfNeeded(cfg.Limits), }, - OutlierDetection: cfg.PassiveHealthCheck.AsOutlierDetection(), + OutlierDetection: ToOutlierDetection(cfg.PassiveHealthCheck), } var lb *structs.LoadBalancer @@ -734,8 +734,8 @@ func (s *Server) makeGatewayCluster(snap *proxycfg.ConfigSnapshot, opts gatewayC return cluster } -func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds { - var empty UpstreamLimits +func makeThresholdsIfNeeded(limits structs.UpstreamLimits) []*envoy_cluster_v3.CircuitBreakers_Thresholds { + var empty structs.UpstreamLimits // Make sure to not create any thresholds when passed the zero-value in order // to rely on Envoy defaults if limits == empty { @@ -743,6 +743,7 @@ func makeThresholdsIfNeeded(limits UpstreamLimits) []*envoy_cluster_v3.CircuitBr } threshold := &envoy_cluster_v3.CircuitBreakers_Thresholds{} + // Likewise, make sure to not set any threshold values on the zero-value in // order to rely on Envoy defaults if limits.MaxConnections != nil { diff --git a/agent/xds/config.go b/agent/xds/config.go index 57fb0d25ef..dfec6493c2 100644 --- a/agent/xds/config.go +++ b/agent/xds/config.go @@ -1,10 +1,8 @@ package xds import ( - "strings" - "time" - envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + "strings" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/wrappers" @@ -150,74 +148,10 @@ func ParseGatewayConfig(m map[string]interface{}) (GatewayConfig, error) { return cfg, err } -// UpstreamLimits describes the limits that are associated with a specific -// upstream of a service instance. -type UpstreamLimits struct { - // MaxConnections is the maximum number of connections the local proxy can - // make to the upstream service. - MaxConnections *int `mapstructure:"max_connections"` - - // MaxPendingRequests is the maximum number of requests that will be queued - // waiting for an available connection. This is mostly applicable to HTTP/1.1 - // clusters since all HTTP/2 requests are streamed over a single - // connection. - MaxPendingRequests *int `mapstructure:"max_pending_requests"` - - // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed - // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2 - // clusters since all HTTP/1.1 requests are limited by MaxConnections. - MaxConcurrentRequests *int `mapstructure:"max_concurrent_requests"` -} - -// UpstreamConfig describes the keys we understand from -// Connect.Proxy.Upstream[*].Config. -type UpstreamConfig struct { - // ListenerJSON is a complete override ("escape hatch") for the upstream's - // listener. - // - // Note: This escape hatch is NOT compatible with the discovery chain and - // will be ignored if a discovery chain is active. - ListenerJSON string `mapstructure:"envoy_listener_json"` - - // ClusterJSON is a complete override ("escape hatch") for the upstream's - // cluster. The Connect client TLS certificate and context will be injected - // overriding any TLS settings present. - // - // Note: This escape hatch is NOT compatible with the discovery chain and - // will be ignored if a discovery chain is active. - ClusterJSON string `mapstructure:"envoy_cluster_json"` - - // Protocol describes the upstream's service protocol. Valid values are "tcp", - // "http" and "grpc". Anything else is treated as tcp. The enables protocol - // aware features like per-request metrics and connection pooling, tracing, - // routing etc. - Protocol string `mapstructure:"protocol"` - - // ConnectTimeoutMs is the number of milliseconds to timeout making a new - // connection to this upstream. Defaults to 5000 (5 seconds) if not set. - ConnectTimeoutMs int `mapstructure:"connect_timeout_ms"` - - // Limits are the set of limits that are applied to the proxy for a specific upstream of a - // service instance. - Limits UpstreamLimits `mapstructure:"limits"` - - // PassiveHealthCheck configuration - PassiveHealthCheck PassiveHealthCheck `mapstructure:"passive_health_check"` -} - -type PassiveHealthCheck struct { - // Interval between health check analysis sweeps. Each sweep may remove - // hosts or return hosts to the pool. - Interval time.Duration - // MaxFailures is the count of consecutive failures that results in a host - // being removed from the pool. - MaxFailures uint32 `mapstructure:"max_failures"` -} - // Return an envoy.OutlierDetection populated by the values from this struct. // If all values are zero a default empty OutlierDetection will be returned to // enable outlier detection with default values. -func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetection { +func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierDetection { od := &envoy_cluster_v3.OutlierDetection{} if p.Interval != 0 { od.Interval = ptypes.DurationProto(p.Interval) @@ -228,8 +162,8 @@ func (p PassiveHealthCheck) AsOutlierDetection() *envoy_cluster_v3.OutlierDetect return od } -func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) { - var cfg UpstreamConfig +func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) { + var cfg structs.UpstreamConfig config := &mapstructure.DecoderConfig{ DecodeHook: mapstructure.ComposeDecodeHookFunc( decode.HookWeakDecodeFromSlice, @@ -252,16 +186,11 @@ func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, er // ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map. // If an error occurs during parsing it is returned along with the default // config this allows caller to choose whether and how to report the error. -func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) { +func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) { cfg, err := ParseUpstreamConfigNoDefaults(m) + // Set defaults (even if error is returned) - if cfg.Protocol == "" { - cfg.Protocol = "tcp" - } else { - cfg.Protocol = strings.ToLower(cfg.Protocol) - } - if cfg.ConnectTimeoutMs < 1 { - cfg.ConnectTimeoutMs = 5000 - } + cfg.Normalize() + return cfg, err } diff --git a/agent/xds/config_test.go b/agent/xds/config_test.go index e1e8c7276c..0265c01518 100644 --- a/agent/xds/config_test.go +++ b/agent/xds/config_test.go @@ -172,12 +172,12 @@ func TestParseUpstreamConfig(t *testing.T) { tests := []struct { name string input map[string]interface{} - want UpstreamConfig + want structs.UpstreamConfig }{ { name: "defaults - nil", input: nil, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", }, @@ -185,7 +185,7 @@ func TestParseUpstreamConfig(t *testing.T) { { name: "defaults - empty", input: map[string]interface{}{}, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", }, @@ -196,7 +196,7 @@ func TestParseUpstreamConfig(t *testing.T) { "foo": "bar", "envoy_foo": "envoy_bar", }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", }, @@ -206,7 +206,7 @@ func TestParseUpstreamConfig(t *testing.T) { input: map[string]interface{}{ "protocol": "http", }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "http", }, @@ -216,7 +216,7 @@ func TestParseUpstreamConfig(t *testing.T) { input: map[string]interface{}{ "connect_timeout_ms": "1000", }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 1000, Protocol: "tcp", }, @@ -226,7 +226,7 @@ func TestParseUpstreamConfig(t *testing.T) { input: map[string]interface{}{ "connect_timeout_ms": float64(1000.0), }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 1000, Protocol: "tcp", }, @@ -236,7 +236,7 @@ func TestParseUpstreamConfig(t *testing.T) { input: map[string]interface{}{ "connect_timeout_ms": 1000, }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 1000, Protocol: "tcp", }, @@ -250,10 +250,10 @@ func TestParseUpstreamConfig(t *testing.T) { "max_concurrent_requests": 70, }, }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", - Limits: UpstreamLimits{ + Limits: structs.UpstreamLimits{ MaxConnections: intPointer(50), MaxPendingRequests: intPointer(60), MaxConcurrentRequests: intPointer(70), @@ -269,10 +269,10 @@ func TestParseUpstreamConfig(t *testing.T) { "max_concurrent_requests": 0, }, }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", - Limits: UpstreamLimits{ + Limits: structs.UpstreamLimits{ MaxConnections: intPointer(0), MaxPendingRequests: intPointer(0), MaxConcurrentRequests: intPointer(0), @@ -287,10 +287,10 @@ func TestParseUpstreamConfig(t *testing.T) { "max_failures": 7, }, }, - want: UpstreamConfig{ + want: structs.UpstreamConfig{ ConnectTimeoutMs: 5000, Protocol: "tcp", - PassiveHealthCheck: PassiveHealthCheck{ + PassiveHealthCheck: structs.PassiveHealthCheck{ Interval: 22 * time.Second, MaxFailures: 7, }, diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 3e5f552f48..77d33b47d4 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -1071,9 +1071,9 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain( return l, nil } -func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) UpstreamConfig { +func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstream, chain *structs.CompiledDiscoveryChain) structs.UpstreamConfig { var ( - cfg UpstreamConfig + cfg structs.UpstreamConfig err error ) diff --git a/api/config_entry.go b/api/config_entry.go index f5ef60e294..a6bace0a98 100644 --- a/api/config_entry.go +++ b/api/config_entry.go @@ -91,15 +91,91 @@ type ExposePath struct { ParsedFromCheck bool } +type ConnectConfiguration struct { + // UpstreamConfigs is a map of service to per-upstream configuration + UpstreamConfigs map[string]UpstreamConfig `json:",omitempty" alias:"upstream_configs"` + + // UpstreamDefaults contains default configuration for all upstreams of a given service + UpstreamDefaults UpstreamConfig `json:",omitempty" alias:"upstream_defaults"` +} + +type UpstreamConfig struct { + // ListenerJSON is a complete override ("escape hatch") for the upstream's + // listener. + // + // Note: This escape hatch is NOT compatible with the discovery chain and + // will be ignored if a discovery chain is active. + ListenerJSON string `json:",omitempty" alias:"listener_json"` + + // ClusterJSON is a complete override ("escape hatch") for the upstream's + // cluster. The Connect client TLS certificate and context will be injected + // overriding any TLS settings present. + // + // Note: This escape hatch is NOT compatible with the discovery chain and + // will be ignored if a discovery chain is active. + ClusterJSON string `alias:"cluster_json"` + + // Protocol describes the upstream's service protocol. Valid values are "tcp", + // "http" and "grpc". Anything else is treated as tcp. The enables protocol + // aware features like per-request metrics and connection pooling, tracing, + // routing etc. + Protocol string + + // ConnectTimeoutMs is the number of milliseconds to timeout making a new + // connection to this upstream. Defaults to 5000 (5 seconds) if not set. + ConnectTimeoutMs int `alias:"connect_timeout_ms"` + + // Limits are the set of limits that are applied to the proxy for a specific upstream of a + // service instance. + Limits UpstreamLimits + + // PassiveHealthCheck configuration determines how upstream proxy instances will + // be monitored for removal from the load balancing pool. + PassiveHealthCheck PassiveHealthCheck `json:",omitempty" alias:"passive_health_check"` + + // MeshGatewayConfig controls how Mesh Gateways are configured and used + MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" ` +} + +type PassiveHealthCheck struct { + // Interval between health check analysis sweeps. Each sweep may remove + // hosts or return hosts to the pool. + Interval time.Duration + + // MaxFailures is the count of consecutive failures that results in a host + // being removed from the pool. + MaxFailures uint32 `alias:"max_failures"` +} + +// UpstreamLimits describes the limits that are associated with a specific +// upstream of a service instance. +type UpstreamLimits struct { + // MaxConnections is the maximum number of connections the local proxy can + // make to the upstream service. + MaxConnections int `alias:"max_connections"` + + // MaxPendingRequests is the maximum number of requests that will be queued + // waiting for an available connection. This is mostly applicable to HTTP/1.1 + // clusters since all HTTP/2 requests are streamed over a single + // connection. + MaxPendingRequests int `alias:"max_pending_requests"` + + // MaxConcurrentRequests is the maximum number of in-flight requests that will be allowed + // to the upstream cluster at a point in time. This is mostly applicable to HTTP/2 + // clusters since all HTTP/1.1 requests are limited by MaxConnections. + MaxConcurrentRequests int `alias:"max_concurrent_requests"` +} + type ServiceConfigEntry struct { Kind string Name string - Namespace string `json:",omitempty"` - Protocol string `json:",omitempty"` - MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"` - Expose ExposeConfig `json:",omitempty"` - ExternalSNI string `json:",omitempty" alias:"external_sni"` - Meta map[string]string `json:",omitempty"` + Namespace string `json:",omitempty"` + Protocol string `json:",omitempty"` + MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway"` + Connect ConnectConfiguration `json:",omitempty"` + Expose ExposeConfig `json:",omitempty"` + ExternalSNI string `json:",omitempty" alias:"external_sni"` + Meta map[string]string `json:",omitempty"` CreateIndex uint64 ModifyIndex uint64 } diff --git a/api/config_entry_test.go b/api/config_entry_test.go index 334c972521..62b0d79664 100644 --- a/api/config_entry_test.go +++ b/api/config_entry_test.go @@ -332,6 +332,36 @@ func TestDecodeConfigEntry(t *testing.T) { "ExternalSNI": "abc-123", "MeshGateway": { "Mode": "remote" + }, + "Connect": { + "UpstreamConfigs": { + "redis": { + "PassiveHealthCheck": { + "MaxFailures": 3, + "Interval": "2s" + } + }, + "finance/billing": { + "MeshGateway": { + "Mode": "remote" + } + } + }, + "UpstreamDefaults": { + "ClusterJSON": "zip", + "ListenerJSON": "zop", + "ConnectTimeoutMs": 5000, + "Protocol": "http", + "Limits": { + "MaxConnections": 3, + "MaxPendingRequests": 4, + "MaxConcurrentRequests": 5 + }, + "PassiveHealthCheck": { + "MaxFailures": 5, + "Interval": "4s" + } + } } } `, @@ -347,6 +377,34 @@ func TestDecodeConfigEntry(t *testing.T) { MeshGateway: MeshGatewayConfig{ Mode: MeshGatewayModeRemote, }, + Connect: ConnectConfiguration{ + UpstreamConfigs: map[string]UpstreamConfig{ + "redis": { + PassiveHealthCheck: PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + }, + "finance/billing": { + MeshGateway: MeshGatewayConfig{Mode: "remote"}, + }, + }, + UpstreamDefaults: UpstreamConfig{ + ClusterJSON: "zip", + ListenerJSON: "zop", + Protocol: "http", + ConnectTimeoutMs: 5000, + Limits: UpstreamLimits{ + MaxConnections: 3, + MaxPendingRequests: 4, + MaxConcurrentRequests: 5, + }, + PassiveHealthCheck: PassiveHealthCheck{ + MaxFailures: 5, + Interval: 4 * time.Second, + }, + }, + }, }, }, { diff --git a/command/config/write/config_write_test.go b/command/config/write/config_write_test.go index 814ccc3c0e..bc8a1ff799 100644 --- a/command/config/write/config_write_test.go +++ b/command/config/write/config_write_test.go @@ -423,7 +423,7 @@ func TestParseConfigEntry(t *testing.T) { }, }, { - name: "service-defaults", + name: "service-defaults: kitchen sink", snake: ` kind = "service-defaults" name = "main" @@ -436,6 +436,36 @@ func TestParseConfigEntry(t *testing.T) { mesh_gateway { mode = "remote" } + connect { + upstream_configs { + "redis" { + passive_health_check { + max_failures = 3 + interval = "2s" + } + } + "finance/billing" { + mesh_gateway { + mode = "remote" + } + } + } + upstream_defaults { + cluster_json = "zip" + listener_json = "zop" + connect_timeout_ms = 5000 + protocol = "http" + limits { + max_connections = 3 + max_pending_requests = 4 + max_concurrent_requests = 5 + } + passive_health_check { + max_failures = 5 + interval = "4s" + } + } + } `, camel: ` Kind = "service-defaults" @@ -449,6 +479,36 @@ func TestParseConfigEntry(t *testing.T) { MeshGateway { Mode = "remote" } + connect = { + upstream_configs = { + "redis" = { + passive_health_check = { + max_failures = 3 + interval = "2s" + } + } + "finance/billing" = { + mesh_gateway = { + mode = "remote" + } + } + } + upstream_defaults = { + cluster_json = "zip" + listener_json = "zop" + connect_timeout_ms = 5000 + protocol = "http" + limits = { + max_connections = 3 + max_pending_requests = 4 + max_concurrent_requests = 5 + } + passive_health_check = { + max_failures = 5 + interval = "4s" + } + } + } `, snakeJSON: ` { @@ -462,6 +522,36 @@ func TestParseConfigEntry(t *testing.T) { "external_sni": "abc-123", "mesh_gateway": { "mode": "remote" + }, + "connect": { + "upstream_configs": { + "redis": { + "passive_health_check": { + "max_failures": 3, + "interval": "2s" + } + }, + "finance/billing": { + "mesh_gateway": { + "mode": "remote" + } + } + }, + "upstream_defaults": { + "cluster_json": "zip", + "listener_json": "zop", + "connect_timeout_ms": 5000, + "protocol": "http", + "limits": { + "max_connections": 3, + "max_pending_requests": 4, + "max_concurrent_requests": 5 + }, + "passive_health_check": { + "max_failures": 5, + "interval": "4s" + } + } } } `, @@ -477,6 +567,36 @@ func TestParseConfigEntry(t *testing.T) { "ExternalSNI": "abc-123", "MeshGateway": { "Mode": "remote" + }, + "Connect": { + "UpstreamConfigs": { + "redis": { + "PassiveHealthCheck": { + "MaxFailures": 3, + "Interval": "2s" + } + }, + "finance/billing": { + "MeshGateway": { + "Mode": "remote" + } + } + }, + "UpstreamDefaults": { + "ClusterJSON": "zip", + "ListenerJSON": "zop", + "ConnectTimeoutMs": 5000, + "Protocol": "http", + "Limits": { + "MaxConnections": 3, + "MaxPendingRequests": 4, + "MaxConcurrentRequests": 5 + }, + "PassiveHealthCheck": { + "MaxFailures": 5, + "Interval": "4s" + } + } } } `, @@ -492,6 +612,36 @@ func TestParseConfigEntry(t *testing.T) { MeshGateway: api.MeshGatewayConfig{ Mode: api.MeshGatewayModeRemote, }, + Connect: api.ConnectConfiguration{ + UpstreamConfigs: map[string]api.UpstreamConfig{ + "redis": { + PassiveHealthCheck: api.PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + }, + "finance/billing": { + MeshGateway: api.MeshGatewayConfig{ + Mode: "remote", + }, + }, + }, + UpstreamDefaults: api.UpstreamConfig{ + ClusterJSON: "zip", + ListenerJSON: "zop", + Protocol: "http", + ConnectTimeoutMs: 5000, + Limits: api.UpstreamLimits{ + MaxConnections: 3, + MaxPendingRequests: 4, + MaxConcurrentRequests: 5, + }, + PassiveHealthCheck: api.PassiveHealthCheck{ + MaxFailures: 5, + Interval: 4 * time.Second, + }, + }, + }, }, }, {