mirror of https://github.com/status-im/consul.git
finish moving UpstreamConfig and related fields to structs pkg
This commit is contained in:
parent
87cde19b4c
commit
1710ec87d2
|
@ -136,8 +136,11 @@ func (e *ServiceConfigEntry) Normalize() error {
|
|||
func (e *ServiceConfigEntry) Validate() error {
|
||||
validationErr := validateConfigEntryMeta(e.Meta)
|
||||
|
||||
if err := e.Connect.Validate(); err != nil {
|
||||
validationErr = multierror.Append(validationErr, err)
|
||||
if e.Connect != nil {
|
||||
err := e.Connect.Validate()
|
||||
if err != nil {
|
||||
validationErr = multierror.Append(validationErr, err)
|
||||
}
|
||||
}
|
||||
|
||||
return validationErr
|
||||
|
@ -663,6 +666,41 @@ type UpstreamConfig struct {
|
|||
MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
|
||||
}
|
||||
|
||||
func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}, legacy bool) {
|
||||
var (
|
||||
listenerKey = "listener_json"
|
||||
clusterKey = "cluster_json"
|
||||
)
|
||||
// Starting in Consul 1.10, the "envoy_" prefix was removed from these flags
|
||||
if legacy {
|
||||
listenerKey = fmt.Sprintf("envoy_%s", listenerKey)
|
||||
clusterKey = fmt.Sprintf("envoy_%s", clusterKey)
|
||||
}
|
||||
|
||||
// Avoid storing empty values in the map, since these can act as overrides
|
||||
if cfg.ListenerJSON != "" {
|
||||
dst[listenerKey] = cfg.ListenerJSON
|
||||
}
|
||||
if cfg.ClusterJSON != "" {
|
||||
dst[clusterKey] = cfg.ClusterJSON
|
||||
}
|
||||
if cfg.Protocol != "" {
|
||||
dst["protocol"] = cfg.Protocol
|
||||
}
|
||||
if cfg.ConnectTimeoutMs != 0 {
|
||||
dst["connect_timeout_ms"] = cfg.ConnectTimeoutMs
|
||||
}
|
||||
if !cfg.MeshGateway.IsZero() {
|
||||
dst["mesh_gateway"] = cfg.MeshGateway
|
||||
}
|
||||
if !cfg.Limits.IsZero() {
|
||||
dst["limits"] = cfg.Limits
|
||||
}
|
||||
if !cfg.PassiveHealthCheck.IsZero() {
|
||||
dst["passive_health_check"] = cfg.PassiveHealthCheck
|
||||
}
|
||||
}
|
||||
|
||||
func (cfg *UpstreamConfig) Normalize() {
|
||||
if cfg.Protocol == "" {
|
||||
cfg.Protocol = "tcp"
|
||||
|
@ -688,6 +726,39 @@ func (cfg UpstreamConfig) Validate() error {
|
|||
return validationErr
|
||||
}
|
||||
|
||||
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (UpstreamConfig, error) {
|
||||
var cfg UpstreamConfig
|
||||
config := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
decode.HookWeakDecodeFromSlice,
|
||||
decode.HookTranslateKeys,
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
),
|
||||
Result: &cfg,
|
||||
WeaklyTypedInput: true,
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(config)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
err = decoder.Decode(m)
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
|
||||
// If an error occurs during parsing it is returned along with the default
|
||||
// config this allows caller to choose whether and how to report the error.
|
||||
func ParseUpstreamConfig(m map[string]interface{}) (UpstreamConfig, error) {
|
||||
cfg, err := ParseUpstreamConfigNoDefaults(m)
|
||||
|
||||
// Set defaults (even if error is returned)
|
||||
cfg.Normalize()
|
||||
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
type PassiveHealthCheck struct {
|
||||
// Interval between health check analysis sweeps. Each sweep may remove
|
||||
// hosts or return hosts to the pool.
|
||||
|
@ -698,6 +769,11 @@ type PassiveHealthCheck struct {
|
|||
MaxFailures uint32 `alias:"max_failures"`
|
||||
}
|
||||
|
||||
func (chk *PassiveHealthCheck) IsZero() bool {
|
||||
zeroVal := PassiveHealthCheck{}
|
||||
return *chk == zeroVal
|
||||
}
|
||||
|
||||
func (chk PassiveHealthCheck) Validate() error {
|
||||
if chk.Interval <= 0*time.Second {
|
||||
return fmt.Errorf("passive health check interval must be greater than 0s")
|
||||
|
@ -724,6 +800,11 @@ type UpstreamLimits struct {
|
|||
MaxConcurrentRequests *int `alias:"max_concurrent_requests"`
|
||||
}
|
||||
|
||||
func (ul *UpstreamLimits) IsZero() bool {
|
||||
zeroVal := UpstreamLimits{}
|
||||
return *ul == zeroVal
|
||||
}
|
||||
|
||||
func (ul UpstreamLimits) Validate() error {
|
||||
if ul.MaxConnections != nil && *ul.MaxConnections <= 0 {
|
||||
return fmt.Errorf("max connections must be at least 0")
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/go-msgpack/codec"
|
||||
"github.com/hashicorp/hcl"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -1596,6 +1597,322 @@ func TestServiceConfigEntry_Normalize(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestUpstreamConfig_MergeInto(t *testing.T) {
|
||||
tt := []struct {
|
||||
name string
|
||||
source UpstreamConfig
|
||||
destination map[string]interface{}
|
||||
legacy bool
|
||||
want map[string]interface{}
|
||||
}{
|
||||
{
|
||||
name: "kitchen sink",
|
||||
legacy: false,
|
||||
source: UpstreamConfig{
|
||||
ListenerJSON: "foo",
|
||||
ClusterJSON: "bar",
|
||||
ConnectTimeoutMs: 5,
|
||||
Protocol: "http",
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: intPointer(3),
|
||||
MaxPendingRequests: intPointer(4),
|
||||
MaxConcurrentRequests: intPointer(5),
|
||||
},
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
|
||||
},
|
||||
destination: make(map[string]interface{}),
|
||||
want: map[string]interface{}{
|
||||
"listener_json": "foo",
|
||||
"cluster_json": "bar",
|
||||
"connect_timeout_ms": 5,
|
||||
"protocol": "http",
|
||||
"limits": UpstreamLimits{
|
||||
MaxConnections: intPointer(3),
|
||||
MaxPendingRequests: intPointer(4),
|
||||
MaxConcurrentRequests: intPointer(5),
|
||||
},
|
||||
"passive_health_check": PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "kitchen sink override of destination",
|
||||
legacy: false,
|
||||
source: UpstreamConfig{
|
||||
ListenerJSON: "foo",
|
||||
ClusterJSON: "bar",
|
||||
ConnectTimeoutMs: 5,
|
||||
Protocol: "http",
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: intPointer(3),
|
||||
MaxPendingRequests: intPointer(4),
|
||||
MaxConcurrentRequests: intPointer(5),
|
||||
},
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
|
||||
},
|
||||
destination: map[string]interface{}{
|
||||
"listener_json": "zip",
|
||||
"cluster_json": "zap",
|
||||
"connect_timeout_ms": 10,
|
||||
"protocol": "grpc",
|
||||
"limits": UpstreamLimits{
|
||||
MaxConnections: intPointer(10),
|
||||
MaxPendingRequests: intPointer(11),
|
||||
MaxConcurrentRequests: intPointer(12),
|
||||
},
|
||||
"passive_health_check": PassiveHealthCheck{
|
||||
MaxFailures: 13,
|
||||
Interval: 14 * time.Second,
|
||||
},
|
||||
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
|
||||
},
|
||||
want: map[string]interface{}{
|
||||
"listener_json": "foo",
|
||||
"cluster_json": "bar",
|
||||
"connect_timeout_ms": 5,
|
||||
"protocol": "http",
|
||||
"limits": UpstreamLimits{
|
||||
MaxConnections: intPointer(3),
|
||||
MaxPendingRequests: intPointer(4),
|
||||
MaxConcurrentRequests: intPointer(5),
|
||||
},
|
||||
"passive_health_check": PassiveHealthCheck{
|
||||
MaxFailures: 3,
|
||||
Interval: 2 * time.Second,
|
||||
},
|
||||
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "legacy flag adds envoy prefix",
|
||||
legacy: true,
|
||||
source: UpstreamConfig{
|
||||
ListenerJSON: "foo",
|
||||
ClusterJSON: "bar",
|
||||
},
|
||||
destination: make(map[string]interface{}),
|
||||
want: map[string]interface{}{
|
||||
"envoy_listener_json": "foo",
|
||||
"envoy_cluster_json": "bar",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty source leaves destination intact",
|
||||
legacy: true,
|
||||
source: UpstreamConfig{},
|
||||
destination: map[string]interface{}{
|
||||
"listener_json": "zip",
|
||||
"cluster_json": "zap",
|
||||
"connect_timeout_ms": 10,
|
||||
"protocol": "grpc",
|
||||
"limits": UpstreamLimits{
|
||||
MaxConnections: intPointer(10),
|
||||
MaxPendingRequests: intPointer(11),
|
||||
MaxConcurrentRequests: intPointer(12),
|
||||
},
|
||||
"passive_health_check": PassiveHealthCheck{
|
||||
MaxFailures: 13,
|
||||
Interval: 14 * time.Second,
|
||||
},
|
||||
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
|
||||
},
|
||||
want: map[string]interface{}{
|
||||
"listener_json": "zip",
|
||||
"cluster_json": "zap",
|
||||
"connect_timeout_ms": 10,
|
||||
"protocol": "grpc",
|
||||
"limits": UpstreamLimits{
|
||||
MaxConnections: intPointer(10),
|
||||
MaxPendingRequests: intPointer(11),
|
||||
MaxConcurrentRequests: intPointer(12),
|
||||
},
|
||||
"passive_health_check": PassiveHealthCheck{
|
||||
MaxFailures: 13,
|
||||
Interval: 14 * time.Second,
|
||||
},
|
||||
"mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "empty source and destination is a noop",
|
||||
legacy: true,
|
||||
source: UpstreamConfig{},
|
||||
destination: make(map[string]interface{}),
|
||||
want: map[string]interface{}{},
|
||||
},
|
||||
}
|
||||
for _, tc := range tt {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
tc.source.MergeInto(tc.destination, tc.legacy)
|
||||
assert.Equal(t, tc.want, tc.destination)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseUpstreamConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]interface{}
|
||||
want UpstreamConfig
|
||||
}{
|
||||
{
|
||||
name: "defaults - nil",
|
||||
input: nil,
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "defaults - empty",
|
||||
input: map[string]interface{}{},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "defaults - other stuff",
|
||||
input: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
"envoy_foo": "envoy_bar",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "protocol override",
|
||||
input: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "http",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, string",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": "1000",
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, float ",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": float64(1000.0),
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, int ",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": 1000,
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect limits map",
|
||||
input: map[string]interface{}{
|
||||
"limits": map[string]interface{}{
|
||||
"max_connections": 50,
|
||||
"max_pending_requests": 60,
|
||||
"max_concurrent_requests": 70,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: intPointer(50),
|
||||
MaxPendingRequests: intPointer(60),
|
||||
MaxConcurrentRequests: intPointer(70),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect limits map zero",
|
||||
input: map[string]interface{}{
|
||||
"limits": map[string]interface{}{
|
||||
"max_connections": 0,
|
||||
"max_pending_requests": 0,
|
||||
"max_concurrent_requests": 0,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: UpstreamLimits{
|
||||
MaxConnections: intPointer(0),
|
||||
MaxPendingRequests: intPointer(0),
|
||||
MaxConcurrentRequests: intPointer(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "passive health check map",
|
||||
input: map[string]interface{}{
|
||||
"passive_health_check": map[string]interface{}{
|
||||
"interval": "22s",
|
||||
"max_failures": 7,
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
PassiveHealthCheck: PassiveHealthCheck{
|
||||
Interval: 22 * time.Second,
|
||||
MaxFailures: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "mesh gateway map",
|
||||
input: map[string]interface{}{
|
||||
"mesh_gateway": map[string]interface{}{
|
||||
"Mode": "remote",
|
||||
},
|
||||
},
|
||||
want: UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
MeshGateway: MeshGatewayConfig{
|
||||
Mode: MeshGatewayModeRemote,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := ParseUpstreamConfig(tt.input)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func requireContainsLower(t *testing.T, haystack, needle string) {
|
||||
t.Helper()
|
||||
require.Contains(t, strings.ToLower(haystack), strings.ToLower(needle))
|
||||
|
|
|
@ -386,7 +386,7 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
|
|||
}
|
||||
sni := connect.UpstreamSNI(&upstream, "", dc, cfgSnap.Roots.TrustDomain)
|
||||
|
||||
cfg, err := ParseUpstreamConfig(upstream.Config)
|
||||
cfg, err := structs.ParseUpstreamConfig(upstream.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||
// default config if there is an error so it's safe to continue.
|
||||
|
@ -448,7 +448,7 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
|
|||
return nil, fmt.Errorf("cannot create upstream cluster without discovery chain for %s", upstream.Identifier())
|
||||
}
|
||||
|
||||
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
|
||||
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||
// default config if there is an error so it's safe to continue.
|
||||
|
|
|
@ -161,36 +161,3 @@ func ToOutlierDetection(p structs.PassiveHealthCheck) *envoy_cluster_v3.OutlierD
|
|||
}
|
||||
return od
|
||||
}
|
||||
|
||||
func ParseUpstreamConfigNoDefaults(m map[string]interface{}) (structs.UpstreamConfig, error) {
|
||||
var cfg structs.UpstreamConfig
|
||||
config := &mapstructure.DecoderConfig{
|
||||
DecodeHook: mapstructure.ComposeDecodeHookFunc(
|
||||
decode.HookWeakDecodeFromSlice,
|
||||
decode.HookTranslateKeys,
|
||||
mapstructure.StringToTimeDurationHookFunc(),
|
||||
),
|
||||
Result: &cfg,
|
||||
WeaklyTypedInput: true,
|
||||
}
|
||||
|
||||
decoder, err := mapstructure.NewDecoder(config)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
err = decoder.Decode(m)
|
||||
return cfg, err
|
||||
}
|
||||
|
||||
// ParseUpstreamConfig returns the UpstreamConfig parsed from an opaque map.
|
||||
// If an error occurs during parsing it is returned along with the default
|
||||
// config this allows caller to choose whether and how to report the error.
|
||||
func ParseUpstreamConfig(m map[string]interface{}) (structs.UpstreamConfig, error) {
|
||||
cfg, err := ParseUpstreamConfigNoDefaults(m)
|
||||
|
||||
// Set defaults (even if error is returned)
|
||||
cfg.Normalize()
|
||||
|
||||
return cfg, err
|
||||
}
|
||||
|
|
|
@ -2,11 +2,9 @@ package xds
|
|||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestParseProxyConfig(t *testing.T) {
|
||||
|
@ -168,144 +166,6 @@ func TestParseProxyConfig(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestParseUpstreamConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
input map[string]interface{}
|
||||
want structs.UpstreamConfig
|
||||
}{
|
||||
{
|
||||
name: "defaults - nil",
|
||||
input: nil,
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "defaults - empty",
|
||||
input: map[string]interface{}{},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "defaults - other stuff",
|
||||
input: map[string]interface{}{
|
||||
"foo": "bar",
|
||||
"envoy_foo": "envoy_bar",
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "protocol override",
|
||||
input: map[string]interface{}{
|
||||
"protocol": "http",
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "http",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, string",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": "1000",
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, float ",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": float64(1000.0),
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect timeout override, int ",
|
||||
input: map[string]interface{}{
|
||||
"connect_timeout_ms": 1000,
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 1000,
|
||||
Protocol: "tcp",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect limits map",
|
||||
input: map[string]interface{}{
|
||||
"limits": map[string]interface{}{
|
||||
"max_connections": 50,
|
||||
"max_pending_requests": 60,
|
||||
"max_concurrent_requests": 70,
|
||||
},
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: structs.UpstreamLimits{
|
||||
MaxConnections: intPointer(50),
|
||||
MaxPendingRequests: intPointer(60),
|
||||
MaxConcurrentRequests: intPointer(70),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "connect limits map zero",
|
||||
input: map[string]interface{}{
|
||||
"limits": map[string]interface{}{
|
||||
"max_connections": 0,
|
||||
"max_pending_requests": 0,
|
||||
"max_concurrent_requests": 0,
|
||||
},
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
Limits: structs.UpstreamLimits{
|
||||
MaxConnections: intPointer(0),
|
||||
MaxPendingRequests: intPointer(0),
|
||||
MaxConcurrentRequests: intPointer(0),
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "passive health check map",
|
||||
input: map[string]interface{}{
|
||||
"passive_health_check": map[string]interface{}{
|
||||
"interval": "22s",
|
||||
"max_failures": 7,
|
||||
},
|
||||
},
|
||||
want: structs.UpstreamConfig{
|
||||
ConnectTimeoutMs: 5000,
|
||||
Protocol: "tcp",
|
||||
PassiveHealthCheck: structs.PassiveHealthCheck{
|
||||
Interval: 22 * time.Second,
|
||||
MaxFailures: 7,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
got, err := ParseUpstreamConfig(tt.input)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tt.want, got)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseGatewayConfig(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
|
|
@ -312,7 +312,7 @@ func (s *Server) endpointsFromDiscoveryChain(
|
|||
return resources
|
||||
}
|
||||
|
||||
cfg, err := ParseUpstreamConfigNoDefaults(upstream.Config)
|
||||
cfg, err := structs.ParseUpstreamConfigNoDefaults(upstream.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||
// default config if there is an error so it's safe to continue.
|
||||
|
|
|
@ -1078,7 +1078,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
|
|||
)
|
||||
|
||||
if chain == nil || chain.IsDefault() {
|
||||
cfg, err = ParseUpstreamConfig(u.Config)
|
||||
cfg, err = structs.ParseUpstreamConfig(u.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||
// default config if there is an error so it's safe to continue.
|
||||
|
@ -1087,7 +1087,7 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
|
|||
} else {
|
||||
// Use NoDefaults here so that we can set the protocol to the chain
|
||||
// protocol if necessary
|
||||
cfg, err = ParseUpstreamConfigNoDefaults(u.Config)
|
||||
cfg, err = structs.ParseUpstreamConfigNoDefaults(u.Config)
|
||||
if err != nil {
|
||||
// Don't hard fail on a config typo, just warn. The parse func returns
|
||||
// default config if there is an error so it's safe to continue.
|
||||
|
|
Loading…
Reference in New Issue