Restore old Envoy prefix on escape hatches

This is done because, after removing ID and NodeName from
ServiceConfigRequest, we will no longer know whether an incoming request
comes from a Consul client older than v1.10.
freddygv 2021-03-15 14:12:57 -06:00
parent 93c3c1780d
commit 8b46d8dcbb
9 changed files with 130 additions and 158 deletions
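
For quick reference, a minimal, self-contained sketch of the behavior this commit restores, assuming only the UpstreamConfig escape-hatch fields and the MergeInto signature shown in the structs hunks below; the trimmed struct and the main function are illustrative, not Consul code.

package main

import "fmt"

// Trimmed stand-in for the struct touched by this commit; only the
// escape-hatch fields are shown.
type UpstreamConfig struct {
    EnvoyListenerJSON string
    EnvoyClusterJSON  string
}

// MergeInto mirrors the restored behavior: the map keys always carry the
// "envoy_" prefix, and there is no longer a legacy flag to toggle it.
func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) {
    if cfg.EnvoyListenerJSON != "" {
        dst["envoy_listener_json"] = cfg.EnvoyListenerJSON
    }
    if cfg.EnvoyClusterJSON != "" {
        dst["envoy_cluster_json"] = cfg.EnvoyClusterJSON
    }
}

func main() {
    resolved := make(map[string]interface{})
    UpstreamConfig{EnvoyListenerJSON: "foo", EnvoyClusterJSON: "bar"}.MergeInto(resolved)
    fmt.Println(resolved) // map[envoy_cluster_json:bar envoy_listener_json:foo]
}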

View File

@@ -498,7 +498,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
     // Merge centralized defaults for all upstreams before configuration for specific upstreams
     if upstreamDefaults != nil {
-        upstreamDefaults.MergeInto(resolvedCfg, args.ID == "")
+        upstreamDefaults.MergeInto(resolvedCfg)
     }

     // The value from the proxy registration overrides the one from upstream_defaults because
     // it is specific to the proxy instance
@@ -507,7 +507,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
         }
         if upstreamConfigs[upstream.String()] != nil {
-            upstreamConfigs[upstream.String()].MergeInto(resolvedCfg, args.ID == "")
+            upstreamConfigs[upstream.String()].MergeInto(resolvedCfg)
         }

         if len(resolvedCfg) > 0 {

View File

@@ -633,20 +633,20 @@ func (r *ServiceConfigRequest) CacheInfo() cache.RequestInfo {
 }

 type UpstreamConfig struct {
-    // ListenerJSON is a complete override ("escape hatch") for the upstream's
+    // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's
     // listener.
     //
     // Note: This escape hatch is NOT compatible with the discovery chain and
     // will be ignored if a discovery chain is active.
-    ListenerJSON string `json:",omitempty" alias:"listener_json,envoy_listener_json"`
+    EnvoyListenerJSON string `json:",omitempty" alias:"envoy_listener_json"`

-    // ClusterJSON is a complete override ("escape hatch") for the upstream's
+    // EnvoyClusterJSON is a complete override ("escape hatch") for the upstream's
     // cluster. The Connect client TLS certificate and context will be injected
     // overriding any TLS settings present.
     //
     // Note: This escape hatch is NOT compatible with the discovery chain and
     // will be ignored if a discovery chain is active.
-    ClusterJSON string `json:",omitempty" alias:"cluster_json,envoy_cluster_json"`
+    EnvoyClusterJSON string `json:",omitempty" alias:"envoy_cluster_json"`

     // Protocol describes the upstream's service protocol. Valid values are "tcp",
     // "http" and "grpc". Anything else is treated as tcp. The enables protocol
@@ -670,23 +670,13 @@ type UpstreamConfig struct {
     MeshGateway MeshGatewayConfig `json:",omitempty" alias:"mesh_gateway" `
 }

-func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}, legacy bool) {
-    var (
-        listenerKey = "listener_json"
-        clusterKey  = "cluster_json"
-    )
-
-    // Starting in Consul 1.10, the "envoy_" prefix was removed from these flags
-    if legacy {
-        listenerKey = fmt.Sprintf("envoy_%s", listenerKey)
-        clusterKey = fmt.Sprintf("envoy_%s", clusterKey)
-    }
+func (cfg UpstreamConfig) MergeInto(dst map[string]interface{}) {
     // Avoid storing empty values in the map, since these can act as overrides
-    if cfg.ListenerJSON != "" {
-        dst[listenerKey] = cfg.ListenerJSON
+    if cfg.EnvoyListenerJSON != "" {
+        dst["envoy_listener_json"] = cfg.EnvoyListenerJSON
     }
-    if cfg.ClusterJSON != "" {
-        dst[clusterKey] = cfg.ClusterJSON
+    if cfg.EnvoyClusterJSON != "" {
+        dst["envoy_cluster_json"] = cfg.EnvoyClusterJSON
     }
     if cfg.Protocol != "" {
         dst["protocol"] = cfg.Protocol

View File

@@ -131,8 +131,8 @@ func TestDecodeConfigEntry(t *testing.T) {
             upstream_defaults {
                 connect_timeout_ms = 5
                 protocol = "http"
-                listener_json = "foo"
-                cluster_json = "bar"
+                envoy_listener_json = "foo"
+                envoy_cluster_json = "bar"
                 limits {
                     max_connections = 3
                     max_pending_requests = 4
@@ -169,8 +169,8 @@ func TestDecodeConfigEntry(t *testing.T) {
                 }
             }
             UpstreamDefaults {
-                ListenerJSON = "foo"
-                ClusterJSON = "bar"
+                EnvoyListenerJSON = "foo"
+                EnvoyClusterJSON = "bar"
                 ConnectTimeoutMs = 5
                 Protocol = "http"
                 Limits {
@@ -206,10 +206,10 @@ func TestDecodeConfigEntry(t *testing.T) {
                 },
             },
             UpstreamDefaults: &UpstreamConfig{
-                ListenerJSON: "foo",
-                ClusterJSON: "bar",
+                EnvoyListenerJSON: "foo",
+                EnvoyClusterJSON: "bar",
                 ConnectTimeoutMs: 5,
                 Protocol: "http",
                 Limits: &UpstreamLimits{
                     MaxConnections: intPointer(3),
                     MaxPendingRequests: intPointer(4),
@@ -1600,17 +1600,15 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
         name string
         source UpstreamConfig
         destination map[string]interface{}
-        legacy bool
         want map[string]interface{}
     }{
         {
             name: "kitchen sink",
-            legacy: false,
             source: UpstreamConfig{
-                ListenerJSON: "foo",
-                ClusterJSON: "bar",
+                EnvoyListenerJSON: "foo",
+                EnvoyClusterJSON: "bar",
                 ConnectTimeoutMs: 5,
                 Protocol: "http",
                 Limits: &UpstreamLimits{
                     MaxConnections: intPointer(3),
                     MaxPendingRequests: intPointer(4),
@@ -1623,97 +1621,46 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
                 MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
             },
             destination: make(map[string]interface{}),
-            want: map[string]interface{}{
-                "listener_json": "foo",
-                "cluster_json": "bar",
-                "connect_timeout_ms": 5,
-                "protocol": "http",
-                "limits": &UpstreamLimits{
-                    MaxConnections: intPointer(3),
-                    MaxPendingRequests: intPointer(4),
-                    MaxConcurrentRequests: intPointer(5),
-                },
-                "passive_health_check": &PassiveHealthCheck{
-                    MaxFailures: 3,
-                    Interval: 2 * time.Second,
-                },
-                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
-            },
-        },
-        {
-            name: "kitchen sink override of destination",
-            legacy: false,
-            source: UpstreamConfig{
-                ListenerJSON: "foo",
-                ClusterJSON: "bar",
-                ConnectTimeoutMs: 5,
-                Protocol: "http",
-                Limits: &UpstreamLimits{
-                    MaxConnections: intPointer(3),
-                    MaxPendingRequests: intPointer(4),
-                    MaxConcurrentRequests: intPointer(5),
-                },
-                PassiveHealthCheck: &PassiveHealthCheck{
-                    MaxFailures: 3,
-                    Interval: 2 * time.Second,
-                },
-                MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
-            },
-            destination: map[string]interface{}{
-                "listener_json": "zip",
-                "cluster_json": "zap",
-                "connect_timeout_ms": 10,
-                "protocol": "grpc",
-                "limits": &UpstreamLimits{
-                    MaxConnections: intPointer(10),
-                    MaxPendingRequests: intPointer(11),
-                    MaxConcurrentRequests: intPointer(12),
-                },
-                "passive_health_check": &PassiveHealthCheck{
-                    MaxFailures: 13,
-                    Interval: 14 * time.Second,
-                },
-                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
-            },
-            want: map[string]interface{}{
-                "listener_json": "foo",
-                "cluster_json": "bar",
-                "connect_timeout_ms": 5,
-                "protocol": "http",
-                "limits": &UpstreamLimits{
-                    MaxConnections: intPointer(3),
-                    MaxPendingRequests: intPointer(4),
-                    MaxConcurrentRequests: intPointer(5),
-                },
-                "passive_health_check": &PassiveHealthCheck{
-                    MaxFailures: 3,
-                    Interval: 2 * time.Second,
-                },
-                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
-            },
-        },
-        {
-            name: "legacy flag adds envoy prefix",
-            legacy: true,
-            source: UpstreamConfig{
-                ListenerJSON: "foo",
-                ClusterJSON: "bar",
-            },
-            destination: make(map[string]interface{}),
             want: map[string]interface{}{
                 "envoy_listener_json": "foo",
                 "envoy_cluster_json": "bar",
+                "connect_timeout_ms": 5,
+                "protocol": "http",
+                "limits": &UpstreamLimits{
+                    MaxConnections: intPointer(3),
+                    MaxPendingRequests: intPointer(4),
+                    MaxConcurrentRequests: intPointer(5),
+                },
+                "passive_health_check": &PassiveHealthCheck{
+                    MaxFailures: 3,
+                    Interval: 2 * time.Second,
+                },
+                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
             },
         },
         {
-            name: "empty source leaves destination intact",
-            legacy: true,
-            source: UpstreamConfig{},
+            name: "kitchen sink override of destination",
+            source: UpstreamConfig{
+                EnvoyListenerJSON: "foo",
+                EnvoyClusterJSON: "bar",
+                ConnectTimeoutMs: 5,
+                Protocol: "http",
+                Limits: &UpstreamLimits{
+                    MaxConnections: intPointer(3),
+                    MaxPendingRequests: intPointer(4),
+                    MaxConcurrentRequests: intPointer(5),
+                },
+                PassiveHealthCheck: &PassiveHealthCheck{
+                    MaxFailures: 3,
+                    Interval: 2 * time.Second,
+                },
+                MeshGateway: MeshGatewayConfig{Mode: MeshGatewayModeRemote},
+            },
             destination: map[string]interface{}{
-                "listener_json": "zip",
-                "cluster_json": "zap",
+                "envoy_listener_json": "zip",
+                "envoy_cluster_json": "zap",
                 "connect_timeout_ms": 10,
                 "protocol": "grpc",
                 "limits": &UpstreamLimits{
                     MaxConnections: intPointer(10),
                     MaxPendingRequests: intPointer(11),
@@ -1726,10 +1673,46 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
                 "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
             },
             want: map[string]interface{}{
-                "listener_json": "zip",
-                "cluster_json": "zap",
-                "connect_timeout_ms": 10,
-                "protocol": "grpc",
+                "envoy_listener_json": "foo",
+                "envoy_cluster_json": "bar",
+                "connect_timeout_ms": 5,
+                "protocol": "http",
+                "limits": &UpstreamLimits{
+                    MaxConnections: intPointer(3),
+                    MaxPendingRequests: intPointer(4),
+                    MaxConcurrentRequests: intPointer(5),
+                },
+                "passive_health_check": &PassiveHealthCheck{
+                    MaxFailures: 3,
+                    Interval: 2 * time.Second,
+                },
+                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeRemote},
+            },
+        },
+        {
+            name: "empty source leaves destination intact",
+            source: UpstreamConfig{},
+            destination: map[string]interface{}{
+                "envoy_listener_json": "zip",
+                "envoy_cluster_json": "zap",
+                "connect_timeout_ms": 10,
+                "protocol": "grpc",
+                "limits": &UpstreamLimits{
+                    MaxConnections: intPointer(10),
+                    MaxPendingRequests: intPointer(11),
+                    MaxConcurrentRequests: intPointer(12),
+                },
+                "passive_health_check": &PassiveHealthCheck{
+                    MaxFailures: 13,
+                    Interval: 14 * time.Second,
+                },
+                "mesh_gateway": MeshGatewayConfig{Mode: MeshGatewayModeLocal},
+            },
+            want: map[string]interface{}{
+                "envoy_listener_json": "zip",
+                "envoy_cluster_json": "zap",
+                "connect_timeout_ms": 10,
+                "protocol": "grpc",
                 "limits": &UpstreamLimits{
                     MaxConnections: intPointer(10),
                     MaxPendingRequests: intPointer(11),
@@ -1744,7 +1727,6 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
         },
         {
             name: "empty source and destination is a noop",
-            legacy: true,
             source: UpstreamConfig{},
             destination: make(map[string]interface{}),
             want: map[string]interface{}{},
@@ -1752,7 +1734,7 @@ func TestUpstreamConfig_MergeInto(t *testing.T) {
     }
     for _, tc := range tt {
         t.Run(tc.name, func(t *testing.T) {
-            tc.source.MergeInto(tc.destination, tc.legacy)
+            tc.source.MergeInto(tc.destination)
             assert.Equal(t, tc.want, tc.destination)
         })
     }

View File

@@ -392,8 +392,8 @@ func (s *Server) makeUpstreamClusterForPreparedQuery(upstream structs.Upstream,
         // default config if there is an error so it's safe to continue.
         s.Logger.Warn("failed to parse", "upstream", upstream.Identifier(), "error", err)
     }
-    if cfg.ClusterJSON != "" {
-        c, err = makeClusterFromUserConfig(cfg.ClusterJSON)
+    if cfg.EnvoyClusterJSON != "" {
+        c, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
         if err != nil {
             return c, err
         }
@@ -457,11 +457,11 @@ func (s *Server) makeUpstreamClustersForDiscoveryChain(
     }

     var escapeHatchCluster *envoy_cluster_v3.Cluster
-    if cfg.ClusterJSON != "" {
+    if cfg.EnvoyClusterJSON != "" {
         if chain.IsDefault() {
             // If you haven't done anything to setup the discovery chain, then
             // you can use the envoy_cluster_json escape hatch.
-            escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
+            escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
             if err != nil {
                 return nil, err
             }
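
As a reading aid for the xDS hunks above and below, here is a hedged restatement of the gating they implement; applyEscapeHatch is a hypothetical helper used for illustration only, not a function in the Consul codebase.

package sketch

// applyEscapeHatch summarizes the pattern in the discovery-chain cluster,
// endpoint, and listener paths: the restored EnvoyClusterJSON or
// EnvoyListenerJSON override is used verbatim only while the discovery chain
// is still the default one; with a customized chain it is ignored (the
// listener path also logs a warning and clears the field).
func applyEscapeHatch(overrideJSON string, chainIsDefault bool) (string, bool) {
    if overrideJSON != "" && chainIsDefault {
        return overrideJSON, true
    }
    return "", false
}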

View File

@@ -321,11 +321,11 @@ func (s *Server) endpointsFromDiscoveryChain(
     }

     var escapeHatchCluster *envoy_cluster_v3.Cluster
-    if cfg.ClusterJSON != "" {
+    if cfg.EnvoyClusterJSON != "" {
         if chain.IsDefault() {
             // If you haven't done anything to setup the discovery chain, then
             // you can use the envoy_cluster_json escape hatch.
-            escapeHatchCluster, err = makeClusterFromUserConfig(cfg.ClusterJSON)
+            escapeHatchCluster, err = makeClusterFromUserConfig(cfg.EnvoyClusterJSON)
             if err != nil {
                 return resources
             }

View File

@@ -987,8 +987,8 @@ func (s *Server) makeUpstreamListenerForDiscoveryChain(
     l := makeListener(upstreamID, address, u.LocalBindPort, envoy_core_v3.TrafficDirection_OUTBOUND)

     cfg := getAndModifyUpstreamConfigForListener(s.Logger, u, chain)
-    if cfg.ListenerJSON != "" {
-        return makeListenerFromUserConfig(cfg.ListenerJSON)
+    if cfg.EnvoyListenerJSON != "" {
+        return makeListenerFromUserConfig(cfg.EnvoyListenerJSON)
     }

     useRDS := true
@@ -1094,12 +1094,12 @@ func getAndModifyUpstreamConfigForListener(logger hclog.Logger, u *structs.Upstr
         logger.Warn("failed to parse", "upstream", u.Identifier(), "error", err)
     }

-    if cfg.ListenerJSON != "" {
+    if cfg.EnvoyListenerJSON != "" {
         logger.Warn("ignoring escape hatch setting because already configured for",
             "discovery chain", chain.ServiceName, "upstream", u.Identifier(), "config", "envoy_listener_json")

         // Remove from config struct so we don't use it later on
-        cfg.ListenerJSON = ""
+        cfg.EnvoyListenerJSON = ""
     }

     proto := cfg.Protocol

View File

@@ -100,20 +100,20 @@ type ConnectConfiguration struct {
 }

 type UpstreamConfig struct {
-    // ListenerJSON is a complete override ("escape hatch") for the upstream's
+    // EnvoyListenerJSON is a complete override ("escape hatch") for the upstream's
     // listener.
     //
     // Note: This escape hatch is NOT compatible with the discovery chain and
     // will be ignored if a discovery chain is active.
-    ListenerJSON string `json:",omitempty" alias:"listener_json"`
+    EnvoyListenerJSON string `json:",omitempty" alias:"envoy_listener_json"`

-    // ClusterJSON is a complete override ("escape hatch") for the upstream's
+    // EnvoyClusterJSON is a complete override ("escape hatch") for the upstream's
     // cluster. The Connect client TLS certificate and context will be injected
     // overriding any TLS settings present.
     //
     // Note: This escape hatch is NOT compatible with the discovery chain and
     // will be ignored if a discovery chain is active.
-    ClusterJSON string `json:",omitempty" alias:"cluster_json"`
+    EnvoyClusterJSON string `json:",omitempty" alias:"envoy_cluster_json"`

     // Protocol describes the upstream's service protocol. Valid values are "tcp",
     // "http" and "grpc". Anything else is treated as tcp. The enables protocol

View File

@@ -348,8 +348,8 @@ func TestDecodeConfigEntry(t *testing.T) {
                 }
             },
             "UpstreamDefaults": {
-                "ClusterJSON": "zip",
-                "ListenerJSON": "zop",
+                "EnvoyClusterJSON": "zip",
+                "EnvoyListenerJSON": "zop",
                 "ConnectTimeoutMs": 5000,
                 "Protocol": "http",
                 "Limits": {
@@ -390,10 +390,10 @@ func TestDecodeConfigEntry(t *testing.T) {
                 },
             },
             UpstreamDefaults: UpstreamConfig{
-                ClusterJSON: "zip",
-                ListenerJSON: "zop",
+                EnvoyClusterJSON: "zip",
+                EnvoyListenerJSON: "zop",
                 Protocol: "http",
                 ConnectTimeoutMs: 5000,
                 Limits: &UpstreamLimits{
                     MaxConnections: 3,
                     MaxPendingRequests: 4,

View File

@@ -451,8 +451,8 @@ func TestParseConfigEntry(t *testing.T) {
                 }
             }
             upstream_defaults {
-                cluster_json = "zip"
-                listener_json = "zop"
+                envoy_cluster_json = "zip"
+                envoy_listener_json = "zop"
                 connect_timeout_ms = 5000
                 protocol = "http"
                 limits {
@@ -494,8 +494,8 @@ func TestParseConfigEntry(t *testing.T) {
                 }
             }
             upstream_defaults = {
-                cluster_json = "zip"
-                listener_json = "zop"
+                envoy_cluster_json = "zip"
+                envoy_listener_json = "zop"
                 connect_timeout_ms = 5000
                 protocol = "http"
                 limits = {
@@ -538,8 +538,8 @@ func TestParseConfigEntry(t *testing.T) {
                 }
             },
             "upstream_defaults": {
-                "cluster_json": "zip",
-                "listener_json": "zop",
+                "envoy_cluster_json": "zip",
+                "envoy_listener_json": "zop",
                 "connect_timeout_ms": 5000,
                 "protocol": "http",
                 "limits": {
@@ -583,8 +583,8 @@ func TestParseConfigEntry(t *testing.T) {
                 }
             },
             "UpstreamDefaults": {
-                "ClusterJSON": "zip",
-                "ListenerJSON": "zop",
+                "EnvoyClusterJSON": "zip",
+                "EnvoyListenerJSON": "zop",
                 "ConnectTimeoutMs": 5000,
                 "Protocol": "http",
                 "Limits": {
@@ -627,10 +627,10 @@ func TestParseConfigEntry(t *testing.T) {
             },
         },
         UpstreamDefaults: api.UpstreamConfig{
-            ClusterJSON: "zip",
-            ListenerJSON: "zop",
+            EnvoyClusterJSON: "zip",
+            EnvoyListenerJSON: "zop",
             Protocol: "http",
             ConnectTimeoutMs: 5000,
             Limits: &api.UpstreamLimits{
                 MaxConnections: 3,
                 MaxPendingRequests: 4,