mirror of https://github.com/status-im/consul.git
[NET-4703] Prevent partial application of Envoy extensions (#18068)
Prevent partial application of Envoy extensions Ensure that non-required extensions do not change xDS resources before exiting on failure by cloning proto messages prior to applying each extension. To support this change, also move `CanApply` checks up a layer and make them prior to attempting extension application, s.t. we avoid unnecessary copies where extensions can't be applied. Last, ensure that we do not allow panics from `CanApply` or `Extend` checks to escape the attempted extension application.
This commit is contained in:
parent
18a5edd232
commit
b1b05f0bac
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
xds: Prevent partial application of non-Required Envoy extensions in the case of failure.
|
||||
```
|
|
@ -258,7 +258,7 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
s.ResourceMapMutateFn(newResourceMap)
|
||||
}
|
||||
|
||||
if err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil {
|
||||
if newResourceMap, err = s.applyEnvoyExtensions(newResourceMap, cfgSnap, node); err != nil {
|
||||
// err is already the result of calling status.Errorf
|
||||
return err
|
||||
}
|
||||
|
@ -403,30 +403,30 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) error {
|
||||
func (s *Server) applyEnvoyExtensions(resources *xdscommon.IndexedResources, cfgSnap *proxycfg.ConfigSnapshot, node *envoy_config_core_v3.Node) (*xdscommon.IndexedResources, error) {
|
||||
var err error
|
||||
envoyVersion := xdscommon.DetermineEnvoyVersionFromNode(node)
|
||||
consulVersion, err := goversion.NewVersion(version.Version)
|
||||
|
||||
if err != nil {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to parse Consul version")
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version")
|
||||
}
|
||||
|
||||
serviceConfigs := extensionruntime.GetRuntimeConfigurations(cfgSnap)
|
||||
for _, cfgs := range serviceConfigs {
|
||||
for _, cfg := range cfgs {
|
||||
err = applyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion)
|
||||
resources, err = validateAndApplyEnvoyExtension(s.Logger, cfgSnap, resources, cfg, envoyVersion, consulVersion)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) error {
|
||||
func validateAndApplyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot, resources *xdscommon.IndexedResources, runtimeConfig extensioncommon.RuntimeConfig, envoyVersion, consulVersion *goversion.Version) (*xdscommon.IndexedResources, error) {
|
||||
logFn := logger.Warn
|
||||
if runtimeConfig.EnvoyExtension.Required {
|
||||
logFn = logger.Error
|
||||
|
@ -460,14 +460,14 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot,
|
|||
logFn("failed to parse Envoy extension version constraint", errorParams...)
|
||||
|
||||
if ext.Required {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Envoy version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||||
}
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
if !c.Check(envoyVersion) {
|
||||
logger.Info("skipping envoy extension due to Envoy version constraint violation", errorParams...)
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -477,14 +477,14 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot,
|
|||
logFn("failed to parse Consul extension version constraint", errorParams...)
|
||||
|
||||
if ext.Required {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to parse Consul version constraint for extension %q for service %q", ext.Name, svc.Name)
|
||||
}
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
if !c.Check(consulVersion) {
|
||||
logger.Info("skipping envoy extension due to Consul version constraint violation", errorParams...)
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -496,10 +496,10 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot,
|
|||
logFn("failed to construct extension", errorParams...)
|
||||
|
||||
if ext.Required {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to construct extension %q for service %q", ext.Name, svc.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
now = time.Now()
|
||||
|
@ -510,25 +510,59 @@ func applyEnvoyExtension(logger hclog.Logger, cfgSnap *proxycfg.ConfigSnapshot,
|
|||
logFn("failed to validate extension arguments", errorParams...)
|
||||
|
||||
if ext.Required {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to validate arguments for extension %q for service %q", ext.Name, svc.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
now = time.Now()
|
||||
_, err = extender.Extend(resources, &runtimeConfig)
|
||||
resources, err = applyEnvoyExtension(extender, resources, &runtimeConfig)
|
||||
metrics.MeasureSinceWithLabels([]string{"envoy_extension", "extend"}, now, getMetricLabels(err))
|
||||
if err != nil {
|
||||
errorParams = append(errorParams, "error", err)
|
||||
logFn("failed to apply envoy extension", errorParams...)
|
||||
|
||||
if ext.Required {
|
||||
return status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err)
|
||||
return nil, status.Errorf(codes.InvalidArgument, "failed to patch xDS resources in the %q extension: %v", ext.Name, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
// applyEnvoyExtension safely checks whether an extension can be applied, and if so attempts to apply it.
|
||||
//
|
||||
// applyEnvoyExtension makes a copy of the provided IndexedResources, then applies the given extension to them.
|
||||
// The copy ensures against partial application if a non-required extension modifies a resource then fails at a later
|
||||
// stage; this is necessary because IndexedResources and its proto messages are all passed by reference, and
|
||||
// non-required extensions do not lead to a terminal failure in xDS updates.
|
||||
//
|
||||
// If the application is successful, the modified copy is returned. If not, the original and an error is returned.
|
||||
// Returning resources in either case allows for applying extensions in a loop and reporting on non-required extension
|
||||
// failures simultaneously.
|
||||
func applyEnvoyExtension(extender extensioncommon.EnvoyExtender, resources *xdscommon.IndexedResources, runtimeConfig *extensioncommon.RuntimeConfig) (r *xdscommon.IndexedResources, e error) {
|
||||
// Don't panic due to an extension misbehaving.
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
r = resources
|
||||
e = fmt.Errorf("attempt to apply Envoy extension %q caused an unexpected panic: %v",
|
||||
runtimeConfig.EnvoyExtension.Name, err)
|
||||
}
|
||||
}()
|
||||
|
||||
// First check whether the extension is eligible for application in the current environment.
|
||||
// Do this before copying indexed resources for the sake of efficiency.
|
||||
if !extender.CanApply(runtimeConfig) {
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
newResources, err := extender.Extend(xdscommon.Clone(resources), runtimeConfig)
|
||||
if err != nil {
|
||||
return resources, err
|
||||
}
|
||||
|
||||
return newResources, nil
|
||||
}
|
||||
|
||||
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations
|
||||
|
|
|
@ -748,7 +748,7 @@ end`,
|
|||
cfgs := extensionruntime.GetRuntimeConfigurations(snap)
|
||||
for _, extensions := range cfgs {
|
||||
for _, ext := range extensions {
|
||||
err := applyEnvoyExtension(hclog.NewNullLogger(), snap, indexedResources, ext, parsedEnvoyVersion, consulVersion)
|
||||
indexedResources, err = validateAndApplyEnvoyExtension(hclog.NewNullLogger(), snap, indexedResources, ext, parsedEnvoyVersion, consulVersion)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ package xds
|
|||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
@ -13,6 +14,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||||
envoy_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
goversion "github.com/hashicorp/go-version"
|
||||
|
@ -1613,7 +1616,7 @@ func requireExtensionMetrics(
|
|||
}
|
||||
}
|
||||
|
||||
func Test_applyEnvoyExtension_Validations(t *testing.T) {
|
||||
func Test_validateAndApplyEnvoyExtension_Validations(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
runtimeConfig extensioncommon.RuntimeConfig
|
||||
|
@ -1713,13 +1716,315 @@ func Test_applyEnvoyExtension_Validations(t *testing.T) {
|
|||
ServiceID: structs.NewServiceID("s1", nil),
|
||||
},
|
||||
}
|
||||
err := applyEnvoyExtension(hclog.NewNullLogger(), &snap, nil, tc.runtimeConfig, envoyVersion, consulVersion)
|
||||
resources, err := validateAndApplyEnvoyExtension(hclog.NewNullLogger(), &snap, nil, tc.runtimeConfig, envoyVersion, consulVersion)
|
||||
if tc.err {
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), tc.errString)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, resources)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_applyEnvoyExtension_CanApply(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
canApply bool
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "cannot apply: is not applied",
|
||||
canApply: false,
|
||||
},
|
||||
{
|
||||
name: "can apply: is applied",
|
||||
canApply: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
extender := extensioncommon.BasicEnvoyExtender{
|
||||
Extension: &maybeCanApplyExtension{
|
||||
canApply: tc.canApply,
|
||||
},
|
||||
}
|
||||
config := &extensioncommon.RuntimeConfig{
|
||||
Kind: api.ServiceKindConnectProxy,
|
||||
ServiceName: api.CompoundServiceName{Name: "api"},
|
||||
Upstreams: map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
|
||||
IsSourcedFromUpstream: false,
|
||||
EnvoyExtension: api.EnvoyExtension{
|
||||
Name: "maybeCanApplyExtension",
|
||||
Required: false,
|
||||
},
|
||||
}
|
||||
listener := &envoy_listener_v3.Listener{
|
||||
Name: xdscommon.OutboundListenerName,
|
||||
IgnoreGlobalConnLimit: false,
|
||||
}
|
||||
indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
|
||||
xdscommon.ListenerType: {
|
||||
listener,
|
||||
},
|
||||
})
|
||||
|
||||
result, err := applyEnvoyExtension(&extender, indexedResources, config)
|
||||
require.NoError(t, err)
|
||||
resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
|
||||
require.Equal(t, tc.canApply, resultListener.IgnoreGlobalConnLimit)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_applyEnvoyExtension_PartialApplicationDisallowed(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
fail bool
|
||||
returnOnFailure bool
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "failure: returns nothing",
|
||||
fail: true,
|
||||
returnOnFailure: false,
|
||||
},
|
||||
// Not expected, but cover to be sure.
|
||||
{
|
||||
name: "failure: returns values",
|
||||
fail: true,
|
||||
returnOnFailure: true,
|
||||
},
|
||||
// Ensure that under normal circumstances, the extension would succeed in
|
||||
// modifying resources.
|
||||
{
|
||||
name: "success: resources modified",
|
||||
fail: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
for _, indexType := range []string{
|
||||
xdscommon.ListenerType,
|
||||
xdscommon.ClusterType,
|
||||
} {
|
||||
typeShortName := indexType[strings.LastIndex(indexType, ".")+1:]
|
||||
t.Run(fmt.Sprintf("%s: %s", tc.name, typeShortName), func(t *testing.T) {
|
||||
extender := extensioncommon.BasicEnvoyExtender{
|
||||
Extension: &partialFailureExtension{
|
||||
returnOnFailure: tc.returnOnFailure,
|
||||
// Alternate which resource fails so that we can test for
|
||||
// partial modification independent of patch order.
|
||||
failListener: tc.fail && indexType == xdscommon.ListenerType,
|
||||
failCluster: tc.fail && indexType == xdscommon.ClusterType,
|
||||
},
|
||||
}
|
||||
config := &extensioncommon.RuntimeConfig{
|
||||
Kind: api.ServiceKindConnectProxy,
|
||||
ServiceName: api.CompoundServiceName{Name: "api"},
|
||||
Upstreams: map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
|
||||
IsSourcedFromUpstream: false,
|
||||
EnvoyExtension: api.EnvoyExtension{
|
||||
Name: "partialFailureExtension",
|
||||
Required: false,
|
||||
},
|
||||
}
|
||||
cluster := &envoy_cluster_v3.Cluster{
|
||||
Name: xdscommon.LocalAppClusterName,
|
||||
RespectDnsTtl: false,
|
||||
}
|
||||
listener := &envoy_listener_v3.Listener{
|
||||
Name: xdscommon.OutboundListenerName,
|
||||
IgnoreGlobalConnLimit: false,
|
||||
}
|
||||
indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
|
||||
xdscommon.ClusterType: {
|
||||
cluster,
|
||||
},
|
||||
xdscommon.ListenerType: {
|
||||
listener,
|
||||
},
|
||||
})
|
||||
|
||||
result, err := applyEnvoyExtension(&extender, indexedResources, config)
|
||||
if tc.fail {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
resultListener := result.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
|
||||
resultCluster := result.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster)
|
||||
require.Equal(t, !tc.fail, resultListener.IgnoreGlobalConnLimit)
|
||||
require.Equal(t, !tc.fail, resultCluster.RespectDnsTtl)
|
||||
|
||||
// Regardless of success, original values should not be modified.
|
||||
originalListener := indexedResources.Index[xdscommon.ListenerType][xdscommon.OutboundListenerName].(*envoy_listener_v3.Listener)
|
||||
originalCluster := indexedResources.Index[xdscommon.ClusterType][xdscommon.LocalAppClusterName].(*envoy_cluster_v3.Cluster)
|
||||
require.False(t, originalListener.IgnoreGlobalConnLimit)
|
||||
require.False(t, originalCluster.RespectDnsTtl)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_applyEnvoyExtension_HandlesPanics(t *testing.T) {
|
||||
type testCase struct {
|
||||
name string
|
||||
panicOnCanApply bool
|
||||
panicOnPatch bool
|
||||
}
|
||||
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "panic: CanApply",
|
||||
panicOnCanApply: true,
|
||||
},
|
||||
{
|
||||
name: "panic: Extend",
|
||||
panicOnPatch: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
extension := &maybePanicExtension{
|
||||
panicOnCanApply: tc.panicOnCanApply,
|
||||
panicOnPatch: tc.panicOnPatch,
|
||||
}
|
||||
extender := extensioncommon.BasicEnvoyExtender{
|
||||
Extension: extension,
|
||||
}
|
||||
config := &extensioncommon.RuntimeConfig{
|
||||
Kind: api.ServiceKindConnectProxy,
|
||||
ServiceName: api.CompoundServiceName{Name: "api"},
|
||||
Upstreams: map[api.CompoundServiceName]*extensioncommon.UpstreamData{},
|
||||
IsSourcedFromUpstream: false,
|
||||
EnvoyExtension: api.EnvoyExtension{
|
||||
Name: "maybePanicExtension",
|
||||
Required: false,
|
||||
},
|
||||
}
|
||||
listener := &envoy_listener_v3.Listener{
|
||||
Name: xdscommon.OutboundListenerName,
|
||||
IgnoreGlobalConnLimit: false,
|
||||
}
|
||||
indexedResources := xdscommon.IndexResources(testutil.Logger(t), map[string][]proto.Message{
|
||||
xdscommon.ListenerType: {
|
||||
listener,
|
||||
},
|
||||
})
|
||||
|
||||
_, err := applyEnvoyExtension(&extender, indexedResources, config)
|
||||
|
||||
// We did not panic, good.
|
||||
// First assert our test is valid by forcing a panic, then check the error message that was returned.
|
||||
if tc.panicOnCanApply {
|
||||
require.PanicsWithError(t, "this is an expected failure in CanApply", func() {
|
||||
extension.CanApply(config)
|
||||
})
|
||||
require.ErrorContains(t, err, "attempt to apply Envoy extension \"maybePanicExtension\" caused an unexpected panic: this is an expected failure in CanApply")
|
||||
}
|
||||
if tc.panicOnPatch {
|
||||
require.PanicsWithError(t, "this is an expected failure in PatchListener", func() {
|
||||
_, _, _ = extension.PatchListener(config.GetListenerPayload(listener))
|
||||
})
|
||||
require.ErrorContains(t, err, "attempt to apply Envoy extension \"maybePanicExtension\" caused an unexpected panic: this is an expected failure in PatchListener")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type maybeCanApplyExtension struct {
|
||||
extensioncommon.BasicExtensionAdapter
|
||||
canApply bool
|
||||
}
|
||||
|
||||
var _ extensioncommon.BasicExtension = (*maybeCanApplyExtension)(nil)
|
||||
|
||||
func (m *maybeCanApplyExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
|
||||
return m.canApply
|
||||
}
|
||||
|
||||
func (m *maybeCanApplyExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
|
||||
payload.Message.IgnoreGlobalConnLimit = true
|
||||
return payload.Message, true, nil
|
||||
}
|
||||
|
||||
type partialFailureExtension struct {
|
||||
extensioncommon.BasicExtensionAdapter
|
||||
returnOnFailure bool
|
||||
failCluster bool
|
||||
failListener bool
|
||||
}
|
||||
|
||||
var _ extensioncommon.BasicExtension = (*partialFailureExtension)(nil)
|
||||
|
||||
func (p *partialFailureExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *partialFailureExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
|
||||
// Modify original input message
|
||||
payload.Message.IgnoreGlobalConnLimit = true
|
||||
|
||||
err := fmt.Errorf("oops - listener patch failed")
|
||||
if !p.failListener {
|
||||
err = nil
|
||||
}
|
||||
|
||||
returnMsg := payload.Message
|
||||
if err != nil && !p.returnOnFailure {
|
||||
returnMsg = nil
|
||||
}
|
||||
|
||||
patched := err == nil || p.returnOnFailure
|
||||
|
||||
return returnMsg, patched, err
|
||||
}
|
||||
|
||||
func (p *partialFailureExtension) PatchCluster(payload extensioncommon.ClusterPayload) (*envoy_cluster_v3.Cluster, bool, error) {
|
||||
// Modify original input message
|
||||
payload.Message.RespectDnsTtl = true
|
||||
|
||||
err := fmt.Errorf("oops - cluster patch failed")
|
||||
if !p.failCluster {
|
||||
err = nil
|
||||
}
|
||||
|
||||
returnMsg := payload.Message
|
||||
if err != nil && !p.returnOnFailure {
|
||||
returnMsg = nil
|
||||
}
|
||||
|
||||
patched := err == nil || p.returnOnFailure
|
||||
|
||||
return returnMsg, patched, err
|
||||
}
|
||||
|
||||
type maybePanicExtension struct {
|
||||
extensioncommon.BasicExtensionAdapter
|
||||
panicOnCanApply bool
|
||||
panicOnPatch bool
|
||||
}
|
||||
|
||||
var _ extensioncommon.BasicExtension = (*maybePanicExtension)(nil)
|
||||
|
||||
func (m *maybePanicExtension) CanApply(_ *extensioncommon.RuntimeConfig) bool {
|
||||
if m.panicOnCanApply {
|
||||
panic(fmt.Errorf("this is an expected failure in CanApply"))
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *maybePanicExtension) PatchListener(payload extensioncommon.ListenerPayload) (*envoy_listener_v3.Listener, bool, error) {
|
||||
if m.panicOnPatch {
|
||||
panic(fmt.Errorf("this is an expected failure in PatchListener"))
|
||||
}
|
||||
payload.Message.IgnoreGlobalConnLimit = true
|
||||
return payload.Message, true, nil
|
||||
}
|
||||
|
|
|
@ -103,6 +103,10 @@ type BasicEnvoyExtender struct {
|
|||
Extension BasicExtension
|
||||
}
|
||||
|
||||
func (b *BasicEnvoyExtender) CanApply(config *RuntimeConfig) bool {
|
||||
return b.Extension.CanApply(config)
|
||||
}
|
||||
|
||||
func (b *BasicEnvoyExtender) Validate(config *RuntimeConfig) error {
|
||||
return b.Extension.Validate(config)
|
||||
}
|
||||
|
@ -123,10 +127,6 @@ func (b *BasicEnvoyExtender) Extend(resources *xdscommon.IndexedResources, confi
|
|||
return resources, nil
|
||||
}
|
||||
|
||||
if !b.Extension.CanApply(config) {
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
clusters := make(ClusterMap)
|
||||
clusterLoadAssignments := make(ClusterLoadAssignmentMap)
|
||||
routes := make(RouteMap)
|
||||
|
|
|
@ -11,6 +11,10 @@ import (
|
|||
// to be dynamically executed during runtime.
|
||||
type EnvoyExtender interface {
|
||||
|
||||
// CanApply checks whether the extension configured for this extender is eligible
|
||||
// for application based on the specified RuntimeConfig.
|
||||
CanApply(*RuntimeConfig) bool
|
||||
|
||||
// Validate ensures the data in config can successfuly be used
|
||||
// to apply the specified Envoy extension.
|
||||
Validate(*RuntimeConfig) error
|
||||
|
|
|
@ -70,7 +70,10 @@ type RuntimeConfig struct {
|
|||
// that matches the given SNI, if the RuntimeConfig corresponds to an upstream of the local service.
|
||||
// Only used when IsSourcedFromUpstream is true.
|
||||
func (c RuntimeConfig) MatchesUpstreamServiceSNI(sni string) bool {
|
||||
u := c.Upstreams[c.ServiceName]
|
||||
u, ok := c.Upstreams[c.ServiceName]
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
_, match := u.SNIs[sni]
|
||||
return match
|
||||
}
|
||||
|
@ -79,7 +82,10 @@ func (c RuntimeConfig) MatchesUpstreamServiceSNI(sni string) bool {
|
|||
// upstream of the local service. Note that this could be the local service if it targets itself as an upstream.
|
||||
// Only used when IsSourcedFromUpstream is true.
|
||||
func (c RuntimeConfig) UpstreamEnvoyID() string {
|
||||
u := c.Upstreams[c.ServiceName]
|
||||
u, ok := c.Upstreams[c.ServiceName]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return u.EnvoyID
|
||||
}
|
||||
|
||||
|
@ -87,6 +93,9 @@ func (c RuntimeConfig) UpstreamEnvoyID() string {
|
|||
// RuntimeConfig corresponds to an upstream of the local service.
|
||||
// Only used when IsSourcedFromUpstream is true.
|
||||
func (c RuntimeConfig) UpstreamOutgoingProxyKind() api.ServiceKind {
|
||||
u := c.Upstreams[c.ServiceName]
|
||||
u, ok := c.Upstreams[c.ServiceName]
|
||||
if !ok {
|
||||
return ""
|
||||
}
|
||||
return u.OutgoingProxyKind
|
||||
}
|
||||
|
|
|
@ -30,6 +30,10 @@ type UpstreamEnvoyExtender struct {
|
|||
|
||||
var _ EnvoyExtender = (*UpstreamEnvoyExtender)(nil)
|
||||
|
||||
func (ext *UpstreamEnvoyExtender) CanApply(config *RuntimeConfig) bool {
|
||||
return ext.Extension.CanApply(config)
|
||||
}
|
||||
|
||||
func (ext *UpstreamEnvoyExtender) Validate(_ *RuntimeConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
@ -56,10 +60,6 @@ func (ext *UpstreamEnvoyExtender) Extend(resources *xdscommon.IndexedResources,
|
|||
return resources, nil
|
||||
}
|
||||
|
||||
if !ext.Extension.CanApply(config) {
|
||||
return resources, nil
|
||||
}
|
||||
|
||||
for _, indexType := range []string{
|
||||
xdscommon.ListenerType,
|
||||
xdscommon.RouteType,
|
||||
|
|
|
@ -6,6 +6,7 @@ replace github.com/hashicorp/consul/api => ../api
|
|||
|
||||
require (
|
||||
github.com/envoyproxy/go-control-plane v0.11.0
|
||||
github.com/google/go-cmp v0.5.9
|
||||
github.com/hashicorp/consul/api v1.22.0
|
||||
github.com/hashicorp/consul/sdk v0.14.0
|
||||
github.com/hashicorp/go-hclog v1.5.0
|
||||
|
|
|
@ -60,6 +60,7 @@ github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
|
|||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
|
||||
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/hashicorp/consul/sdk v0.14.0 h1:Hly+BMNMssVzoWddbBnBFi3W+Fzytvm0haSkihhj3GU=
|
||||
github.com/hashicorp/consul/sdk v0.14.0/go.mod h1:gHYeuDa0+0qRAD6Wwr6yznMBvBwHKoxSBoW5l73+saE=
|
||||
|
|
|
@ -66,6 +66,29 @@ type IndexedResources struct {
|
|||
ChildIndex map[string]map[string][]string
|
||||
}
|
||||
|
||||
// Clone makes a deep copy of the IndexedResources value at the given pointer and
|
||||
// returns a pointer to the copy.
|
||||
func Clone(i *IndexedResources) *IndexedResources {
|
||||
if i == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
iCopy := EmptyIndexedResources()
|
||||
for typeURL, typeMap := range i.Index {
|
||||
for name, msg := range typeMap {
|
||||
clone := proto.Clone(msg)
|
||||
iCopy.Index[typeURL][name] = clone
|
||||
}
|
||||
}
|
||||
for typeURL, parentMap := range i.ChildIndex {
|
||||
for name, childName := range parentMap {
|
||||
iCopy.ChildIndex[typeURL][name] = childName
|
||||
}
|
||||
}
|
||||
|
||||
return iCopy
|
||||
}
|
||||
|
||||
func IndexResources(logger hclog.Logger, resources map[string][]proto.Message) *IndexedResources {
|
||||
data := EmptyIndexedResources()
|
||||
|
||||
|
|
|
@ -0,0 +1,123 @@
|
|||
package xdscommon
|
||||
|
||||
import (
|
||||
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
|
||||
envoy_listener_v3 "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/testing/protocmp"
|
||||
duration "google.golang.org/protobuf/types/known/durationpb"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestCloneIndexedResources(t *testing.T) {
|
||||
exampleResources := map[string][]proto.Message{
|
||||
ListenerType: {
|
||||
&envoy_listener_v3.Listener{
|
||||
Name: "listener1",
|
||||
IgnoreGlobalConnLimit: true,
|
||||
ListenerFiltersTimeout: &duration.Duration{
|
||||
Seconds: 123,
|
||||
},
|
||||
},
|
||||
&envoy_listener_v3.Listener{
|
||||
Name: "listener2",
|
||||
StatPrefix: "stats.foo",
|
||||
ListenerFiltersTimeout: &duration.Duration{
|
||||
Seconds: 456,
|
||||
},
|
||||
},
|
||||
},
|
||||
ClusterType: {
|
||||
&envoy_cluster_v3.Cluster{
|
||||
Name: "cluster1",
|
||||
RespectDnsTtl: true,
|
||||
TransportSocketMatches: []*envoy_cluster_v3.Cluster_TransportSocketMatch{
|
||||
{
|
||||
Name: "match1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
getPointerField := func(msg proto.Message) interface{} {
|
||||
switch typedMsg := msg.(type) {
|
||||
case *envoy_cluster_v3.Cluster:
|
||||
return typedMsg.TransportSocketMatches[0]
|
||||
case *envoy_listener_v3.Listener:
|
||||
return typedMsg.ListenerFiltersTimeout
|
||||
default:
|
||||
panic("should not happen")
|
||||
}
|
||||
}
|
||||
updatePointerField := func(msg proto.Message) {
|
||||
switch typedMsg := msg.(type) {
|
||||
case *envoy_cluster_v3.Cluster:
|
||||
typedMsg.TransportSocketMatches[0] = &envoy_cluster_v3.Cluster_TransportSocketMatch{
|
||||
Name: "match1-updated",
|
||||
}
|
||||
case *envoy_listener_v3.Listener:
|
||||
typedMsg.ListenerFiltersTimeout = &duration.Duration{
|
||||
Seconds: 999,
|
||||
}
|
||||
default:
|
||||
panic("should not happen")
|
||||
}
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
name string
|
||||
input *IndexedResources
|
||||
hasResources bool
|
||||
}{
|
||||
{
|
||||
name: "simple compare",
|
||||
input: IndexResources(testutil.Logger(t), exampleResources),
|
||||
hasResources: true,
|
||||
},
|
||||
{
|
||||
name: "empty input returns empty",
|
||||
input: EmptyIndexedResources(),
|
||||
},
|
||||
{
|
||||
name: "nil input returns nil",
|
||||
input: nil,
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
clone := Clone(tc.input)
|
||||
|
||||
if tc.input == nil {
|
||||
require.Nil(t, clone)
|
||||
} else {
|
||||
if diff := cmp.Diff(tc.input, clone, protocmp.Transform()); diff != "" {
|
||||
t.Errorf("unexpected difference:\n%v", diff)
|
||||
}
|
||||
|
||||
require.NotSame(t, tc.input, clone)
|
||||
require.NotSame(t, tc.input.Index, clone.Index)
|
||||
require.NotSame(t, tc.input.ChildIndex, clone.ChildIndex)
|
||||
|
||||
// Ensure deep clone of protos
|
||||
for typeURL, typeMap := range tc.input.Index {
|
||||
for name, msg := range typeMap {
|
||||
require.NotSame(t, msg, clone.Index[typeURL][name])
|
||||
require.NotSame(t, getPointerField(msg), getPointerField(clone.Index[typeURL][name]))
|
||||
updatePointerField(msg)
|
||||
}
|
||||
}
|
||||
|
||||
// Only check post-update difference if there are resources to differ
|
||||
if tc.hasResources {
|
||||
if diff := cmp.Diff(tc.input, clone, protocmp.Transform()); diff == "" {
|
||||
t.Errorf("updated original and clone should be different:\n%v", diff)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue