Apply the limits to the clients rpcLimiter

This commit is contained in:
Matt Keeler 2018-06-11 15:51:17 -04:00
parent 672a2a3577
commit 65746b2f8f
3 changed files with 33 additions and 7 deletions

View File

@ -73,6 +73,7 @@ type delegate interface {
SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io.Writer, replyFn structs.SnapshotReplyFn) error
Shutdown() error Shutdown() error
Stats() map[string]map[string]string Stats() map[string]map[string]string
ReloadConfig(config *consul.Config) error
} }
// notifier is called after a successful JoinLAN. // notifier is called after a successful JoinLAN.
@ -2480,8 +2481,8 @@ func (a *Agent) DisableNodeMaintenance() {
} }
func (a *Agent) loadLimits(conf *config.RuntimeConfig) { func (a *Agent) loadLimits(conf *config.RuntimeConfig) {
a.config.RPCRateLimit = conf.RPCRateLimit a.config.RPCRateLimit = conf.RPCRateLimit
a.config.RPCMaxBurst = conf.RPCMaxBurst a.config.RPCMaxBurst = conf.RPCMaxBurst
} }
func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error { func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
@ -2518,7 +2519,17 @@ func (a *Agent) ReloadConfig(newCfg *config.RuntimeConfig) error {
return fmt.Errorf("Failed reloading watches: %v", err) return fmt.Errorf("Failed reloading watches: %v", err)
} }
a.loadLimits(newCfg) a.loadLimits(newCfg)
// create the config for the rpc server/client
consulCfg, err := a.consulConfig()
if err != nil {
return err
}
if err := a.delegate.ReloadConfig(consulCfg); err != nil {
return err
}
// Update filtered metrics // Update filtered metrics
metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes) metrics.UpdateFilter(newCfg.TelemetryAllowedPrefixes, newCfg.TelemetryBlockedPrefixes)

View File

@ -7,6 +7,7 @@ import (
"os" "os"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
@ -56,7 +57,7 @@ type Client struct {
// rpcLimiter is used to rate limit the total number of RPCs initiated // rpcLimiter is used to rate limit the total number of RPCs initiated
// from an agent. // from an agent.
rpcLimiter *rate.Limiter rpcLimiter atomic.Value
// eventCh is used to receive events from the // eventCh is used to receive events from the
// serf cluster in the datacenter // serf cluster in the datacenter
@ -125,12 +126,13 @@ func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
c := &Client{ c := &Client{
config: config, config: config,
connPool: connPool, connPool: connPool,
rpcLimiter: rate.NewLimiter(config.RPCRate, config.RPCMaxBurst),
eventCh: make(chan serf.Event, serfEventBacklog), eventCh: make(chan serf.Event, serfEventBacklog),
logger: logger, logger: logger,
shutdownCh: make(chan struct{}), shutdownCh: make(chan struct{}),
} }
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
// Initialize the LAN Serf // Initialize the LAN Serf
c.serf, err = c.setupSerf(config.SerfLANConfig, c.serf, err = c.setupSerf(config.SerfLANConfig,
c.eventCh, serfLANSnapshot) c.eventCh, serfLANSnapshot)
@ -251,7 +253,7 @@ TRY:
// Enforce the RPC limit. // Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() { if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded return structs.ErrRPCRateExceeded
@ -295,7 +297,7 @@ func (c *Client) SnapshotRPC(args *structs.SnapshotRequest, in io.Reader, out io
// Enforce the RPC limit. // Enforce the RPC limit.
metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1) metrics.IncrCounter([]string{"consul", "client", "rpc"}, 1)
metrics.IncrCounter([]string{"client", "rpc"}, 1) metrics.IncrCounter([]string{"client", "rpc"}, 1)
if !c.rpcLimiter.Allow() { if !c.rpcLimiter.Load().(*rate.Limiter).Allow() {
metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"consul", "client", "rpc", "exceeded"}, 1)
metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1) metrics.IncrCounter([]string{"client", "rpc", "exceeded"}, 1)
return structs.ErrRPCRateExceeded return structs.ErrRPCRateExceeded
@ -360,3 +362,10 @@ func (c *Client) GetLANCoordinate() (lib.CoordinateSet, error) {
cs := lib.CoordinateSet{c.config.Segment: lan} cs := lib.CoordinateSet{c.config.Segment: lan}
return cs, nil return cs, nil
} }
// ReloadConfig is used to have the Client do an online reload of
// relevant configuration information
func (c *Client) ReloadConfig(config *Config) error {
c.rpcLimiter.Store(rate.NewLimiter(config.RPCRate, config.RPCMaxBurst))
return nil
}

View File

@ -1040,6 +1040,12 @@ func (s *Server) GetLANCoordinate() (lib.CoordinateSet, error) {
return cs, nil return cs, nil
} }
// ReloadConfig is used to have the Server do an online reload of
// relevant configuration information
func (s *Server) ReloadConfig(config *Config) error {
return nil
}
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write // Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
func (s *Server) setConsistentReadReady() { func (s *Server) setConsistentReadReady() {
atomic.StoreInt32(&s.readyForConsistentReads, 1) atomic.StoreInt32(&s.readyForConsistentReads, 1)