xDS Load Balancing (#14397)

Prior to #13244, connect proxies and gateways could only be configured by an
xDS session served by the local client agent.

In an upcoming release, it will be possible to deploy a Consul service mesh
without client agents. In this model, xDS sessions will be handled by the
servers themselves, which necessitates load-balancing to prevent a single
server from receiving a disproportionate amount of load and becoming
overwhelmed.

This introduces a simple form of load-balancing where Consul will attempt to
achieve an even spread of load (xDS sessions) between all healthy servers.
It does so by implementing a concurrent session limiter (limiter.SessionLimiter)
and adjusting the limit according to autopilot state and proxy service
registrations in the catalog.

If a server is already over capacity (i.e. the session limit is lowered),
Consul will begin draining sessions to rebalance the load. This will result
in the client receiving a `RESOURCE_EXHAUSTED` status code. It is the client's
responsibility to observe this response and reconnect to a different server.

Users of the gRPC client connection brokered by the
consul-server-connection-manager library will get this for free.

The rate at which Consul will drain sessions to rebalance load is scaled
dynamically based on the number of proxies in the catalog.
This commit is contained in:
Dan Upton 2022-09-09 15:02:01 +01:00 committed by GitHub
parent deda99a387
commit 1c2c975b0b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1008 additions and 125 deletions

3
.changelog/14397.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
xds: servers will limit the number of concurrent xDS streams they can handle to balance the load across all servers
```

View File

@ -707,6 +707,9 @@ func (a *Agent) Start(ctx context.Context) error {
return err
}
// Start a goroutine to terminate excess xDS sessions.
go a.baseDeps.XDSStreamLimiter.Run(&lib.StopChannelContext{StopCh: a.shutdownCh})
// register watches
if err := a.reloadWatches(a.config); err != nil {
return err
@ -791,6 +794,7 @@ func (a *Agent) listenAndServeGRPC() error {
return a.delegate.ResolveTokenAndDefaultMeta(id, nil, nil)
},
a,
a.baseDeps.XDSStreamLimiter,
)
a.xdsServer.Register(a.externalGRPCServer)

View File

@ -55,6 +55,14 @@ func (d *AutopilotDelegate) NotifyState(state *autopilot.State) {
}
d.readyServersPublisher.PublishReadyServersEvents(state)
var readyServers uint32
for _, server := range state.Servers {
if autopilotevents.IsServerReady(server) {
readyServers++
}
}
d.server.xdsCapacityController.SetServerCount(readyServers)
}
func (d *AutopilotDelegate) RemoveFailedServer(srv *autopilot.Server) {

View File

@ -198,13 +198,11 @@ func (r *ReadyServersEventPublisher) readyServersEvents(state *autopilot.State)
return []stream.Event{r.newReadyServersEvent(servers)}, true
}
// autopilotStateToReadyServers will iterate through all servers in the autopilot
// state and compile a list of servers which are "ready". Readiness means that
// they would be an acceptable target for stale queries.
func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers {
var servers EventPayloadReadyServers
for _, srv := range state.Servers {
// All healthy servers are caught up enough to be included in a ready servers.
// IsServerReady determines whether the given server (from the autopilot state)
// is "ready" - by which we mean that they would be an acceptable target for
// stale queries.
func IsServerReady(srv *autopilot.ServerState) bool {
// All healthy servers are caught up enough to be considered ready.
// Servers with voting rights that are still healthy according to Serf are
// also included as they have likely just fallen behind the leader a little
// after initially replicating state. They are still acceptable targets
@ -216,7 +214,16 @@ func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopil
// TODO (agentless) for a non-voting server that is still alive but fell
// behind, should we cause it to be removed. For voters we know they were caught
// up at some point but for non-voters we cannot know the same thing.
if srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive) {
return srv.Health.Healthy || (srv.HasVotingRights() && srv.Server.NodeStatus == autopilot.NodeAlive)
}
// autopilotStateToReadyServers will iterate through all servers in the autopilot
// state and compile a list of servers which are "ready". Readiness means that
// they would be an acceptable target for stale queries.
func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopilot.State) EventPayloadReadyServers {
var servers EventPayloadReadyServers
for _, srv := range state.Servers {
if IsServerReady(srv) {
// autopilot information contains addresses in the <host>:<port> form. We only care about the
// the host so we parse it out here and discard the port.
host, err := extractHost(string(srv.Server.Address))

View File

@ -18,6 +18,7 @@ import (
msgpackrpc "github.com/hashicorp/consul-net-rpc/net-rpc-msgpackrpc"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/pool"
@ -553,6 +554,7 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
NewRequestRecorderFunc: middleware.NewRequestRecorder,
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
XDSStreamLimiter: limiter.NewSessionLimiter(),
}
}

View File

@ -7,6 +7,7 @@ import (
"github.com/hashicorp/consul-net-rpc/net/rpc"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/rpc/middleware"
@ -23,6 +24,7 @@ type Deps struct {
ConnPool *pool.ConnPool
GRPCConnPool GRPCClientConner
LeaderForwarder LeaderForwarder
XDSStreamLimiter *limiter.SessionLimiter
// GetNetRPCInterceptorFunc, if not nil, sets the net/rpc rpc.ServerServiceCallInterceptor on
// the server side to record metrics around the RPC requests. If nil, no interceptor is added to
// the rpc server.

View File

@ -39,6 +39,7 @@ import (
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/wanfed"
"github.com/hashicorp/consul/agent/consul/xdscapacity"
aclgrpc "github.com/hashicorp/consul/agent/grpc-external/services/acl"
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
@ -374,6 +375,10 @@ type Server struct {
// peeringServer handles peering RPC requests internal to this cluster, like generating peering tokens.
peeringServer *peering.Server
// xdsCapacityController controls the number of concurrent xDS streams the
// server is able to handle.
xdsCapacityController *xdscapacity.Controller
// embedded struct to hold all the enterprise specific data
EnterpriseServer
}
@ -749,6 +754,13 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server) (*Ser
s.grpcLeaderForwarder = flat.LeaderForwarder
go s.trackLeaderChanges()
s.xdsCapacityController = xdscapacity.NewController(xdscapacity.Config{
Logger: s.logger.Named(logging.XDSCapacityController),
GetStore: func() xdscapacity.Store { return s.fsm.State() },
SessionLimiter: flat.XDSStreamLimiter,
})
go s.xdsCapacityController.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic.
s.initAutopilot(config)

View File

@ -325,7 +325,7 @@ func (s *Store) NodeUsage() (uint64, NodeUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
nodes, err := firstUsageEntry(tx, tableNodes)
nodes, err := firstUsageEntry(nil, tx, tableNodes)
if err != nil {
return 0, NodeUsage{}, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -347,7 +347,7 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
peerings, err := firstUsageEntry(tx, tablePeering)
peerings, err := firstUsageEntry(nil, tx, tablePeering)
if err != nil {
return 0, PeeringUsage{}, fmt.Errorf("failed peerings lookup: %s", err)
}
@ -365,23 +365,23 @@ func (s *Store) PeeringUsage() (uint64, PeeringUsage, error) {
// ServiceUsage returns the latest seen Raft index, a compiled set of service
// usage data, and any errors.
func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
func (s *Store) ServiceUsage(ws memdb.WatchSet) (uint64, ServiceUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
serviceInstances, err := firstUsageEntry(tx, tableServices)
serviceInstances, err := firstUsageEntry(ws, tx, tableServices)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
services, err := firstUsageEntry(tx, serviceNamesUsageTable)
services, err := firstUsageEntry(ws, tx, serviceNamesUsageTable)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
serviceKindInstances := make(map[string]int)
for _, kind := range allConnectKind {
usage, err := firstUsageEntry(tx, connectUsageTableName(kind))
usage, err := firstUsageEntry(ws, tx, connectUsageTableName(kind))
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
@ -393,7 +393,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
Services: services.Count,
ConnectServiceInstances: serviceKindInstances,
}
results, err := compileEnterpriseServiceUsage(tx, usage)
results, err := compileEnterpriseServiceUsage(ws, tx, usage)
if err != nil {
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
}
@ -405,7 +405,7 @@ func (s *Store) KVUsage() (uint64, KVUsage, error) {
tx := s.db.ReadTxn()
defer tx.Abort()
kvs, err := firstUsageEntry(tx, "kvs")
kvs, err := firstUsageEntry(nil, tx, "kvs")
if err != nil {
return 0, KVUsage{}, fmt.Errorf("failed kvs lookup: %s", err)
}
@ -428,7 +428,7 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) {
configEntries := make(map[string]int)
var maxIdx uint64
for _, kind := range structs.AllConfigEntryKinds {
configEntry, err := firstUsageEntry(tx, configEntryUsageTableName(kind))
configEntry, err := firstUsageEntry(nil, tx, configEntryUsageTableName(kind))
if configEntry.Index > maxIdx {
maxIdx = configEntry.Index
}
@ -448,11 +448,12 @@ func (s *Store) ConfigEntryUsage() (uint64, ConfigEntryUsage, error) {
return maxIdx, results, nil
}
func firstUsageEntry(tx ReadTxn, id string) (*UsageEntry, error) {
usage, err := tx.First(tableUsage, indexID, id)
func firstUsageEntry(ws memdb.WatchSet, tx ReadTxn, id string) (*UsageEntry, error) {
watch, usage, err := tx.FirstWatch(tableUsage, indexID, id)
if err != nil {
return nil, err
}
ws.Add(watch)
// If no elements have been inserted, the usage entry will not exist. We
// return a valid value so that can be certain the return value is not nil

View File

@ -29,7 +29,7 @@ func addEnterpriseKVUsage(map[string]int, memdb.Change) {}
func addEnterpriseConfigEntryUsage(map[string]int, memdb.Change) {}
func compileEnterpriseServiceUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
func compileEnterpriseServiceUsage(ws memdb.WatchSet, tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
return usage, nil
}

View File

@ -1,7 +1,9 @@
package state
import (
"context"
"testing"
"time"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
@ -150,7 +152,7 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
s := testStateStore(t)
// No services have been registered, and thus no usage entry exists
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(0))
require.Equal(t, usage.Services, 0)
@ -174,13 +176,22 @@ func TestStateStore_Usage_ServiceUsage(t *testing.T) {
testRegisterConnectNativeService(t, s, 14, "node2", "service-native")
testRegisterConnectNativeService(t, s, 15, "node2", "service-native-1")
idx, usage, err := s.ServiceUsage()
ws := memdb.NewWatchSet()
idx, usage, err := s.ServiceUsage(ws)
require.NoError(t, err)
require.Equal(t, idx, uint64(15))
require.Equal(t, 5, usage.Services)
require.Equal(t, 8, usage.ServiceInstances)
require.Equal(t, 2, usage.ConnectServiceInstances[string(structs.ServiceKindConnectProxy)])
require.Equal(t, 3, usage.ConnectServiceInstances[connectNativeInstancesTable])
testRegisterSidecarProxy(t, s, 16, "node2", "service2")
select {
case <-ws.WatchCh(context.Background()):
case <-time.After(100 * time.Millisecond):
t.Fatal("timeout waiting on WatchSet")
}
}
func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
@ -207,7 +218,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
testRegisterSidecarProxy(t, s, 3, "node1", "service2")
testRegisterConnectNativeService(t, s, 4, "node1", "service-connect")
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(4))
require.Equal(t, 3, usage.Services)
@ -217,7 +228,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
require.NoError(t, s.DeleteNode(4, "node1", nil, ""))
idx, usage, err = s.ServiceUsage()
idx, usage, err = s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(4))
require.Equal(t, usage.Services, 0)
@ -245,7 +256,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
testRegisterConnectNativeService(t, s, 7, "node2", "service-native")
testutil.RunStep(t, "writes", func(t *testing.T) {
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, uint64(7), idx)
require.Equal(t, 3, usage.Services)
@ -257,7 +268,7 @@ func TestStateStore_Usage_ServiceUsagePeering(t *testing.T) {
testutil.RunStep(t, "deletes", func(t *testing.T) {
require.NoError(t, s.DeleteNode(7, "node1", nil, peerName))
require.NoError(t, s.DeleteNode(8, "node2", nil, ""))
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, uint64(8), idx)
require.Equal(t, 0, usage.Services)
@ -295,7 +306,7 @@ func TestStateStore_Usage_Restore(t *testing.T) {
require.Equal(t, idx, uint64(9))
require.Equal(t, nodeUsage.Nodes, 1)
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(9))
require.Equal(t, usage.Services, 1)
@ -395,7 +406,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.NoError(t, s.EnsureService(2, "node1", svc))
// We renamed a service with a single instance, so we maintain 1 service.
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.Services, 1)
@ -415,7 +426,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.NoError(t, s.EnsureService(3, "node1", svc))
// We renamed a service with a single instance, so we maintain 1 service.
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(3))
require.Equal(t, usage.Services, 1)
@ -436,7 +447,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
require.NoError(t, s.EnsureService(4, "node1", svc))
// We renamed a service with a single instance, so we maintain 1 service.
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(4))
require.Equal(t, usage.Services, 1)
@ -467,7 +478,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
}
require.NoError(t, s.EnsureService(6, "node1", svc3))
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(6))
require.Equal(t, usage.Services, 2)
@ -485,7 +496,7 @@ func TestStateStore_Usage_ServiceUsage_updatingService(t *testing.T) {
}
require.NoError(t, s.EnsureService(7, "node1", update))
idx, usage, err = s.ServiceUsage()
idx, usage, err = s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(7))
require.Equal(t, usage.Services, 3)
@ -511,7 +522,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
require.NoError(t, s.EnsureService(2, "node1", svc))
// We renamed a service with a single instance, so we maintain 1 service.
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.Services, 1)
@ -537,7 +548,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
}
require.NoError(t, s.EnsureService(4, "node1", svc3))
idx, usage, err := s.ServiceUsage()
idx, usage, err := s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(4))
require.Equal(t, usage.Services, 2)
@ -552,7 +563,7 @@ func TestStateStore_Usage_ServiceUsage_updatingConnectProxy(t *testing.T) {
}
require.NoError(t, s.EnsureService(5, "node1", update))
idx, usage, err = s.ServiceUsage()
idx, usage, err = s.ServiceUsage(nil)
require.NoError(t, err)
require.Equal(t, idx, uint64(5))
require.Equal(t, usage.Services, 3)

View File

@ -178,7 +178,7 @@ func (u *UsageMetricsReporter) runOnce() {
u.emitPeeringUsage(peeringUsage)
_, serviceUsage, err := state.ServiceUsage()
_, serviceUsage, err := state.ServiceUsage(nil)
if err != nil {
u.logger.Warn("failed to retrieve services from state store", "error", err)
}

View File

@ -0,0 +1,210 @@
package xdscapacity
import (
"context"
"math"
"time"
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"golang.org/x/time/rate"
)
var StatsGauges = []prometheus.GaugeDefinition{
{
Name: []string{"xds", "server", "idealStreamsMax"},
Help: "The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers.",
},
}
// errorMargin is amount to which we allow a server to be over-occupied,
// expressed as a percentage (between 0 and 1).
//
// We allow 10% more than the ideal number of streams per server.
const errorMargin = 0.1
// Controller determines the ideal number of xDS streams for the server to
// handle and enforces it using the given SessionLimiter.
//
// We aim for a roughly even spread of streams between servers in the cluster
// and, to that end, limit the number of streams each server can handle to:
//
// (<number of proxies> / <number of healthy servers>) + <error margin>
//
// Controller receives changes to the number of healthy servers from the
// autopilot delegate. It queries the state store's catalog tables to discover
// the number of registered proxy (sidecar and gateway) services.
type Controller struct {
cfg Config
serverCh chan uint32
doneCh chan struct{}
prevMaxSessions uint32
prevRateLimit rate.Limit
}
// Config contains the dependencies for Controller.
type Config struct {
Logger hclog.Logger
GetStore func() Store
SessionLimiter SessionLimiter
}
// SessionLimiter is used to enforce the session limit to achieve the ideal
// spread of xDS streams between servers.
type SessionLimiter interface {
SetMaxSessions(maxSessions uint32)
SetDrainRateLimit(rateLimit rate.Limit)
}
// NewController creates a new capacity controller with the given config.
//
// Call Run to start the control-loop.
func NewController(cfg Config) *Controller {
return &Controller{
cfg: cfg,
serverCh: make(chan uint32),
doneCh: make(chan struct{}),
}
}
// Run the control-loop until the given context is canceled or reaches its
// deadline.
func (c *Controller) Run(ctx context.Context) {
defer close(c.doneCh)
ws, numProxies, err := c.countProxies(ctx)
if err != nil {
return
}
var numServers uint32
for {
select {
case s := <-c.serverCh:
numServers = s
c.updateMaxSessions(numServers, numProxies)
case <-ws.WatchCh(ctx):
ws, numProxies, err = c.countProxies(ctx)
if err != nil {
return
}
c.updateDrainRateLimit(numProxies)
c.updateMaxSessions(numServers, numProxies)
case <-ctx.Done():
return
}
}
}
// SetServerCount updates the number of healthy servers that is used when
// determining capacity. It is called by the autopilot delegate.
func (c *Controller) SetServerCount(count uint32) {
select {
case c.serverCh <- count:
case <-c.doneCh:
}
}
func (c *Controller) updateDrainRateLimit(numProxies uint32) {
rateLimit := calcRateLimit(numProxies)
if rateLimit == c.prevRateLimit {
return
}
c.cfg.Logger.Debug("updating drain rate limit", "rate_limit", rateLimit)
c.cfg.SessionLimiter.SetDrainRateLimit(rateLimit)
c.prevRateLimit = rateLimit
}
// We dynamically scale the rate at which excess sessions will be drained
// according to the number of proxies in the catalog.
//
// The numbers here are pretty arbitrary (change them if you find better ones!)
// but the logic is:
//
// 0-512 proxies: drain 1 per second
// 513-2815 proxies: linearly scaled by 1/s for every additional 256 proxies
// 2816+ proxies: drain 10 per second
//
func calcRateLimit(numProxies uint32) rate.Limit {
perSecond := math.Floor((float64(numProxies) - 256) / 256)
if perSecond < 1 {
return 1
}
if perSecond > 10 {
return 10
}
return rate.Limit(perSecond)
}
func (c *Controller) updateMaxSessions(numServers, numProxies uint32) {
if numServers == 0 || numProxies == 0 {
return
}
maxSessions := uint32(math.Ceil((float64(numProxies) / float64(numServers)) * (1 + errorMargin)))
if maxSessions == c.prevMaxSessions {
return
}
c.cfg.Logger.Debug(
"updating max sessions",
"max_sessions", maxSessions,
"num_servers", numServers,
"num_proxies", numProxies,
)
metrics.SetGauge([]string{"xds", "server", "idealStreamsMax"}, float32(maxSessions))
c.cfg.SessionLimiter.SetMaxSessions(maxSessions)
c.prevMaxSessions = maxSessions
}
// countProxies counts the number of registered proxy services, retrying on
// error until the given context is cancelled.
func (c *Controller) countProxies(ctx context.Context) (memdb.WatchSet, uint32, error) {
retryWaiter := &retry.Waiter{
MinFailures: 1,
MinWait: 1 * time.Second,
MaxWait: 1 * time.Minute,
}
for {
store := c.cfg.GetStore()
ws := memdb.NewWatchSet()
ws.Add(store.AbandonCh())
var count uint32
_, usage, err := store.ServiceUsage(ws)
// Query failed? Wait for a while, and then go to the top of the loop to
// retry (unless the context is cancelled).
if err != nil {
if err := retryWaiter.Wait(ctx); err != nil {
return nil, 0, err
}
continue
}
for kind, kindCount := range usage.ConnectServiceInstances {
if structs.ServiceKind(kind).IsProxy() {
count += uint32(kindCount)
}
}
return ws, count, nil
}
}
type Store interface {
AbandonCh() <-chan struct{}
ServiceUsage(ws memdb.WatchSet) (uint64, state.ServiceUsage, error)
}

View File

@ -0,0 +1,136 @@
package xdscapacity
import (
"context"
"fmt"
"testing"
"time"
"github.com/armon/go-metrics"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestController(t *testing.T) {
const index = 123
store := state.NewStateStore(nil)
// This loop generates:
//
// 4 (service kind) * 5 (service) * 5 * (node) = 100 proxy services. And 25 non-proxy services.
for _, kind := range []structs.ServiceKind{
// These will be included in the count.
structs.ServiceKindConnectProxy,
structs.ServiceKindIngressGateway,
structs.ServiceKindTerminatingGateway,
structs.ServiceKindMeshGateway,
// This one will not.
structs.ServiceKindTypical,
} {
for i := 0; i < 5; i++ {
serviceName := fmt.Sprintf("%s-%d", kind, i)
for j := 0; j < 5; j++ {
nodeName := fmt.Sprintf("%s-node-%d", serviceName, j)
require.NoError(t, store.EnsureRegistration(index, &structs.RegisterRequest{
Node: nodeName,
Service: &structs.NodeService{
ID: serviceName,
Service: serviceName,
Kind: kind,
},
}))
}
}
}
limiter := newTestLimiter()
sink := metrics.NewInmemSink(1*time.Minute, 1*time.Minute)
cfg := metrics.DefaultConfig("consul")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, sink)
t.Cleanup(func() {
sink := &metrics.BlackholeSink{}
metrics.NewGlobal(cfg, sink)
})
adj := NewController(Config{
Logger: testutil.Logger(t),
GetStore: func() Store { return store },
SessionLimiter: limiter,
})
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go adj.Run(ctx)
// Keen readers will notice the numbers here are off by one. This is due to
// floating point math (because we multiply by 1.1).
testutil.RunStep(t, "load split between 2 servers", func(t *testing.T) {
adj.SetServerCount(2)
require.Equal(t, 56, limiter.receive(t))
})
testutil.RunStep(t, "all load on 1 server", func(t *testing.T) {
adj.SetServerCount(1)
require.Equal(t, 111, limiter.receive(t))
})
testutil.RunStep(t, "delete proxy service", func(t *testing.T) {
require.NoError(t, store.DeleteService(index+1, "ingress-gateway-0-node-0", "ingress-gateway-0", acl.DefaultEnterpriseMeta(), structs.DefaultPeerKeyword))
require.Equal(t, 109, limiter.receive(t))
})
testutil.RunStep(t, "check we're emitting gauge", func(t *testing.T) {
data := sink.Data()
require.Len(t, data, 1)
gauge, ok := data[0].Gauges["consul.xds.server.idealStreamsMax"]
require.True(t, ok)
require.Equal(t, float32(109), gauge.Value)
})
}
func newTestLimiter() *testLimiter {
return &testLimiter{ch: make(chan uint32, 1)}
}
type testLimiter struct{ ch chan uint32 }
func (tl *testLimiter) SetMaxSessions(max uint32) { tl.ch <- max }
func (tl *testLimiter) receive(t *testing.T) int {
select {
case v := <-tl.ch:
return int(v)
case <-time.After(1 * time.Second):
t.Fatal("timeout waiting for SetMaxSessions")
}
panic("this should never be reached")
}
func (tl *testLimiter) SetDrainRateLimit(rateLimit rate.Limit) {}
func TestCalcRateLimit(t *testing.T) {
for in, out := range map[uint32]rate.Limit{
0: rate.Limit(1),
1: rate.Limit(1),
512: rate.Limit(1),
768: rate.Limit(2),
1024: rate.Limit(3),
2816: rate.Limit(10),
1000000000: rate.Limit(10),
} {
require.Equalf(t, out, calcRateLimit(in), "calcRateLimit(%d)", in)
}
}

View File

@ -0,0 +1,245 @@
// package limiter provides primatives for limiting the number of concurrent
// operations in-flight.
package limiter
import (
"context"
"errors"
"math/rand"
"sort"
"sync"
"sync/atomic"
"golang.org/x/time/rate"
)
// Unlimited can be used to allow an unlimited number of concurrent sessions.
const Unlimited uint32 = 0
// ErrCapacityReached is returned when there is no capacity for additional sessions.
var ErrCapacityReached = errors.New("active session limit reached")
// SessionLimiter is a session-based concurrency limiter, it provides the basis
// of gRPC/xDS load balancing.
//
// Stream handlers obtain a session with BeginSession before they begin serving
// resources - if the server has reached capacity ErrCapacityReached is returned,
// otherwise a Session is returned.
//
// It is the session-holder's responsibility to:
//
// 1. Call End on the session when finished.
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
// stream) when it is closed.
//
// The maximum number of concurrent sessions is controlled with SetMaxSessions.
// If there are more than the given maximum sessions already in-flight,
// SessionLimiter will drain randomly-selected sessions at a rate controlled
// by SetDrainRateLimit.
type SessionLimiter struct {
drainLimiter *rate.Limiter
// max and inFlight are read/written using atomic operations.
max, inFlight uint32
// wakeCh is used to trigger the Run loop to start draining excess sessions.
wakeCh chan struct{}
// Everything below here is guarded by mu.
mu sync.Mutex
maxSessionID uint64
sessionIDs []uint64 // sessionIDs must be sorted so we can binary search it.
sessions map[uint64]*session
}
// NewSessionLimiter creates a new SessionLimiter.
func NewSessionLimiter() *SessionLimiter {
return &SessionLimiter{
drainLimiter: rate.NewLimiter(rate.Inf, 1),
max: Unlimited,
wakeCh: make(chan struct{}, 1),
sessionIDs: make([]uint64, 0),
sessions: make(map[uint64]*session),
}
}
// Run the SessionLimiter's drain loop, which terminates excess sessions if the
// limit is lowered. It will exit when the given context is canceled or reaches
// its deadline.
func (l *SessionLimiter) Run(ctx context.Context) {
for {
select {
case <-l.wakeCh:
for {
if !l.overCapacity() {
break
}
if err := l.drainLimiter.Wait(ctx); err != nil {
break
}
if !l.overCapacity() {
break
}
l.terminateSession()
}
case <-ctx.Done():
return
}
}
}
// SetMaxSessions controls the maximum number of concurrent sessions. If it is
// lower, randomly-selected sessions will be drained.
func (l *SessionLimiter) SetMaxSessions(max uint32) {
atomic.StoreUint32(&l.max, max)
// Send on wakeCh without blocking if the Run loop is busy. wakeCh has a
// buffer of 1, so no triggers will be missed.
select {
case l.wakeCh <- struct{}{}:
default:
}
}
// SetDrainRateLimit controls the rate at which excess sessions will be drained.
func (l *SessionLimiter) SetDrainRateLimit(limit rate.Limit) {
l.drainLimiter.SetLimit(limit)
}
// BeginSession begins a new session, or returns ErrCapacityReached if the
// concurrent session limit has been reached.
//
// It is the session-holder's responsibility to:
//
// 1. Call End on the session when finished.
// 2. Receive on the session's Terminated channel and exit (e.g. close the gRPC
// stream) when it is closed.
func (l *SessionLimiter) BeginSession() (Session, error) {
if !l.hasCapacity() {
return nil, ErrCapacityReached
}
l.mu.Lock()
defer l.mu.Unlock()
return l.createSessionLocked(), nil
}
// Note: hasCapacity is *best effort*. As we do not hold l.mu it's possible that:
//
// - max has changed by the time we compare it to inFlight.
// - inFlight < max now, but increases before we create a new session.
//
// This is acceptable for our uses, especially because excess sessions will
// eventually be drained.
func (l *SessionLimiter) hasCapacity() bool {
max := atomic.LoadUint32(&l.max)
if max == Unlimited {
return true
}
cur := atomic.LoadUint32(&l.inFlight)
return max > cur
}
// Note: overCapacity is *best effort*. As we do not hold l.mu it's possible that:
//
// - max has changed by the time we compare it to inFlight.
// - inFlight > max now, but decreases before we terminate a session.
func (l *SessionLimiter) overCapacity() bool {
max := atomic.LoadUint32(&l.max)
if max == Unlimited {
return false
}
cur := atomic.LoadUint32(&l.inFlight)
return cur > max
}
func (l *SessionLimiter) terminateSession() {
l.mu.Lock()
defer l.mu.Unlock()
idx := rand.Intn(len(l.sessionIDs))
id := l.sessionIDs[idx]
l.sessions[id].terminate()
l.deleteSessionLocked(idx, id)
}
func (l *SessionLimiter) createSessionLocked() *session {
session := &session{
l: l,
id: l.maxSessionID,
termCh: make(chan struct{}),
}
l.maxSessionID++
l.sessionIDs = append(l.sessionIDs, session.id)
l.sessions[session.id] = session
atomic.AddUint32(&l.inFlight, 1)
return session
}
func (l *SessionLimiter) deleteSessionLocked(idx int, id uint64) {
delete(l.sessions, id)
// Note: it's important that we preserve the order here (which most allocation
// free deletion tricks don't) because we binary search the slice.
l.sessionIDs = append(l.sessionIDs[:idx], l.sessionIDs[idx+1:]...)
atomic.AddUint32(&l.inFlight, ^uint32(0))
}
func (l *SessionLimiter) deleteSessionWithID(id uint64) {
l.mu.Lock()
defer l.mu.Unlock()
idx := sort.Search(len(l.sessionIDs), func(i int) bool {
return l.sessionIDs[i] >= id
})
if idx == len(l.sessionIDs) || l.sessionIDs[idx] != id {
// It's possible that we weren't able to find the id because the session has
// already been deleted. This could be because the session-holder called End
// more than once, or because the session was drained. In either case there's
// nothing more to do.
return
}
l.deleteSessionLocked(idx, id)
}
// Session allows its holder to perform an operation (e.g. serve a gRPC stream)
// concurrenly with other session-holders. Sessions may be terminated abruptly
// by the SessionLimiter, so it is the responsibility of the holder to receive
// on the Terminated channel and halt the operation when it is closed.
type Session interface {
// End the session.
//
// This MUST be called when the session-holder is done (e.g. the gRPC stream
// is closed).
End()
// Terminated is a channel that is closed when the session is terminated.
//
// The session-holder MUST receive on it and exit (e.g. close the gRPC stream)
// when it is closed.
Terminated() <-chan struct{}
}
type session struct {
l *SessionLimiter
id uint64
termCh chan struct{}
}
func (s *session) End() { s.l.deleteSessionWithID(s.id) }
func (s *session) Terminated() <-chan struct{} { return s.termCh }
func (s *session) terminate() { close(s.termCh) }

View File

@ -0,0 +1,81 @@
package limiter
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/lib"
)
func init() { lib.SeedMathRand() }
func TestSessionLimiter(t *testing.T) {
lim := NewSessionLimiter()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go lim.Run(ctx)
// doneCh is used to shut the goroutines down at the end of the test.
doneCh := make(chan struct{})
t.Cleanup(func() { close(doneCh) })
// Start 10 sessions, and increment the counter when they are terminated.
var (
terminations uint32
wg sync.WaitGroup
)
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
sess, err := lim.BeginSession()
require.NoError(t, err)
defer sess.End()
wg.Done()
select {
case <-sess.Terminated():
atomic.AddUint32(&terminations, 1)
case <-doneCh:
}
}()
}
// Wait for all the sessions to begin.
wg.Wait()
// Lowering max sessions to 5 should result in 5 sessions being terminated.
lim.SetMaxSessions(5)
require.Eventually(t, func() bool {
return atomic.LoadUint32(&terminations) == 5
}, 2*time.Second, 50*time.Millisecond)
// Attempting to start a new session should fail immediately.
_, err := lim.BeginSession()
require.Equal(t, ErrCapacityReached, err)
// Raising MaxSessions should make room for a new session.
lim.SetMaxSessions(6)
sess, err := lim.BeginSession()
require.NoError(t, err)
// ...but trying to start another new one should fail
_, err = lim.BeginSession()
require.Equal(t, ErrCapacityReached, err)
// ...until another session ends.
sess.End()
_, err = lim.BeginSession()
require.NoError(t, err)
// Calling End twice is a no-op.
sess.End()
_, err = lim.BeginSession()
require.Equal(t, ErrCapacityReached, err)
}

View File

@ -23,6 +23,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/pool"
@ -1446,6 +1447,7 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
NewRequestRecorderFunc: middleware.NewRequestRecorder,
GetNetRPCInterceptorFunc: middleware.GetNetRPCInterceptor,
XDSStreamLimiter: limiter.NewSessionLimiter(),
}
}

View File

@ -18,6 +18,8 @@ import (
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/xdscapacity"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
grpc "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/agent/grpc-internal/resolver"
"github.com/hashicorp/consul/agent/local"
@ -150,6 +152,8 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
d.EventPublisher = stream.NewEventPublisher(10 * time.Second)
d.XDSStreamLimiter = limiter.NewSessionLimiter()
return d, nil
}
@ -232,7 +236,9 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
gauges = append(gauges,
consul.AutopilotGauges,
consul.LeaderCertExpirationGauges,
consul.LeaderPeeringMetrics)
consul.LeaderPeeringMetrics,
xdscapacity.StatsGauges,
)
}
// Flatten definitions
@ -275,6 +281,7 @@ func getPrometheusDefs(cfg lib.TelemetryConfig, isServer bool) ([]prometheus.Gau
consul.RPCCounters,
grpc.StatsCounters,
local.StateCounters,
xds.StatsCounters,
raftCounters,
}
// Flatten definitions

View File

@ -8,6 +8,7 @@ import (
"sync/atomic"
"time"
"github.com/armon/go-metrics"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@ -29,6 +30,8 @@ import (
"github.com/hashicorp/consul/logging"
)
var errOverwhelmed = status.Error(codes.ResourceExhausted, "this server has too many xDS streams open, please try another")
type deltaRecvResponse int
const (
@ -86,6 +89,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
return err
}
session, err := s.SessionLimiter.BeginSession()
if err != nil {
return errOverwhelmed
}
defer session.End()
// Loop state
var (
cfgSnap *proxycfg.ConfigSnapshot
@ -159,6 +168,10 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
for {
select {
case <-session.Terminated():
generator.Logger.Debug("draining stream to rebalance load")
metrics.IncrCounter([]string{"xds", "server", "streamDrained"}, 1)
return errOverwhelmed
case <-authTimer:
// It's been too long since a Discovery{Request,Response} so recheck ACLs.
if err := checkStreamACLs(cfgSnap); err != nil {

View File

@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
@ -36,7 +37,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, serverlessPluginEnabled, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -238,7 +239,7 @@ func TestServer_DeltaAggregatedResources_v3_NackLoop(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -369,7 +370,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -522,7 +523,7 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
server, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
// This mutateFn causes any endpoint with a name containing "geo-cache" to be
@ -663,7 +664,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpa
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -799,7 +800,7 @@ func TestServer_DeltaAggregatedResources_v3_BasicProtocol_HTTP2_RDS_listenerChan
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1059,7 +1060,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLEnforcement(t *testing.T) {
return acl.NewPolicyAuthorizerWithDefaults(acl.RootAuthorizer("deny"), []*acl.Policy{policy}, nil)
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", tt.token, 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
@ -1137,6 +1138,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedDuri
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1236,6 +1238,7 @@ func TestServer_DeltaAggregatedResources_v3_ACLTokenDeleted_StreamTerminatedInBa
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", token,
100*time.Millisecond, // Make this short.
false,
nil,
)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
@ -1316,7 +1319,7 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false)
scenario := newTestServerDeltaScenario(t, aclResolve, "ingress-gateway", "", 0, false, nil)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("ingress-gateway", nil)
@ -1368,6 +1371,115 @@ func TestServer_DeltaAggregatedResources_v3_IngressEmptyResponse(t *testing.T) {
}
}
func TestServer_DeltaAggregatedResources_v3_CapacityReached(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, capacityReachedLimiter{})
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
mgr.RegisterProxy(t, sid)
snap := newTestSnapshot(t, nil, "")
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
select {
case err := <-errCh:
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}
type capacityReachedLimiter struct{}
func (capacityReachedLimiter) BeginSession() (limiter.Session, error) {
return nil, limiter.ErrCapacityReached
}
func TestServer_DeltaAggregatedResources_v3_StreamDrained(t *testing.T) {
limiter := &testLimiter{}
aclResolve := func(id string) (acl.Authorizer, error) { return acl.ManageAll(), nil }
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0, false, limiter)
mgr, errCh, envoy := scenario.mgr, scenario.errCh, scenario.envoy
sid := structs.NewServiceID("web-sidecar-proxy", nil)
mgr.RegisterProxy(t, sid)
testutil.RunStep(t, "successful request/response", func(t *testing.T) {
snap := newTestSnapshot(t, nil, "")
envoy.SendDeltaReq(t, xdscommon.ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
InitialResourceVersions: mustMakeVersionMap(t,
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
mgr.DeliverConfig(t, sid, snap)
assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: xdscommon.ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
),
})
})
testutil.RunStep(t, "terminate limiter session", func(t *testing.T) {
limiter.TerminateSession()
select {
case err := <-errCh:
require.Error(t, err)
require.Equal(t, codes.ResourceExhausted.String(), status.Code(err).String())
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
})
testutil.RunStep(t, "check drain counter incremeted", func(t *testing.T) {
data := scenario.sink.Data()
require.Len(t, data, 1)
item := data[0]
require.Len(t, item.Counters, 1)
val, ok := item.Counters["consul.xds.test.xds.server.streamDrained"]
require.True(t, ok)
require.Equal(t, 1, val.Count)
})
}
type testLimiter struct {
termCh chan struct{}
}
func (t *testLimiter) BeginSession() (limiter.Session, error) {
t.termCh = make(chan struct{})
return &testSession{termCh: t.termCh}, nil
}
func (t *testLimiter) TerminateSession() { close(t.termCh) }
type testSession struct {
termCh chan struct{}
}
func (t *testSession) Terminated() <-chan struct{} { return t.termCh }
func (*testSession) End() {}
func assertDeltaChanBlocked(t *testing.T, ch chan *envoy_discovery_v3.DeltaDiscoveryResponse) {
t.Helper()
select {

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/acl"
external "github.com/hashicorp/consul/agent/grpc-external"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/xds/xdscommon"
@ -29,6 +30,13 @@ var StatsGauges = []prometheus.GaugeDefinition{
},
}
var StatsCounters = []prometheus.CounterDefinition{
{
Name: []string{"xds", "server", "streamDrained"},
Help: "Counts the number of xDS streams that are drained when rebalancing the load between servers.",
},
}
// ADSStream is a shorter way of referring to this thing...
type ADSStream = envoy_discovery_v3.AggregatedDiscoveryService_StreamAggregatedResourcesServer
@ -97,6 +105,12 @@ type ProxyConfigSource interface {
Watch(id structs.ServiceID, nodeName string, token string) (<-chan *proxycfg.ConfigSnapshot, proxycfg.CancelFunc, error)
}
// SessionLimiter is the interface exposed by limiter.SessionLimiter. We depend
// on an interface rather than the concrete type so we can mock it in tests.
type SessionLimiter interface {
BeginSession() (limiter.Session, error)
}
// Server represents a gRPC server that can handle xDS requests from Envoy. All
// of it's public members must be set before the gRPC server is started.
//
@ -108,6 +122,7 @@ type Server struct {
CfgSrc ProxyConfigSource
ResolveToken ACLResolverFunc
CfgFetcher ConfigFetcher
SessionLimiter SessionLimiter
// AuthCheckFrequency is how often we should re-check the credentials used
// during a long-lived gRPC Stream after it has been initially established.
@ -159,6 +174,7 @@ func NewServer(
cfgMgr ProxyConfigSource,
resolveToken ACLResolverFunc,
cfgFetcher ConfigFetcher,
limiter SessionLimiter,
) *Server {
return &Server{
NodeName: nodeName,
@ -166,6 +182,7 @@ func NewServer(
CfgSrc: cfgMgr,
ResolveToken: resolveToken,
CfgFetcher: cfgFetcher,
SessionLimiter: limiter,
AuthCheckFrequency: DefaultAuthCheckFrequency,
activeStreams: &activeStreamCounters{},
serverlessPluginEnabled: serverlessPluginEnabled,

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/grpc-external/limiter"
envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
@ -136,6 +137,7 @@ func newTestServerDeltaScenario(
token string,
authCheckFrequency time.Duration,
serverlessPluginEnabled bool,
sessionLimiter SessionLimiter,
) *testServerScenario {
mgr := newTestManager(t)
envoy := NewTestEnvoy(t, proxyID, token)
@ -154,6 +156,10 @@ func newTestServerDeltaScenario(
metrics.NewGlobal(cfg, sink)
})
if sessionLimiter == nil {
sessionLimiter = limiter.NewSessionLimiter()
}
s := NewServer(
"node-123",
testutil.Logger(t),
@ -161,6 +167,7 @@ func newTestServerDeltaScenario(
mgr,
resolveToken,
nil, /*cfgFetcher ConfigFetcher*/
sessionLimiter,
)
if authCheckFrequency > 0 {
s.AuthCheckFrequency = authCheckFrequency

View File

@ -61,6 +61,7 @@ const (
WAN string = "wan"
Watch string = "watch"
XDS string = "xds"
XDSCapacityController string = "xds_capacity_controller"
Vault string = "vault"
Health string = "health"
)

View File

@ -542,6 +542,8 @@ These metrics are used to monitor the health of the Consul servers.
| `consul.grpc.server.stream.count` | Counts the number of new gRPC streams received by the server. | streams | counter |
| `consul.grpc.server.streams` | Measures the number of active gRPC streams handled by the server. | streams | gauge |
| `consul.xds.server.streams` | Measures the number of active xDS streams handled by the server split by protocol version. | streams | gauge |
| `consul.xds.server.idealStreamsMax` | The maximum number of xDS streams per server, chosen to achieve a roughly even spread of load across servers. | streams | gauge |
| `consul.xds.server.streamDrained` | Counts the number of xDS streams that are drained when rebalancing the load between servers. | streams | counter |
## Server Workload