diff --git a/agent/structs/config_entry.go b/agent/structs/config_entry.go index 33a50cd4dd..c43038bf3a 100644 --- a/agent/structs/config_entry.go +++ b/agent/structs/config_entry.go @@ -136,8 +136,11 @@ func (e *ServiceConfigEntry) Normalize() error { func (e *ServiceConfigEntry) Validate() error { validationErr := validateConfigEntryMeta(e.Meta) - if err := e.Connect.Validate(); err != nil { - validationErr = multierror.Append(validationErr, err) + if e.Connect != nil { + err := e.Connect.Validate() + if err != nil { + validationErr = multierror.Append(validationErr, err) + } } return validationErr @@ -663,6 +666,41 @@ type UpstreamConfig struct { MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" ` } +func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}, legacy bool) { + var ( + listenerKey = "listener_json" + clusterKey = "cluster_json" + ) + // Starting in Consul 1.10, the "envoy_" prefix was removed from these flags + if legacy { + listenerKey = fmt.Sprintf("envoy_%s", listenerKey) + clusterKey = fmt.Sprintf("envoy_%s", clusterKey) + } + + // Avoid storing empty values in the map, since these can act as overrides + if cfg.ListenerJSON != "" { + dst[listenerKey] = cfg.ListenerJSON + } + if cfg.ClusterJSON != "" { + dst[clusterKey] = cfg.ClusterJSON + } + if cfg.Protocol != "" { + dst["protocol"] = cfg.Protocol + } + if cfg.ConnectTimeoutMs != 0 { + dst["connect_timeout_ms"] = cfg.ConnectTimeoutMs + } + if !cfg.MeshGateway.IsZero() { + dst["mesh_gateway"] = cfg.MeshGateway + } + if !cfg.Limits.IsZero() { + dst["limits"] = cfg.Limits + } + if !cfg.PassiveHealthCheck.IsZero() { + dst["passive_health_check"] = cfg.PassiveHealthCheck + } +} + func (cfg *UpstreamConfig) Normalize() { if cfg.Protocol == "" { cfg.Protocol = "tcp" @@ -688,6 +726,39 @@ func (cfg UpstreamConfig) Validate() error { return validationErr } +func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) { + var cfg UpstreamConfig + config := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.ComposeDecodeHookFunc( + decode.HookWeakDecodeFromSlice, + decode.HookTranslateKeys, + mapstructure.StringToTimeDurationHookFunc(), + ), + Result: &cfg, + WeaklyTypedInput: true, + } + + decoder, err := mapstructure.NewDecoder(config) + if err != nil { + return cfg, err + } + + err = decoder.Decode(m) + return cfg, err +} + +// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map. +// If an error occurs during parsing it is returned along with the default +// config this allows caller to choose whether and how to report the error. +func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) { + cfg, err := ParseUpstreamConfigNoDefaults(m) + + // Set defaults (even if error is returned) + cfg.Normalize() + + return cfg, err +} + type PassiveHealthCheck struct { // Interval between health check analysis sweeps. Each sweep may remove // hosts or return hosts to the pool. @@ -698,6 +769,11 @@ type PassiveHealthCheck struct { MaxFailures uint32 `alias:"max_failures"` } +func (chk *PassiveHealthCheck) IsZero() bool { + zeroVal := PassiveHealthCheck{} + return *chk == zeroVal +} + func (chk PassiveHealthCheck) Validate() error { if chk.Interval <= 0*time.Second { return fmt.Errorf("passive health check interval must be greater than 0s") @@ -724,6 +800,11 @@ type UpstreamLimits struct { MaxConcurrentRequests *int `alias:"max_concurrent_requests"` } +func (ul *UpstreamLimits) IsZero() bool { + zeroVal := UpstreamLimits{} + return *ul == zeroVal +} + func (ul UpstreamLimits) Validate() error { if ul.MaxConnections != nil && *ul.MaxConnections <= 0 { return fmt.Errorf("max connections must be at least 0") diff --git a/agent/structs/config_entry_test.go b/agent/structs/config_entry_test.go index 272e1a1833..36edcaef62 100644 --- a/agent/structs/config_entry_test.go +++ b/agent/structs/config_entry_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/hcl" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1596,6 +1597,322 @@ func TestServiceConfigEntry_Normalize(t *testing.T) { } } +func TestUpstreamConfig_MergeInto(t *testing.T) { + tt := []struct { + name string + source UpstreamConfig + destination map[string]interface{} + legacy bool + want map[string]interface{} + }{ + { + name: "kitchen sink", + legacy: false, + source: UpstreamConfig{ + ListenerJSON: "foo", + ClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", + Limits: UpstreamLimits{ + MaxConnections: intPointer(3), + MaxPendingRequests: intPointer(4), + MaxConcurrentRequests: intPointer(5), + }, + PassiveHealthCheck: PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote}, + }, + destination: make(map[string]interface{}), + want: map[string]interface{}{ + "listener_json": "foo", + "cluster_json": "bar", + "connect_timeout_ms": 5, + "protocol": "http", + "limits": UpstreamLimits{ + MaxConnections: intPointer(3), + MaxPendingRequests: intPointer(4), + MaxConcurrentRequests: intPointer(5), + }, + "passive_health_check": PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote}, + }, + }, + { + name: "kitchen sink override of destination", + legacy: false, + source: UpstreamConfig{ + ListenerJSON: "foo", + ClusterJSON: "bar", + ConnectTimeoutMs: 5, + Protocol: "http", + Limits: UpstreamLimits{ + MaxConnections: intPointer(3), + MaxPendingRequests: intPointer(4), + MaxConcurrentRequests: intPointer(5), + }, + PassiveHealthCheck: PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote}, + }, + destination: map[string]interface{}{ + "listener_json": "zip", + "cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", + "limits": UpstreamLimits{ + MaxConnections: intPointer(10), + MaxPendingRequests: intPointer(11), + MaxConcurrentRequests: intPointer(12), + }, + "passive_health_check": PassiveHealthCheck{ + MaxFailures: 13, + Interval: 14 * time.Second, + }, + "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal}, + }, + want: map[string]interface{}{ + "listener_json": "foo", + "cluster_json": "bar", + "connect_timeout_ms": 5, + "protocol": "http", + "limits": UpstreamLimits{ + MaxConnections: intPointer(3), + MaxPendingRequests: intPointer(4), + MaxConcurrentRequests: intPointer(5), + }, + "passive_health_check": PassiveHealthCheck{ + MaxFailures: 3, + Interval: 2 * time.Second, + }, + "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote}, + }, + }, + { + name: "legacy flag adds envoy prefix", + legacy: true, + source: UpstreamConfig{ + ListenerJSON: "foo", + ClusterJSON: "bar", + }, + destination: make(map[string]interface{}), + want: map[string]interface{}{ + "envoy_listener_json": "foo", + "envoy_cluster_json": "bar", + }, + }, + { + name: "empty source leaves destination intact", + legacy: true, + source: UpstreamConfig{}, + destination: map[string]interface{}{ + "listener_json": "zip", + "cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", + "limits": UpstreamLimits{ + MaxConnections: intPointer(10), + MaxPendingRequests: intPointer(11), + MaxConcurrentRequests: intPointer(12), + }, + "passive_health_check": PassiveHealthCheck{ + MaxFailures: 13, + Interval: 14 * time.Second, + }, + "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal}, + }, + want: map[string]interface{}{ + "listener_json": "zip", + "cluster_json": "zap", + "connect_timeout_ms": 10, + "protocol": "grpc", + "limits": UpstreamLimits{ + MaxConnections: intPointer(10), + MaxPendingRequests: intPointer(11), + MaxConcurrentRequests: intPointer(12), + }, + "passive_health_check": PassiveHealthCheck{ + MaxFailures: 13, + Interval: 14 * time.Second, + }, + "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal}, + }, + }, + { + name: "empty source and destination is a noop", + legacy: true, + source: UpstreamConfig{}, + destination: make(map[string]interface{}), + want: map[string]interface{}{}, + }, + } + for _, tc := range tt { + t.Run(tc.name, func(t *testing.T) { + tc.source.MergeInto(tc.destination, tc.legacy) + assert.Equal(t, tc.want, tc.destination) + }) + } +} + +func TestParseUpstreamConfig(t *testing.T) { + tests := []struct { + name string + input map[string]interface{} + want UpstreamConfig + }{ + { + name: "defaults - nil", + input: nil, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + }, + }, + { + name: "defaults - empty", + input: map[string]interface{}{}, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + }, + }, + { + name: "defaults - other stuff", + input: map[string]interface{}{ + "foo": "bar", + "envoy_foo": "envoy_bar", + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + }, + }, + { + name: "protocol override", + input: map[string]interface{}{ + "protocol": "http", + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "http", + }, + }, + { + name: "connect timeout override, string", + input: map[string]interface{}{ + "connect_timeout_ms": "1000", + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 1000, + Protocol: "tcp", + }, + }, + { + name: "connect timeout override, float ", + input: map[string]interface{}{ + "connect_timeout_ms": float64(1000.0), + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 1000, + Protocol: "tcp", + }, + }, + { + name: "connect timeout override, int ", + input: map[string]interface{}{ + "connect_timeout_ms": 1000, + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 1000, + Protocol: "tcp", + }, + }, + { + name: "connect limits map", + input: map[string]interface{}{ + "limits": map[string]interface{}{ + "max_connections": 50, + "max_pending_requests": 60, + "max_concurrent_requests": 70, + }, + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + Limits: UpstreamLimits{ + MaxConnections: intPointer(50), + MaxPendingRequests: intPointer(60), + MaxConcurrentRequests: intPointer(70), + }, + }, + }, + { + name: "connect limits map zero", + input: map[string]interface{}{ + "limits": map[string]interface{}{ + "max_connections": 0, + "max_pending_requests": 0, + "max_concurrent_requests": 0, + }, + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + Limits: UpstreamLimits{ + MaxConnections: intPointer(0), + MaxPendingRequests: intPointer(0), + MaxConcurrentRequests: intPointer(0), + }, + }, + }, + { + name: "passive health check map", + input: map[string]interface{}{ + "passive_health_check": map[string]interface{}{ + "interval": "22s", + "max_failures": 7, + }, + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + PassiveHealthCheck: PassiveHealthCheck{ + Interval: 22 * time.Second, + MaxFailures: 7, + }, + }, + }, + { + name: "mesh gateway map", + input: map[string]interface{}{ + "mesh_gateway": map[string]interface{}{ + "Mode": "remote", + }, + }, + want: UpstreamConfig{ + ConnectTimeoutMs: 5000, + Protocol: "tcp", + MeshGateway: MeshGatewayConfig{ + Mode: MeshGatewayModeRemote, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseUpstreamConfig(tt.input) + require.NoError(t, err) + require.Equal(t, tt.want, got) + }) + } +} + func requireContainsLower(t *testing.T, haystack, needle string) { t.Helper() require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle)) diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index 0411ab147b..21b3c23240 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -386,7 +386,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream, } sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain) - cfg, err := ParseUpstreamConfig(upstream.Config) + cfg, err := structs.ParseUpstreamConfig(upstream.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. @@ -448,7 +448,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain( return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier()) } - cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config) + cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. diff --git a/agent/xds/config.go b/agent/xds/config.go index dfec6493c2..9f796ab6df 100644 --- a/agent/xds/config.go +++ b/agent/xds/config.go @@ -161,36 +161,3 @@ func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierD } return od } - -func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) { - var cfg structs.UpstreamConfig - config := &mapstructure.DecoderConfig{ - DecodeHook: mapstructure.ComposeDecodeHookFunc( - decode.HookWeakDecodeFromSlice, - decode.HookTranslateKeys, - mapstructure.StringToTimeDurationHookFunc(), - ), - Result: &cfg, - WeaklyTypedInput: true, - } - - decoder, err := mapstructure.NewDecoder(config) - if err != nil { - return cfg, err - } - - err = decoder.Decode(m) - return cfg, err -} - -// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map. -// If an error occurs during parsing it is returned along with the default -// config this allows caller to choose whether and how to report the error. -func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) { - cfg, err := ParseUpstreamConfigNoDefaults(m) - - // Set defaults (even if error is returned) - cfg.Normalize() - - return cfg, err -} diff --git a/agent/xds/config_test.go b/agent/xds/config_test.go index 0265c01518..aa0f8e7239 100644 --- a/agent/xds/config_test.go +++ b/agent/xds/config_test.go @@ -2,11 +2,9 @@ package xds import ( "testing" - "time" - - "github.com/stretchr/testify/require" "github.com/hashicorp/consul/agent/structs" + "github.com/stretchr/testify/require" ) func TestParseProxyConfig(t *testing.T) { @@ -168,144 +166,6 @@ func TestParseProxyConfig(t *testing.T) { } } -func TestParseUpstreamConfig(t *testing.T) { - tests := []struct { - name string - input map[string]interface{} - want structs.UpstreamConfig - }{ - { - name: "defaults - nil", - input: nil, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - }, - }, - { - name: "defaults - empty", - input: map[string]interface{}{}, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - }, - }, - { - name: "defaults - other stuff", - input: map[string]interface{}{ - "foo": "bar", - "envoy_foo": "envoy_bar", - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - }, - }, - { - name: "protocol override", - input: map[string]interface{}{ - "protocol": "http", - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "http", - }, - }, - { - name: "connect timeout override, string", - input: map[string]interface{}{ - "connect_timeout_ms": "1000", - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 1000, - Protocol: "tcp", - }, - }, - { - name: "connect timeout override, float ", - input: map[string]interface{}{ - "connect_timeout_ms": float64(1000.0), - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 1000, - Protocol: "tcp", - }, - }, - { - name: "connect timeout override, int ", - input: map[string]interface{}{ - "connect_timeout_ms": 1000, - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 1000, - Protocol: "tcp", - }, - }, - { - name: "connect limits map", - input: map[string]interface{}{ - "limits": map[string]interface{}{ - "max_connections": 50, - "max_pending_requests": 60, - "max_concurrent_requests": 70, - }, - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - Limits: structs.UpstreamLimits{ - MaxConnections: intPointer(50), - MaxPendingRequests: intPointer(60), - MaxConcurrentRequests: intPointer(70), - }, - }, - }, - { - name: "connect limits map zero", - input: map[string]interface{}{ - "limits": map[string]interface{}{ - "max_connections": 0, - "max_pending_requests": 0, - "max_concurrent_requests": 0, - }, - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - Limits: structs.UpstreamLimits{ - MaxConnections: intPointer(0), - MaxPendingRequests: intPointer(0), - MaxConcurrentRequests: intPointer(0), - }, - }, - }, - { - name: "passive health check map", - input: map[string]interface{}{ - "passive_health_check": map[string]interface{}{ - "interval": "22s", - "max_failures": 7, - }, - }, - want: structs.UpstreamConfig{ - ConnectTimeoutMs: 5000, - Protocol: "tcp", - PassiveHealthCheck: structs.PassiveHealthCheck{ - Interval: 22 * time.Second, - MaxFailures: 7, - }, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, err := ParseUpstreamConfig(tt.input) - require.NoError(t, err) - require.Equal(t, tt.want, got) - }) - } -} - func TestParseGatewayConfig(t *testing.T) { tests := []struct { name string diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 71ec1e7b5d..64b2d49e2a 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -312,7 +312,7 @@ func (s *Server) endpointsFromDiscoveryChain( return resources } - cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config) + cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. diff --git a/agent/xds/listeners.go b/agent/xds/listeners.go index 77d33b47d4..dd50128c04 100644 --- a/agent/xds/listeners.go +++ b/agent/xds/listeners.go @@ -1078,7 +1078,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr ) if chain == nil || chain.IsDefault() { - cfg, err = ParseUpstreamConfig(u.Config) + cfg, err = structs.ParseUpstreamConfig(u.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue. @@ -1087,7 +1087,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr } else { // Use NoDefaults here so that we can set the protocol to the chain // protocol if necessary - cfg, err = ParseUpstreamConfigNoDefaults(u.Config) + cfg, err = structs.ParseUpstreamConfigNoDefaults(u.Config) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns // default config if there is an error so it's safe to continue.