diff --git a/.changelog/14960.txt b/.changelog/14960.txt new file mode 100644 index 0000000000..8bebd02cc0 --- /dev/null +++ b/.changelog/14960.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +xds: Added a rate limiter to the delivery of proxy config updates, to prevent updates to "global" resources such as wildcard intentions from overwhelming servers (see: `xds.update_max_per_second` config field) +``` diff --git a/agent/agent.go b/agent/agent.go index b14b6c8d97..a2a7bc7262 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -683,6 +683,7 @@ func (a *Agent) Start(ctx context.Context) error { }, TLSConfigurator: a.tlsConfigurator, IntentionDefaultAllow: intentionDefaultAllow, + UpdateRateLimit: a.config.XDSUpdateRateLimit, }) if err != nil { return err @@ -4133,6 +4134,8 @@ func (a *Agent) reloadConfigInternal(newCfg *config.RuntimeConfig) error { } } + a.proxyConfig.SetUpdateRateLimit(newCfg.XDSUpdateRateLimit) + return nil } diff --git a/agent/agent_test.go b/agent/agent_test.go index 028df62764..702bdda126 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -36,6 +36,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" "google.golang.org/grpc" "gopkg.in/square/go-jose.v2/jwt" @@ -4155,6 +4156,28 @@ func TestAgent_ReloadConfigTLSConfigFailure(t *testing.T) { assertDeepEqual(t, expectedCaPoolByFile, tlsConf.ClientCAs, cmpCertPool) } +func TestAgent_ReloadConfig_XDSUpdateRateLimit(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + cfg := fmt.Sprintf(`data_dir = %q`, testutil.TempDir(t, "agent")) + + a := NewTestAgent(t, cfg) + defer a.Shutdown() + + c := TestConfig( + testutil.Logger(t), + config.FileSource{ + Name: t.Name(), + Format: "hcl", + Data: cfg + ` xds { update_max_per_second = 1000 }`, + }, + ) + require.NoError(t, a.reloadConfigInternal(c)) + require.Equal(t, rate.Limit(1000), a.proxyConfig.UpdateRateLimit()) +} + func TestAgent_consulConfig_AutoEncryptAllowTLS(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/config/builder.go b/agent/config/builder.go index 3f3d765d28..50cfbe8237 100644 --- a/agent/config/builder.go +++ b/agent/config/builder.go @@ -1075,6 +1075,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) { UnixSocketMode: stringVal(c.UnixSocket.Mode), UnixSocketUser: stringVal(c.UnixSocket.User), Watches: c.Watches, + XDSUpdateRateLimit: rate.Limit(float64Val(c.XDS.UpdateMaxPerSecond)), AutoReloadConfigCoalesceInterval: 1 * time.Second, } diff --git a/agent/config/config.go b/agent/config/config.go index d765eb546e..156bd1926e 100644 --- a/agent/config/config.go +++ b/agent/config/config.go @@ -237,6 +237,7 @@ type Config struct { TaggedAddresses map[string]string `mapstructure:"tagged_addresses"` Telemetry Telemetry `mapstructure:"telemetry"` TranslateWANAddrs *bool `mapstructure:"translate_wan_addrs"` + XDS XDS `mapstructure:"xds"` // DEPRECATED (ui-config) - moved to the ui_config stanza UI *bool `mapstructure:"ui"` @@ -909,3 +910,7 @@ type Peering struct { // This always gets overridden in NonUserSource() TestAllowPeerRegistrations *bool `mapstructure:"test_allow_peer_registrations"` } + +type XDS struct { + UpdateMaxPerSecond *float64 `mapstructure:"update_max_per_second"` +} diff --git a/agent/config/default.go b/agent/config/default.go index 861db9e3ba..ad2b3a66ff 100644 --- a/agent/config/default.go +++ b/agent/config/default.go @@ -135,6 +135,9 @@ func DefaultSource() Source { raft_snapshot_interval = "` + cfg.RaftConfig.SnapshotInterval.String() + `" raft_trailing_logs = ` + strconv.Itoa(int(cfg.RaftConfig.TrailingLogs)) + ` + xds { + update_max_per_second = 250 + } `, } } diff --git a/agent/config/runtime.go b/agent/config/runtime.go index 6e8651779d..df4580ba63 100644 --- a/agent/config/runtime.go +++ b/agent/config/runtime.go @@ -1440,6 +1440,15 @@ type RuntimeConfig struct { // Watches []map[string]interface{} + // XDSUpdateRateLimit controls the maximum rate at which proxy config updates + // will be delivered, across all connected xDS streams. This is used to stop + // updates to "global" resources (e.g. wildcard intentions) from saturating + // system resources at the expense of other work, such as raft and gossip, + // which could cause general cluster instability. + // + // hcl: xds { update_max_per_second = (float64|MaxFloat64) } + XDSUpdateRateLimit rate.Limit + // AutoReloadConfigCoalesceInterval Coalesce Interval for auto reload config AutoReloadConfigCoalesceInterval time.Duration diff --git a/agent/config/runtime_test.go b/agent/config/runtime_test.go index 90d3b31434..c58c595c49 100644 --- a/agent/config/runtime_test.go +++ b/agent/config/runtime_test.go @@ -4547,6 +4547,7 @@ func TestLoad_IntegrationWithFlags(t *testing.T) { rt.HTTPMaxConnsPerClient = 200 rt.RPCMaxConnsPerClient = 100 rt.SegmentLimit = 64 + rt.XDSUpdateRateLimit = 250 }, }) @@ -6517,6 +6518,7 @@ func TestLoad_FullConfig(t *testing.T) { "args": []interface{}{"dltjDJ2a", "flEa7C2d"}, }, }, + XDSUpdateRateLimit: 9526.2, RaftBoltDBConfig: consul.RaftBoltDBConfig{NoFreelistSync: true}, AutoReloadConfigCoalesceInterval: 1 * time.Second, } diff --git a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden index 9d76d78bcf..0d02903af4 100644 --- a/agent/config/testdata/TestRuntimeConfig_Sanitize.golden +++ b/agent/config/testdata/TestRuntimeConfig_Sanitize.golden @@ -479,5 +479,6 @@ "Version": "", "VersionMetadata": "", "VersionPrerelease": "", - "Watches": [] -} + "Watches": [], + "XDSUpdateRateLimit": 0 +} \ No newline at end of file diff --git a/agent/config/testdata/full-config.hcl b/agent/config/testdata/full-config.hcl index f2e13c7225..62ebe42c34 100644 --- a/agent/config/testdata/full-config.hcl +++ b/agent/config/testdata/full-config.hcl @@ -755,3 +755,6 @@ watches = [{ key = "sl3Dffu7" args = ["dltjDJ2a", "flEa7C2d"] }] +xds { + update_max_per_second = 9526.2 +} diff --git a/agent/config/testdata/full-config.json b/agent/config/testdata/full-config.json index 9ff50db783..f03457d9c1 100644 --- a/agent/config/testdata/full-config.json +++ b/agent/config/testdata/full-config.json @@ -752,5 +752,8 @@ "key": "sl3Dffu7", "args": ["dltjDJ2a", "flEa7C2d"] } - ] + ], + "xds": { + "update_max_per_second": 9526.2 + } } diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index efdfe4b724..eb5df5a855 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -5,6 +5,7 @@ import ( "sync" "github.com/hashicorp/go-hclog" + "golang.org/x/time/rate" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/tlsutil" @@ -46,6 +47,8 @@ type CancelFunc func() type Manager struct { ManagerConfig + rateLimiter *rate.Limiter + mu sync.Mutex proxies map[ProxyID]*state watchers map[ProxyID]map[uint64]chan *ConfigSnapshot @@ -75,6 +78,15 @@ type ManagerConfig struct { // information to proxies that need to make intention decisions on their // own. IntentionDefaultAllow bool + + // UpdateRateLimit controls the rate at which config snapshots are delivered + // when updates are received from data sources. This enables us to reduce the + // impact of updates to "global" resources (e.g. proxy-defaults and wildcard + // intentions) that could otherwise saturate system resources, and cause Raft + // or gossip instability. + // + // Defaults to rate.Inf (no rate limit). + UpdateRateLimit rate.Limit } // NewManager constructs a Manager. @@ -82,14 +94,30 @@ func NewManager(cfg ManagerConfig) (*Manager, error) { if cfg.Source == nil || cfg.Logger == nil { return nil, errors.New("all ManagerConfig fields must be provided") } + + if cfg.UpdateRateLimit == 0 { + cfg.UpdateRateLimit = rate.Inf + } + m := &Manager{ ManagerConfig: cfg, proxies: make(map[ProxyID]*state), watchers: make(map[ProxyID]map[uint64]chan *ConfigSnapshot), + rateLimiter: rate.NewLimiter(cfg.UpdateRateLimit, 1), } return m, nil } +// UpdateRateLimit returns the configured update rate limit (see ManagerConfig). +func (m *Manager) UpdateRateLimit() rate.Limit { + return m.rateLimiter.Limit() +} + +// SetUpdateRateLimit configures the update rate limit (see ManagerConfig). +func (m *Manager) SetUpdateRateLimit(l rate.Limit) { + m.rateLimiter.SetLimit(l) +} + // RegisteredProxies returns a list of the proxies tracked by Manager, filtered // by source. func (m *Manager) RegisteredProxies(source ProxySource) []ProxyID { @@ -143,7 +171,7 @@ func (m *Manager) Register(id ProxyID, ns *structs.NodeService, source ProxySour } var err error - state, err = newState(id, ns, source, token, stateConfig) + state, err = newState(id, ns, source, token, stateConfig, m.rateLimiter) if err != nil { return err } diff --git a/agent/proxycfg/state.go b/agent/proxycfg/state.go index 7a2ff5eb1b..24141d4d7b 100644 --- a/agent/proxycfg/state.go +++ b/agent/proxycfg/state.go @@ -10,6 +10,7 @@ import ( "time" "github.com/hashicorp/go-hclog" + "golang.org/x/time/rate" cachetype "github.com/hashicorp/consul/agent/cache-types" "github.com/hashicorp/consul/agent/structs" @@ -79,6 +80,8 @@ type state struct { ch chan UpdateEvent snapCh chan ConfigSnapshot reqCh chan chan *ConfigSnapshot + + rateLimiter *rate.Limiter } // failed returns whether run exited because a data source is in an @@ -148,7 +151,7 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error // // The returned state needs its required dependencies to be set before Watch // can be called. -func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token string, config stateConfig) (*state, error) { +func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token string, config stateConfig, rateLimiter *rate.Limiter) (*state, error) { // 10 is fairly arbitrary here but allow for the 3 mandatory and a // reasonable number of upstream watches to all deliver their initial // messages in parallel without blocking the cache.Notify loops. It's not a @@ -176,6 +179,7 @@ func newState(id ProxyID, ns *structs.NodeService, source ProxySource, token str ch: ch, snapCh: make(chan ConfigSnapshot, 1), reqCh: make(chan chan *ConfigSnapshot, 1), + rateLimiter: rateLimiter, }, nil } @@ -303,6 +307,20 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { sendCh := make(chan struct{}) var coalesceTimer *time.Timer + scheduleUpdate := func() { + // Wait for MAX(, coalesceTimeout) + delay := s.rateLimiter.Reserve().Delay() + if delay < coalesceTimeout { + delay = coalesceTimeout + } + coalesceTimer = time.AfterFunc(delay, func() { + // This runs in another goroutine so we can't just do the send + // directly here as access to snap is racy. Instead, signal the main + // loop above. + sendCh <- struct{}{} + }) + } + for { select { case <-ctx.Done(): @@ -345,9 +363,7 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { s.logger.Trace("Failed to deliver new snapshot to proxy config watchers") // Reset the timer to retry later. This is to ensure we attempt to redeliver the updated snapshot shortly. - coalesceTimer = time.AfterFunc(coalesceTimeout, func() { - sendCh <- struct{}{} - }) + scheduleUpdate() // Do not reset coalesceTimer since we just queued a timer-based refresh continue @@ -375,15 +391,10 @@ func (s *state) run(ctx context.Context, snap *ConfigSnapshot) { // Check if snap is complete enough to be a valid config to deliver to a // proxy yet. if snap.Valid() { - // Don't send it right away, set a short timer that will wait for updates - // from any of the other cache values and deliver them all together. if coalesceTimer == nil { - coalesceTimer = time.AfterFunc(coalesceTimeout, func() { - // This runs in another goroutine so we can't just do the send - // directly here as access to snap is racy. Instead, signal the main - // loop above. - sendCh <- struct{}{} - }) + // Don't send it right away, set a short timer that will wait for updates + // from any of the other cache values and deliver them all together. + scheduleUpdate() } } } diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go index e80ea4e63f..eb41aaaf98 100644 --- a/agent/proxycfg/state_test.go +++ b/agent/proxycfg/state_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "github.com/hashicorp/consul/acl" cachetype "github.com/hashicorp/consul/agent/cache-types" @@ -106,7 +107,7 @@ func TestStateChanged(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { proxyID := ProxyID{ServiceID: tt.ns.CompoundServiceID()} - state, err := newState(proxyID, tt.ns, testSource, tt.token, stateConfig{logger: hclog.New(nil)}) + state, err := newState(proxyID, tt.ns, testSource, tt.token, stateConfig{logger: hclog.New(nil)}, rate.NewLimiter(rate.Inf, 1)) require.NoError(t, err) otherNS, otherToken := tt.mutate(*tt.ns, tt.token) require.Equal(t, tt.want, state.Changed(otherNS, otherToken)) @@ -3463,7 +3464,7 @@ func TestState_WatchesAndUpdates(t *testing.T) { } wr := recordWatches(&sc) - state, err := newState(proxyID, &tc.ns, testSource, "", sc) + state, err := newState(proxyID, &tc.ns, testSource, "", sc, rate.NewLimiter(rate.Inf, 0)) // verify building the initial state worked require.NoError(t, err) diff --git a/website/content/docs/agent/config/config-files.mdx b/website/content/docs/agent/config/config-files.mdx index 430069628c..92c34cc967 100644 --- a/website/content/docs/agent/config/config-files.mdx +++ b/website/content/docs/agent/config/config-files.mdx @@ -2206,3 +2206,15 @@ tls { Consul will not enable TLS for the HTTP or gRPC API unless the `https` port has been assigned a port number `> 0`. We recommend using `8501` for `https` as this default will automatically work with some tooling. + +## xDS Server Parameters + +- `xds`: This object allows you to configure the behavior of Consul's +[xDS protocol](https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol) +server. + + - `update_max_per_second`: Specifies the number of proxy configuration updates across all connected xDS streams that are allowed per second. This configuration prevents updates to global resources, such as wildcard intentions, from consuming system resources at the expense of other processes, such as Raft and Gossip, which could cause general cluster instability. + + The default value is `250`. It is based on a load test of 5,000 streams connected to a single server with two CPU cores. + + If necessary, you can lower or increase the limit without a rolling restart by using the `consul reload` command or by sending the server a `SIGHUP`.