diff --git a/.changelog/14397.txt b/.changelog/14397.txt new file mode 100644 index 0000000000..96dbf81be4 --- /dev/null +++ b/.changelog/14397.txt @@ -0,0 +1,3 @@ +```release-note:feature +xds: servers will limit the number of concurrent xDS streams they can handle to balance the load across all servers +``` diff --git a/agent/agent.go b/agent/agent.go index 43e1a6ea8f..7094579f6b 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -707,6 +707,9 @@ func (a *Agent) Start(ctx context.Context) error { return err } + // Start a goroutine to terminate excess xDS sessions. + go a.baseDeps.XDSStreamLimiter.Run(&lib.StopChannelContext{StopCh: a.shutdownCh}) + // register watches if err := a.reloadWatches(a.config); err != nil { return err @@ -791,6 +794,7 @@ func (a *Agent) listenAndServeGRPC() error { return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil) }, a, + a.baseDeps.XDSStreamLimiter, ) a.xdsServer.Register(a.externalGRPCServer) diff --git a/agent/consul/autopilot.go b/agent/consul/autopilot.go index 58acd20ff0..a41febb058 100644 --- a/agent/consul/autopilot.go +++ b/agent/consul/autopilot.go @@ -55,6 +55,14 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) { } d.readyServersPublisher.PublishReadyServersEvents(state) + + var readyServers uint32 + for _, server := range state.Servers { + if autopilotevents.IsServerReady(server) { + readyServers++ + } + } + d.server.xdsCapacityController.SetServerCount(readyServers) } func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) { diff --git a/agent/consul/autopilotevents/ready_servers_events.go b/agent/consul/autopilotevents/ready_servers_events.go index cbc9819491..6e7734feeb 100644 --- a/agent/consul/autopilotevents/ready_servers_events.go +++ b/agent/consul/autopilotevents/ready_servers_events.go @@ -198,25 +198,32 @@ func (r *ReadyServersEventPublisher) readyServersEvents(state *autopilot.State) return []stream.Event{r.newReadyServersEvent(servers)}, true } +// IsServerReady determines whether the given server (from the autopilot state) +// is "ready" - by which we mean that they would be an acceptable target for +// stale queries. +func IsServerReady(srv *autopilot.ServerState) bool { + // All healthy servers are caught up enough to be considered ready. + // Servers with voting rights that are still healthy according to Serf are + // also included as they have likely just fallen behind the leader a little + // after initially replicating state. They are still acceptable targets + // for most stale queries and clients can bound the staleness if necessary. + // Including them is a means to prevent flapping the list of servers we + // advertise as ready and flooding the network with notifications to all + // dataplanes of server updates. + // + // TODO (agentless) for a non-voting server that is still alive but fell + // behind, should we cause it to be removed. For voters we know they were caught + // up at some point but for non-voters we cannot know the same thing. + return srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) +} + // autopilotStateToReadyServers will iterate through all servers in the autopilot // state and compile a list of servers which are "ready". Readiness means that // they would be an acceptable target for stale queries. func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers { var servers EventPayloadReadyServers for _, srv := range state.Servers { - // All healthy servers are caught up enough to be included in a ready servers. - // Servers with voting rights that are still healthy according to Serf are - // also included as they have likely just fallen behind the leader a little - // after initially replicating state. They are still acceptable targets - // for most stale queries and clients can bound the staleness if necessary. - // Including them is a means to prevent flapping the list of servers we - // advertise as ready and flooding the network with notifications to all - // dataplanes of server updates. - // - // TODO (agentless) for a non-voting server that is still alive but fell - // behind, should we cause it to be removed. For voters we know they were caught - // up at some point but for non-voters we cannot know the same thing. - if srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) { + if IsServerReady(srv) { // autopilot information contains addresses in the : form. We only care about the // the host so we parse it out here and discard the port. host, err := extractHost(string(srv.Server.Address)) diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index 32199d8aba..ff150d545f 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -18,6 +18,7 @@ import ( msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc-external/limiter" grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/resolver" "github.com/hashicorp/consul/agent/pool" @@ -553,6 +554,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { NewRequestRecorderFunc: middleware.NewRequestRecorder, GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor, EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c), + XDSStreamLimiter: limiter.NewSessionLimiter(), } } diff --git a/agent/consul/options.go b/agent/consul/options.go index 576009aa04..afc69b51db 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -7,6 +7,7 @@ import ( "github.com/hashicorp/consul-net-rpc/net/rpc" "github.com/hashicorp/consul/agent/consul/stream" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/rpc/middleware" @@ -15,14 +16,15 @@ import ( ) type Deps struct { - EventPublisher *stream.EventPublisher - Logger hclog.InterceptLogger - TLSConfigurator *tlsutil.Configurator - Tokens *token.Store - Router *router.Router - ConnPool *pool.ConnPool - GRPCConnPool GRPCClientConner - LeaderForwarder LeaderForwarder + EventPublisher *stream.EventPublisher + Logger hclog.InterceptLogger + TLSConfigurator *tlsutil.Configurator + Tokens *token.Store + Router *router.Router + ConnPool *pool.ConnPool + GRPCConnPool GRPCClientConner + LeaderForwarder LeaderForwarder + XDSStreamLimiter *limiter.SessionLimiter // GetNetRPCInterceptorFunc, if not nil, sets the net/rpc rpc.ServerServiceCallInterceptor on // the server side to record metrics around the RPC requests. If nil, no interceptor is added to // the rpc server. diff --git a/agent/consul/server.go b/agent/consul/server.go index b6e77a4a17..c5d218c0a2 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -39,6 +39,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" "github.com/hashicorp/consul/agent/consul/wanfed" + "github.com/hashicorp/consul/agent/consul/xdscapacity" aclgrpc "github.com/hashicorp/consul/agent/grpc-external/services/acl" "github.com/hashicorp/consul/agent/grpc-external/services/connectca" "github.com/hashicorp/consul/agent/grpc-external/services/dataplane" @@ -374,6 +375,10 @@ type Server struct { // peeringServer handles peering RPC requests internal to this cluster, like generating peering tokens. peeringServer *peering.Server + // xdsCapacityController controls the number of concurrent xDS streams the + // server is able to handle. + xdsCapacityController *xdscapacity.Controller + // embedded struct to hold all the enterprise specific data EnterpriseServer } @@ -749,6 +754,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser s.grpcLeaderForwarder = flat.LeaderForwarder go s.trackLeaderChanges() + s.xdsCapacityController = xdscapacity.NewController(xdscapacity.Config{ + Logger: s.logger.Named(logging.XDSCapacityController), + GetStore: func() xdscapacity.Store { return s.fsm.State() }, + SessionLimiter: flat.XDSStreamLimiter, + }) + go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) + // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. s.initAutopilot(config) diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index 86b0d80ee8..cfb38232a9 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -325,7 +325,7 @@ func (s *Store) NodeUsage() (uint64, NodeUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - nodes, err := firstUsageEntry(tx, tableNodes) + nodes, err := firstUsageEntry(nil, tx, tableNodes) if err != nil { return 0, NodeUsage{}, fmt.Errorf("failed nodes lookup: %s", err) } @@ -347,7 +347,7 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - peerings, err := firstUsageEntry(tx, tablePeering) + peerings, err := firstUsageEntry(nil, tx, tablePeering) if err != nil { return 0, PeeringUsage{}, fmt.Errorf("failed peerings lookup: %s", err) } @@ -365,23 +365,23 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) { // ServiceUsage returns the latest seen Raft index, a compiled set of service // usage data, and any errors. -func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { +func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - serviceInstances, err := firstUsageEntry(tx, tableServices) + serviceInstances, err := firstUsageEntry(ws, tx, tableServices) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } - services, err := firstUsageEntry(tx, serviceNamesUsageTable) + services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } serviceKindInstances := make(map[string]int) for _, kind := range allConnectKind { - usage, err := firstUsageEntry(tx, connectUsageTableName(kind)) + usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind)) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } @@ -393,7 +393,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { Services: services.Count, ConnectServiceInstances: serviceKindInstances, } - results, err := compileEnterpriseServiceUsage(tx, usage) + results, err := compileEnterpriseServiceUsage(ws, tx, usage) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) } @@ -405,7 +405,7 @@ func (s *Store) KVUsage() (uint64, KVUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - kvs, err := firstUsageEntry(tx, "kvs") + kvs, err := firstUsageEntry(nil, tx, "kvs") if err != nil { return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err) } @@ -428,7 +428,7 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) { configEntries := make(map[string]int) var maxIdx uint64 for _, kind := range structs.AllConfigEntryKinds { - configEntry, err := firstUsageEntry(tx, configEntryUsageTableName(kind)) + configEntry, err := firstUsageEntry(nil, tx, configEntryUsageTableName(kind)) if configEntry.Index > maxIdx { maxIdx = configEntry.Index } @@ -448,11 +448,12 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) { return maxIdx, results, nil } -func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) { - usage, err := tx.First(tableUsage, indexID, id) +func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, error) { + watch, usage, err := tx.FirstWatch(tableUsage, indexID, id) if err != nil { return nil, err } + ws.Add(watch) // If no elements have been inserted, the usage entry will not exist. We // return a valid value so that can be certain the return value is not nil diff --git a/agent/consul/state/usage_oss.go b/agent/consul/state/usage_oss.go index edea0e81fc..a9b4d1c2fe 100644 --- a/agent/consul/state/usage_oss.go +++ b/agent/consul/state/usage_oss.go @@ -29,7 +29,7 @@ func addEnterpriseKVUsage(map[string]int, memdb.Change) {} func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {} -func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { +func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) { return usage, nil } diff --git a/agent/consul/state/usage_test.go b/agent/consul/state/usage_test.go index 223e1b062f..5c69cc2d55 100644 --- a/agent/consul/state/usage_test.go +++ b/agent/consul/state/usage_test.go @@ -1,7 +1,9 @@ package state import ( + "context" "testing" + "time" memdb "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/require" @@ -150,7 +152,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) { s := testStateStore(t) // No services have been registered, and thus no usage entry exists - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(0)) require.Equal(t, usage.Services, 0) @@ -174,13 +176,22 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) { testRegisterConnectNativeService(t, s, 14, "node2", "service-native") testRegisterConnectNativeService(t, s, 15, "node2", "service-native-1") - idx, usage, err := s.ServiceUsage() + ws := memdb.NewWatchSet() + idx, usage, err := s.ServiceUsage(ws) require.NoError(t, err) require.Equal(t, idx, uint64(15)) require.Equal(t, 5, usage.Services) require.Equal(t, 8, usage.ServiceInstances) require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)]) require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable]) + + testRegisterSidecarProxy(t, s, 16, "node2", "service2") + + select { + case <-ws.WatchCh(context.Background()): + case <-time.After(100 * time.Millisecond): + t.Fatal("timeout waiting on WatchSet") + } } func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { @@ -207,7 +218,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { testRegisterSidecarProxy(t, s, 3, "node1", "service2") testRegisterConnectNativeService(t, s, 4, "node1", "service-connect") - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(4)) require.Equal(t, 3, usage.Services) @@ -217,7 +228,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) { require.NoError(t, s.DeleteNode(4, "node1", nil, "")) - idx, usage, err = s.ServiceUsage() + idx, usage, err = s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(4)) require.Equal(t, usage.Services, 0) @@ -245,7 +256,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) { testRegisterConnectNativeService(t, s, 7, "node2", "service-native") testutil.RunStep(t, "writes", func(t *testing.T) { - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, uint64(7), idx) require.Equal(t, 3, usage.Services) @@ -257,7 +268,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) { testutil.RunStep(t, "deletes", func(t *testing.T) { require.NoError(t, s.DeleteNode(7, "node1", nil, peerName)) require.NoError(t, s.DeleteNode(8, "node2", nil, "")) - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, uint64(8), idx) require.Equal(t, 0, usage.Services) @@ -295,7 +306,7 @@ func TestStateStore_Usage_Restore(t *testing.T) { require.Equal(t, idx, uint64(9)) require.Equal(t, nodeUsage.Nodes, 1) - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(9)) require.Equal(t, usage.Services, 1) @@ -395,7 +406,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) { require.NoError(t, s.EnsureService(2, "node1", svc)) // We renamed a service with a single instance, so we maintain 1 service. - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(2)) require.Equal(t, usage.Services, 1) @@ -415,7 +426,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) { require.NoError(t, s.EnsureService(3, "node1", svc)) // We renamed a service with a single instance, so we maintain 1 service. - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(3)) require.Equal(t, usage.Services, 1) @@ -436,7 +447,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) { require.NoError(t, s.EnsureService(4, "node1", svc)) // We renamed a service with a single instance, so we maintain 1 service. - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(4)) require.Equal(t, usage.Services, 1) @@ -467,7 +478,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) { } require.NoError(t, s.EnsureService(6, "node1", svc3)) - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(6)) require.Equal(t, usage.Services, 2) @@ -485,7 +496,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) { } require.NoError(t, s.EnsureService(7, "node1", update)) - idx, usage, err = s.ServiceUsage() + idx, usage, err = s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(7)) require.Equal(t, usage.Services, 3) @@ -511,7 +522,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) { require.NoError(t, s.EnsureService(2, "node1", svc)) // We renamed a service with a single instance, so we maintain 1 service. - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(2)) require.Equal(t, usage.Services, 1) @@ -537,7 +548,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) { } require.NoError(t, s.EnsureService(4, "node1", svc3)) - idx, usage, err := s.ServiceUsage() + idx, usage, err := s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(4)) require.Equal(t, usage.Services, 2) @@ -552,7 +563,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) { } require.NoError(t, s.EnsureService(5, "node1", update)) - idx, usage, err = s.ServiceUsage() + idx, usage, err = s.ServiceUsage(nil) require.NoError(t, err) require.Equal(t, idx, uint64(5)) require.Equal(t, usage.Services, 3) diff --git a/agent/consul/usagemetrics/usagemetrics.go b/agent/consul/usagemetrics/usagemetrics.go index 7824224f14..ed814155e6 100644 --- a/agent/consul/usagemetrics/usagemetrics.go +++ b/agent/consul/usagemetrics/usagemetrics.go @@ -178,7 +178,7 @@ func (u *UsageMetricsReporter) runOnce() { u.emitPeeringUsage(peeringUsage) - _, serviceUsage, err := state.ServiceUsage() + _, serviceUsage, err := state.ServiceUsage(nil) if err != nil { u.logger.Warn("failed to retrieve services from state store", "error", err) } diff --git a/agent/consul/xdscapacity/capacity.go b/agent/consul/xdscapacity/capacity.go new file mode 100644 index 0000000000..2e24a09e0e --- /dev/null +++ b/agent/consul/xdscapacity/capacity.go @@ -0,0 +1,210 @@ +package xdscapacity + +import ( + "context" + "math" + "time" + + "github.com/armon/go-metrics" + "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" + "golang.org/x/time/rate" +) + +var StatsGauges = []prometheus.GaugeDefinition{ + { + Name: []string{"xds", "server", "idealStreamsMax"}, + Help: "The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers.", + }, +} + +// errorMargin is amount to which we allow a server to be over-occupied, +// expressed as a percentage (between 0 and 1). +// +// We allow 10% more than the ideal number of streams per server. +const errorMargin = 0.1 + +// Controller determines the ideal number of xDS streams for the server to +// handle and enforces it using the given SessionLimiter. +// +// We aim for a roughly even spread of streams between servers in the cluster +// and, to that end, limit the number of streams each server can handle to: +// +// ( / ) + +// +// Controller receives changes to the number of healthy servers from the +// autopilot delegate. It queries the state store's catalog tables to discover +// the number of registered proxy (sidecar and gateway) services. +type Controller struct { + cfg Config + + serverCh chan uint32 + doneCh chan struct{} + + prevMaxSessions uint32 + prevRateLimit rate.Limit +} + +// Config contains the dependencies for Controller. +type Config struct { + Logger hclog.Logger + GetStore func() Store + SessionLimiter SessionLimiter +} + +// SessionLimiter is used to enforce the session limit to achieve the ideal +// spread of xDS streams between servers. +type SessionLimiter interface { + SetMaxSessions(maxSessions uint32) + SetDrainRateLimit(rateLimit rate.Limit) +} + +// NewController creates a new capacity controller with the given config. +// +// Call Run to start the control-loop. +func NewController(cfg Config) *Controller { + return &Controller{ + cfg: cfg, + serverCh: make(chan uint32), + doneCh: make(chan struct{}), + } +} + +// Run the control-loop until the given context is canceled or reaches its +// deadline. +func (c *Controller) Run(ctx context.Context) { + defer close(c.doneCh) + + ws, numProxies, err := c.countProxies(ctx) + if err != nil { + return + } + + var numServers uint32 + for { + select { + case s := <-c.serverCh: + numServers = s + c.updateMaxSessions(numServers, numProxies) + case <-ws.WatchCh(ctx): + ws, numProxies, err = c.countProxies(ctx) + if err != nil { + return + } + c.updateDrainRateLimit(numProxies) + c.updateMaxSessions(numServers, numProxies) + case <-ctx.Done(): + return + } + } +} + +// SetServerCount updates the number of healthy servers that is used when +// determining capacity. It is called by the autopilot delegate. +func (c *Controller) SetServerCount(count uint32) { + select { + case c.serverCh <- count: + case <-c.doneCh: + } +} + +func (c *Controller) updateDrainRateLimit(numProxies uint32) { + rateLimit := calcRateLimit(numProxies) + if rateLimit == c.prevRateLimit { + return + } + + c.cfg.Logger.Debug("updating drain rate limit", "rate_limit", rateLimit) + c.cfg.SessionLimiter.SetDrainRateLimit(rateLimit) + c.prevRateLimit = rateLimit +} + +// We dynamically scale the rate at which excess sessions will be drained +// according to the number of proxies in the catalog. +// +// The numbers here are pretty arbitrary (change them if you find better ones!) +// but the logic is: +// +// 0-512 proxies: drain 1 per second +// 513-2815 proxies: linearly scaled by 1/s for every additional 256 proxies +// 2816+ proxies: drain 10 per second +// +func calcRateLimit(numProxies uint32) rate.Limit { + perSecond := math.Floor((float64(numProxies) - 256) / 256) + + if perSecond < 1 { + return 1 + } + + if perSecond > 10 { + return 10 + } + + return rate.Limit(perSecond) +} + +func (c *Controller) updateMaxSessions(numServers, numProxies uint32) { + if numServers == 0 || numProxies == 0 { + return + } + + maxSessions := uint32(math.Ceil((float64(numProxies) / float64(numServers)) * (1 + errorMargin))) + if maxSessions == c.prevMaxSessions { + return + } + + c.cfg.Logger.Debug( + "updating max sessions", + "max_sessions", maxSessions, + "num_servers", numServers, + "num_proxies", numProxies, + ) + metrics.SetGauge([]string{"xds", "server", "idealStreamsMax"}, float32(maxSessions)) + c.cfg.SessionLimiter.SetMaxSessions(maxSessions) + c.prevMaxSessions = maxSessions +} + +// countProxies counts the number of registered proxy services, retrying on +// error until the given context is cancelled. +func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) { + retryWaiter := &retry.Waiter{ + MinFailures: 1, + MinWait: 1 * time.Second, + MaxWait: 1 * time.Minute, + } + + for { + store := c.cfg.GetStore() + + ws := memdb.NewWatchSet() + ws.Add(store.AbandonCh()) + + var count uint32 + _, usage, err := store.ServiceUsage(ws) + + // Query failed? Wait for a while, and then go to the top of the loop to + // retry (unless the context is cancelled). + if err != nil { + if err := retryWaiter.Wait(ctx); err != nil { + return nil, 0, err + } + continue + } + + for kind, kindCount := range usage.ConnectServiceInstances { + if structs.ServiceKind(kind).IsProxy() { + count += uint32(kindCount) + } + } + return ws, count, nil + } +} + +type Store interface { + AbandonCh() <-chan struct{} + ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error) +} diff --git a/agent/consul/xdscapacity/capacity_test.go b/agent/consul/xdscapacity/capacity_test.go new file mode 100644 index 0000000000..1b564949c4 --- /dev/null +++ b/agent/consul/xdscapacity/capacity_test.go @@ -0,0 +1,136 @@ +package xdscapacity + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/sdk/testutil" +) + +func TestController(t *testing.T) { + const index = 123 + + store := state.NewStateStore(nil) + + // This loop generates: + // + // 4 (service kind) * 5 (service) * 5 * (node) = 100 proxy services. And 25 non-proxy services. + for _, kind := range []structs.ServiceKind{ + // These will be included in the count. + structs.ServiceKindConnectProxy, + structs.ServiceKindIngressGateway, + structs.ServiceKindTerminatingGateway, + structs.ServiceKindMeshGateway, + + // This one will not. + structs.ServiceKindTypical, + } { + for i := 0; i < 5; i++ { + serviceName := fmt.Sprintf("%s-%d", kind, i) + + for j := 0; j < 5; j++ { + nodeName := fmt.Sprintf("%s-node-%d", serviceName, j) + + require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{ + Node: nodeName, + Service: &structs.NodeService{ + ID: serviceName, + Service: serviceName, + Kind: kind, + }, + })) + } + } + } + + limiter := newTestLimiter() + + sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute) + cfg := metrics.DefaultConfig("consul") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, sink) + + t.Cleanup(func() { + sink := &metrics.BlackholeSink{} + metrics.NewGlobal(cfg, sink) + }) + + adj := NewController(Config{ + Logger: testutil.Logger(t), + GetStore: func() Store { return store }, + SessionLimiter: limiter, + }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go adj.Run(ctx) + + // Keen readers will notice the numbers here are off by one. This is due to + // floating point math (because we multiply by 1.1). + testutil.RunStep(t, "load split between 2 servers", func(t *testing.T) { + adj.SetServerCount(2) + require.Equal(t, 56, limiter.receive(t)) + }) + + testutil.RunStep(t, "all load on 1 server", func(t *testing.T) { + adj.SetServerCount(1) + require.Equal(t, 111, limiter.receive(t)) + }) + + testutil.RunStep(t, "delete proxy service", func(t *testing.T) { + require.NoError(t, store.DeleteService(index+1, "ingress-gateway-0-node-0", "ingress-gateway-0", acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword)) + require.Equal(t, 109, limiter.receive(t)) + }) + + testutil.RunStep(t, "check we're emitting gauge", func(t *testing.T) { + data := sink.Data() + require.Len(t, data, 1) + + gauge, ok := data[0].Gauges["consul.xds.server.idealStreamsMax"] + require.True(t, ok) + require.Equal(t, float32(109), gauge.Value) + }) +} + +func newTestLimiter() *testLimiter { + return &testLimiter{ch: make(chan uint32, 1)} +} + +type testLimiter struct{ ch chan uint32 } + +func (tl *testLimiter) SetMaxSessions(max uint32) { tl.ch <- max } + +func (tl *testLimiter) receive(t *testing.T) int { + select { + case v := <-tl.ch: + return int(v) + case <-time.After(1 * time.Second): + t.Fatal("timeout waiting for SetMaxSessions") + } + panic("this should never be reached") +} + +func (tl *testLimiter) SetDrainRateLimit(rateLimit rate.Limit) {} + +func TestCalcRateLimit(t *testing.T) { + for in, out := range map[uint32]rate.Limit{ + 0: rate.Limit(1), + 1: rate.Limit(1), + 512: rate.Limit(1), + 768: rate.Limit(2), + 1024: rate.Limit(3), + 2816: rate.Limit(10), + 1000000000: rate.Limit(10), + } { + require.Equalf(t, out, calcRateLimit(in), "calcRateLimit(%d)", in) + } +} diff --git a/agent/grpc-external/limiter/limiter.go b/agent/grpc-external/limiter/limiter.go new file mode 100644 index 0000000000..26e013b161 --- /dev/null +++ b/agent/grpc-external/limiter/limiter.go @@ -0,0 +1,245 @@ +// package limiter provides primatives for limiting the number of concurrent +// operations in-flight. +package limiter + +import ( + "context" + "errors" + "math/rand" + "sort" + "sync" + "sync/atomic" + + "golang.org/x/time/rate" +) + +// Unlimited can be used to allow an unlimited number of concurrent sessions. +const Unlimited uint32 = 0 + +// ErrCapacityReached is returned when there is no capacity for additional sessions. +var ErrCapacityReached = errors.New("active session limit reached") + +// SessionLimiter is a session-based concurrency limiter, it provides the basis +// of gRPC/xDS load balancing. +// +// Stream handlers obtain a session with BeginSession before they begin serving +// resources - if the server has reached capacity ErrCapacityReached is returned, +// otherwise a Session is returned. +// +// It is the session-holder's responsibility to: +// +// 1. Call End on the session when finished. +// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC +// stream) when it is closed. +// +// The maximum number of concurrent sessions is controlled with SetMaxSessions. +// If there are more than the given maximum sessions already in-flight, +// SessionLimiter will drain randomly-selected sessions at a rate controlled +// by SetDrainRateLimit. +type SessionLimiter struct { + drainLimiter *rate.Limiter + + // max and inFlight are read/written using atomic operations. + max, inFlight uint32 + + // wakeCh is used to trigger the Run loop to start draining excess sessions. + wakeCh chan struct{} + + // Everything below here is guarded by mu. + mu sync.Mutex + maxSessionID uint64 + sessionIDs []uint64 // sessionIDs must be sorted so we can binary search it. + sessions map[uint64]*session +} + +// NewSessionLimiter creates a new SessionLimiter. +func NewSessionLimiter() *SessionLimiter { + return &SessionLimiter{ + drainLimiter: rate.NewLimiter(rate.Inf, 1), + max: Unlimited, + wakeCh: make(chan struct{}, 1), + sessionIDs: make([]uint64, 0), + sessions: make(map[uint64]*session), + } +} + +// Run the SessionLimiter's drain loop, which terminates excess sessions if the +// limit is lowered. It will exit when the given context is canceled or reaches +// its deadline. +func (l *SessionLimiter) Run(ctx context.Context) { + for { + select { + case <-l.wakeCh: + for { + if !l.overCapacity() { + break + } + + if err := l.drainLimiter.Wait(ctx); err != nil { + break + } + + if !l.overCapacity() { + break + } + + l.terminateSession() + } + case <-ctx.Done(): + return + } + } +} + +// SetMaxSessions controls the maximum number of concurrent sessions. If it is +// lower, randomly-selected sessions will be drained. +func (l *SessionLimiter) SetMaxSessions(max uint32) { + atomic.StoreUint32(&l.max, max) + + // Send on wakeCh without blocking if the Run loop is busy. wakeCh has a + // buffer of 1, so no triggers will be missed. + select { + case l.wakeCh <- struct{}{}: + default: + } +} + +// SetDrainRateLimit controls the rate at which excess sessions will be drained. +func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit) { + l.drainLimiter.SetLimit(limit) +} + +// BeginSession begins a new session, or returns ErrCapacityReached if the +// concurrent session limit has been reached. +// +// It is the session-holder's responsibility to: +// +// 1. Call End on the session when finished. +// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC +// stream) when it is closed. +func (l *SessionLimiter) BeginSession() (Session, error) { + if !l.hasCapacity() { + return nil, ErrCapacityReached + } + + l.mu.Lock() + defer l.mu.Unlock() + return l.createSessionLocked(), nil +} + +// Note: hasCapacity is *best effort*. As we do not hold l.mu it's possible that: +// +// - max has changed by the time we compare it to inFlight. +// - inFlight < max now, but increases before we create a new session. +// +// This is acceptable for our uses, especially because excess sessions will +// eventually be drained. +func (l *SessionLimiter) hasCapacity() bool { + max := atomic.LoadUint32(&l.max) + if max == Unlimited { + return true + } + + cur := atomic.LoadUint32(&l.inFlight) + return max > cur +} + +// Note: overCapacity is *best effort*. As we do not hold l.mu it's possible that: +// +// - max has changed by the time we compare it to inFlight. +// - inFlight > max now, but decreases before we terminate a session. +func (l *SessionLimiter) overCapacity() bool { + max := atomic.LoadUint32(&l.max) + if max == Unlimited { + return false + } + + cur := atomic.LoadUint32(&l.inFlight) + return cur > max +} + +func (l *SessionLimiter) terminateSession() { + l.mu.Lock() + defer l.mu.Unlock() + + idx := rand.Intn(len(l.sessionIDs)) + id := l.sessionIDs[idx] + l.sessions[id].terminate() + l.deleteSessionLocked(idx, id) +} + +func (l *SessionLimiter) createSessionLocked() *session { + session := &session{ + l: l, + id: l.maxSessionID, + termCh: make(chan struct{}), + } + + l.maxSessionID++ + l.sessionIDs = append(l.sessionIDs, session.id) + l.sessions[session.id] = session + + atomic.AddUint32(&l.inFlight, 1) + + return session +} + +func (l *SessionLimiter) deleteSessionLocked(idx int, id uint64) { + delete(l.sessions, id) + + // Note: it's important that we preserve the order here (which most allocation + // free deletion tricks don't) because we binary search the slice. + l.sessionIDs = append(l.sessionIDs[:idx], l.sessionIDs[idx+1:]...) + + atomic.AddUint32(&l.inFlight, ^uint32(0)) +} + +func (l *SessionLimiter) deleteSessionWithID(id uint64) { + l.mu.Lock() + defer l.mu.Unlock() + + idx := sort.Search(len(l.sessionIDs), func(i int) bool { + return l.sessionIDs[i] >= id + }) + + if idx == len(l.sessionIDs) || l.sessionIDs[idx] != id { + // It's possible that we weren't able to find the id because the session has + // already been deleted. This could be because the session-holder called End + // more than once, or because the session was drained. In either case there's + // nothing more to do. + return + } + + l.deleteSessionLocked(idx, id) +} + +// Session allows its holder to perform an operation (e.g. serve a gRPC stream) +// concurrenly with other session-holders. Sessions may be terminated abruptly +// by the SessionLimiter, so it is the responsibility of the holder to receive +// on the Terminated channel and halt the operation when it is closed. +type Session interface { + // End the session. + // + // This MUST be called when the session-holder is done (e.g. the gRPC stream + // is closed). + End() + + // Terminated is a channel that is closed when the session is terminated. + // + // The session-holder MUST receive on it and exit (e.g. close the gRPC stream) + // when it is closed. + Terminated() <-chan struct{} +} + +type session struct { + l *SessionLimiter + + id uint64 + termCh chan struct{} +} + +func (s *session) End() { s.l.deleteSessionWithID(s.id) } + +func (s *session) Terminated() <-chan struct{} { return s.termCh } + +func (s *session) terminate() { close(s.termCh) } diff --git a/agent/grpc-external/limiter/limiter_test.go b/agent/grpc-external/limiter/limiter_test.go new file mode 100644 index 0000000000..cef6a4d417 --- /dev/null +++ b/agent/grpc-external/limiter/limiter_test.go @@ -0,0 +1,81 @@ +package limiter + +import ( + "context" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/lib" +) + +func init() { lib.SeedMathRand() } + +func TestSessionLimiter(t *testing.T) { + lim := NewSessionLimiter() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go lim.Run(ctx) + + // doneCh is used to shut the goroutines down at the end of the test. + doneCh := make(chan struct{}) + t.Cleanup(func() { close(doneCh) }) + + // Start 10 sessions, and increment the counter when they are terminated. + var ( + terminations uint32 + wg sync.WaitGroup + ) + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + sess, err := lim.BeginSession() + require.NoError(t, err) + defer sess.End() + + wg.Done() + + select { + case <-sess.Terminated(): + atomic.AddUint32(&terminations, 1) + case <-doneCh: + } + }() + } + + // Wait for all the sessions to begin. + wg.Wait() + + // Lowering max sessions to 5 should result in 5 sessions being terminated. + lim.SetMaxSessions(5) + require.Eventually(t, func() bool { + return atomic.LoadUint32(&terminations) == 5 + }, 2*time.Second, 50*time.Millisecond) + + // Attempting to start a new session should fail immediately. + _, err := lim.BeginSession() + require.Equal(t, ErrCapacityReached, err) + + // Raising MaxSessions should make room for a new session. + lim.SetMaxSessions(6) + sess, err := lim.BeginSession() + require.NoError(t, err) + + // ...but trying to start another new one should fail + _, err = lim.BeginSession() + require.Equal(t, ErrCapacityReached, err) + + // ...until another session ends. + sess.End() + _, err = lim.BeginSession() + require.NoError(t, err) + + // Calling End twice is a no-op. + sess.End() + _, err = lim.BeginSession() + require.Equal(t, ErrCapacityReached, err) +} diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index 6b26db7d04..bc024291cb 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/grpc-external/limiter" grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/resolver" "github.com/hashicorp/consul/agent/pool" @@ -1446,6 +1447,7 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps { EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c), NewRequestRecorderFunc: middleware.NewRequestRecorder, GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor, + XDSStreamLimiter: limiter.NewSessionLimiter(), } } diff --git a/agent/setup.go b/agent/setup.go index c329605181..25353e1ac9 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -18,6 +18,8 @@ import ( "github.com/hashicorp/consul/agent/consul/fsm" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/usagemetrics" + "github.com/hashicorp/consul/agent/consul/xdscapacity" + "github.com/hashicorp/consul/agent/grpc-external/limiter" grpc "github.com/hashicorp/consul/agent/grpc-internal" "github.com/hashicorp/consul/agent/grpc-internal/resolver" "github.com/hashicorp/consul/agent/local" @@ -150,6 +152,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) d.EventPublisher = stream.NewEventPublisher(10 * time.Second) + d.XDSStreamLimiter = limiter.NewSessionLimiter() + return d, nil } @@ -232,7 +236,9 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau gauges = append(gauges, consul.AutopilotGauges, consul.LeaderCertExpirationGauges, - consul.LeaderPeeringMetrics) + consul.LeaderPeeringMetrics, + xdscapacity.StatsGauges, + ) } // Flatten definitions @@ -275,6 +281,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau consul.RPCCounters, grpc.StatsCounters, local.StateCounters, + xds.StatsCounters, raftCounters, } // Flatten definitions diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 71c1edcb0f..aa038214c2 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/armon/go-metrics" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -29,6 +30,8 @@ import ( "github.com/hashicorp/consul/logging" ) +var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another") + type deltaRecvResponse int const ( @@ -86,6 +89,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove return err } + session, err := s.SessionLimiter.BeginSession() + if err != nil { + return errOverwhelmed + } + defer session.End() + // Loop state var ( cfgSnap *proxycfg.ConfigSnapshot @@ -159,6 +168,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove for { select { + case <-session.Terminated(): + generator.Logger.Debug("draining stream to rebalance load") + metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1) + return errOverwhelmed case <-authTimer: // It's been too long since a Discovery{Request,Response} so recheck ACLs. if err := checkStreamACLs(cfgSnap); err != nil { diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 21ec701dca..c2706c768d 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -17,6 +17,7 @@ import ( "google.golang.org/grpc/status" "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/xdscommon" @@ -36,7 +37,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -238,7 +239,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -369,7 +370,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -522,7 +523,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy // This mutateFn causes any endpoint with a name containing "geo-cache" to be @@ -663,7 +664,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -799,7 +800,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1059,7 +1060,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) { return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil) } - scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("web-sidecar-proxy", nil) @@ -1137,6 +1138,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. false, + nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1236,6 +1238,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token, 100*time.Millisecond, // Make this short. false, + nil, ) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy @@ -1316,7 +1319,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { // Allow all return acl.RootAuthorizer("manage"), nil } - scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false) + scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil) mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy sid := structs.NewServiceID("ingress-gateway", nil) @@ -1368,6 +1371,115 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) { } } +func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) { + aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } + + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{}) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + mgr.RegisterProxy(t, sid) + + snap := newTestSnapshot(t, nil, "") + + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + select { + case err := <-errCh: + require.Error(t, err) + require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String()) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + +type capacityReachedLimiter struct{} + +func (capacityReachedLimiter) BeginSession() (limiter.Session, error) { + return nil, limiter.ErrCapacityReached +} + +func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) { + limiter := &testLimiter{} + + aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter) + mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + mgr.RegisterProxy(t, sid) + + testutil.RunStep(t, "successful request/response", func(t *testing.T) { + snap := newTestSnapshot(t, nil, "") + + envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + InitialResourceVersions: mustMakeVersionMap(t, + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: xdscommon.ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + ), + }) + }) + + testutil.RunStep(t, "terminate limiter session", func(t *testing.T) { + limiter.TerminateSession() + + select { + case err := <-errCh: + require.Error(t, err) + require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String()) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } + }) + + testutil.RunStep(t, "check drain counter incremeted", func(t *testing.T) { + data := scenario.sink.Data() + require.Len(t, data, 1) + + item := data[0] + require.Len(t, item.Counters, 1) + + val, ok := item.Counters["consul.xds.test.xds.server.streamDrained"] + require.True(t, ok) + require.Equal(t, 1, val.Count) + }) +} + +type testLimiter struct { + termCh chan struct{} +} + +func (t *testLimiter) BeginSession() (limiter.Session, error) { + t.termCh = make(chan struct{}) + return &testSession{termCh: t.termCh}, nil +} + +func (t *testLimiter) TerminateSession() { close(t.termCh) } + +type testSession struct { + termCh chan struct{} +} + +func (t *testSession) Terminated() <-chan struct{} { return t.termCh } + +func (*testSession) End() {} + func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) { t.Helper() select { diff --git a/agent/xds/server.go b/agent/xds/server.go index 3ee42e77b0..74f386fc54 100644 --- a/agent/xds/server.go +++ b/agent/xds/server.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/acl" external "github.com/hashicorp/consul/agent/grpc-external" + "github.com/hashicorp/consul/agent/grpc-external/limiter" "github.com/hashicorp/consul/agent/proxycfg" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/xds/xdscommon" @@ -29,6 +30,13 @@ var StatsGauges = []prometheus.GaugeDefinition{ }, } +var StatsCounters = []prometheus.CounterDefinition{ + { + Name: []string{"xds", "server", "streamDrained"}, + Help: "Counts the number of xDS streams that are drained when rebalancing the load between servers.", + }, +} + // ADSStream is a shorter way of referring to this thing... type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer @@ -97,17 +105,24 @@ type ProxyConfigSource interface { Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error) } +// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend +// on an interface rather than the concrete type so we can mock it in tests. +type SessionLimiter interface { + BeginSession() (limiter.Session, error) +} + // Server represents a gRPC server that can handle xDS requests from Envoy. All // of it's public members must be set before the gRPC server is started. // // A full description of the XDS protocol can be found at // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol type Server struct { - NodeName string - Logger hclog.Logger - CfgSrc ProxyConfigSource - ResolveToken ACLResolverFunc - CfgFetcher ConfigFetcher + NodeName string + Logger hclog.Logger + CfgSrc ProxyConfigSource + ResolveToken ACLResolverFunc + CfgFetcher ConfigFetcher + SessionLimiter SessionLimiter // AuthCheckFrequency is how often we should re-check the credentials used // during a long-lived gRPC Stream after it has been initially established. @@ -159,6 +174,7 @@ func NewServer( cfgMgr ProxyConfigSource, resolveToken ACLResolverFunc, cfgFetcher ConfigFetcher, + limiter SessionLimiter, ) *Server { return &Server{ NodeName: nodeName, @@ -166,6 +182,7 @@ func NewServer( CfgSrc: cfgMgr, ResolveToken: resolveToken, CfgFetcher: cfgFetcher, + SessionLimiter: limiter, AuthCheckFrequency: DefaultAuthCheckFrequency, activeStreams: &activeStreamCounters{}, serverlessPluginEnabled: serverlessPluginEnabled, diff --git a/agent/xds/xds_protocol_helpers_test.go b/agent/xds/xds_protocol_helpers_test.go index a27eaba2f2..7cb413defb 100644 --- a/agent/xds/xds_protocol_helpers_test.go +++ b/agent/xds/xds_protocol_helpers_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/grpc-external/limiter" envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -136,6 +137,7 @@ func newTestServerDeltaScenario( token string, authCheckFrequency time.Duration, serverlessPluginEnabled bool, + sessionLimiter SessionLimiter, ) *testServerScenario { mgr := newTestManager(t) envoy := NewTestEnvoy(t, proxyID, token) @@ -154,6 +156,10 @@ func newTestServerDeltaScenario( metrics.NewGlobal(cfg, sink) }) + if sessionLimiter == nil { + sessionLimiter = limiter.NewSessionLimiter() + } + s := NewServer( "node-123", testutil.Logger(t), @@ -161,6 +167,7 @@ func newTestServerDeltaScenario( mgr, resolveToken, nil, /*cfgFetcher ConfigFetcher*/ + sessionLimiter, ) if authCheckFrequency > 0 { s.AuthCheckFrequency = authCheckFrequency diff --git a/logging/names.go b/logging/names.go index 7f5e2bf603..5d1f60c9bc 100644 --- a/logging/names.go +++ b/logging/names.go @@ -1,66 +1,67 @@ package logging const ( - ACL string = "acl" - Agent string = "agent" - AntiEntropy string = "anti_entropy" - AutoEncrypt string = "auto_encrypt" - AutoConfig string = "auto_config" - Autopilot string = "autopilot" - AWS string = "aws" - Azure string = "azure" - CA string = "ca" - Catalog string = "catalog" - CentralConfig string = "central_config" - ConfigEntry string = "config_entry" - Connect string = "connect" - Consul string = "consul" - ConsulClient string = "client" - ConsulServer string = "server" - Coordinate string = "coordinate" - DNS string = "dns" - Envoy string = "envoy" - FederationState string = "federation_state" - FSM string = "fsm" - GatewayLocator string = "gateway_locator" - HTTP string = "http" - IngressGateway string = "ingress_gateway" - Intentions string = "intentions" - Internal string = "internal" - KV string = "kvs" - LAN string = "lan" - Leader string = "leader" - Legacy string = "legacy" - License string = "license" - Manager string = "manager" - Memberlist string = "memberlist" - MeshGateway string = "mesh_gateway" - Namespace string = "namespace" - NetworkAreas string = "network_areas" - Operator string = "operator" - PreparedQuery string = "prepared_query" - Proxy string = "proxy" - ProxyConfig string = "proxycfg" - Raft string = "raft" - Replication string = "replication" - Router string = "router" - RPC string = "rpc" - Serf string = "serf" - Session string = "session" - Sentinel string = "sentinel" - Snapshot string = "snapshot" - Partition string = "partition" - Peering string = "peering" - PeeringMetrics string = "peering_metrics" - TerminatingGateway string = "terminating_gateway" - TLSUtil string = "tlsutil" - Transaction string = "txn" - UsageMetrics string = "usage_metrics" - UIServer string = "ui_server" - UIMetricsProxy string = "ui_metrics_proxy" - WAN string = "wan" - Watch string = "watch" - XDS string = "xds" - Vault string = "vault" - Health string = "health" + ACL string = "acl" + Agent string = "agent" + AntiEntropy string = "anti_entropy" + AutoEncrypt string = "auto_encrypt" + AutoConfig string = "auto_config" + Autopilot string = "autopilot" + AWS string = "aws" + Azure string = "azure" + CA string = "ca" + Catalog string = "catalog" + CentralConfig string = "central_config" + ConfigEntry string = "config_entry" + Connect string = "connect" + Consul string = "consul" + ConsulClient string = "client" + ConsulServer string = "server" + Coordinate string = "coordinate" + DNS string = "dns" + Envoy string = "envoy" + FederationState string = "federation_state" + FSM string = "fsm" + GatewayLocator string = "gateway_locator" + HTTP string = "http" + IngressGateway string = "ingress_gateway" + Intentions string = "intentions" + Internal string = "internal" + KV string = "kvs" + LAN string = "lan" + Leader string = "leader" + Legacy string = "legacy" + License string = "license" + Manager string = "manager" + Memberlist string = "memberlist" + MeshGateway string = "mesh_gateway" + Namespace string = "namespace" + NetworkAreas string = "network_areas" + Operator string = "operator" + PreparedQuery string = "prepared_query" + Proxy string = "proxy" + ProxyConfig string = "proxycfg" + Raft string = "raft" + Replication string = "replication" + Router string = "router" + RPC string = "rpc" + Serf string = "serf" + Session string = "session" + Sentinel string = "sentinel" + Snapshot string = "snapshot" + Partition string = "partition" + Peering string = "peering" + PeeringMetrics string = "peering_metrics" + TerminatingGateway string = "terminating_gateway" + TLSUtil string = "tlsutil" + Transaction string = "txn" + UsageMetrics string = "usage_metrics" + UIServer string = "ui_server" + UIMetricsProxy string = "ui_metrics_proxy" + WAN string = "wan" + Watch string = "watch" + XDS string = "xds" + XDSCapacityController string = "xds_capacity_controller" + Vault string = "vault" + Health string = "health" ) diff --git a/website/content/docs/agent/telemetry.mdx b/website/content/docs/agent/telemetry.mdx index 197900a2ff..1608d93212 100644 --- a/website/content/docs/agent/telemetry.mdx +++ b/website/content/docs/agent/telemetry.mdx @@ -542,6 +542,8 @@ These metrics are used to monitor the health of the Consul servers. | `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter | | `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge | | `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge | +| `consul.xds.server.idealStreamsMax` | The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers. | streams | gauge | +| `consul.xds.server.streamDrained` | Counts the number of xDS streams that are drained when rebalancing the load between servers. | streams | counter | ## Server Workload